diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/AnalysisUtil.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/AnalysisUtil.java
index bb428fa6..d8775c8 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/AnalysisUtil.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/AnalysisUtil.java
@@ -282,7 +282,7 @@
 
             JobSpecification jobSpec = compiler.createJob(appCtx, new JobEventListenerFactory(newTxnId, false));
 
-            JobId jobId = JobUtils.runJob(appCtx.getHcc(), jobSpec, true);
+            JobId jobId = JobUtils.runJobIfActive(appCtx.getHcc(), jobSpec, true);
 
             IResultSetReader resultSetReader = appCtx.getResultSet().createReader(jobId, resultSetId);
             FrameManager frameManager = new FrameManager(queryOptCtx.getPhysicalOptimizationConfig().getFrameSize());
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java
index 3a81c09..4674f7e 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java
@@ -107,7 +107,7 @@
             // TODO(Yingyi): currently we do not check IFrameWriter protocol violations for Feed jobs.
             // We will need to design general exception handling mechanism for feeds.
             setLocations(jobInfo.getRight());
-            return JobUtils.runJob(hcc, feedJob, false);
+            return JobUtils.runJobIfActive(hcc, feedJob, false);
         } catch (Exception e) {
             throw HyracksDataException.create(e);
         }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index ad74d6c..0c736a2 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -302,7 +302,7 @@
     protected final APIFramework apiFramework;
     protected final IRewriterFactory rewriterFactory;
     protected final ExecutorService executorService;
-    protected final EnumSet<JobFlag> jobFlags = EnumSet.noneOf(JobFlag.class);
+    protected final EnumSet<JobFlag> jobFlags = EnumSet.of(JobFlag.ENSURE_RUNNABLE);
     protected final IMetadataLockManager lockManager;
     protected final IMetadataLockUtil lockUtil;
     protected final IResponsePrinter responsePrinter;
@@ -2435,7 +2435,7 @@
                                 requestParameters.isForceDropDataset());
                     }
                     for (JobSpecification jobSpec : jobsToExecute) {
-                        JobUtils.runJob(hcc, jobSpec, true);
+                        JobUtils.runJobIfActive(hcc, jobSpec, true);
                     }
                 } catch (Exception e2) {
                     // do no throw exception since still the metadata needs to be compensated.
@@ -4298,7 +4298,7 @@
     private static JobId runTrackJob(IHyracksClientConnection hcc, JobSpecification jobSpec, EnumSet<JobFlag> jobFlags,
             String reqId, String clientCtxId, ClientRequest clientRequest) throws Exception {
         jobSpec.setRequestId(reqId);
-        JobId jobId = JobUtils.runJob(hcc, jobSpec, jobFlags, false);
+        JobId jobId = JobUtils.runJobIfActive(hcc, jobSpec, jobFlags, false);
         LOGGER.info("Created job {} for uuid:{}, clientContextID:{}", jobId, reqId, clientCtxId);
         clientRequest.setJobId(jobId);
         return jobId;
@@ -5407,12 +5407,12 @@
 
     private static void runJob(IHyracksClientConnection hcc, JobSpecification jobSpec, EnumSet<JobFlag> jobFlags)
             throws Exception {
-        JobUtils.runJob(hcc, jobSpec, jobFlags, true);
+        JobUtils.runJobIfActive(hcc, jobSpec, jobFlags, true);
     }
 
     private static List<IOperatorStats> runJob(IHyracksClientConnection hcc, JobSpecification jobSpec,
             EnumSet<JobFlag> jobFlags, List<String> statOperatorNames) throws Exception {
-        Pair<JobId, List<IOperatorStats>> p = JobUtils.runJob(hcc, jobSpec, jobFlags, true, statOperatorNames);
+        Pair<JobId, List<IOperatorStats>> p = JobUtils.runJobIfActive(hcc, jobSpec, jobFlags, true, statOperatorNames);
         return p.second;
     }
 
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
index 9724174..0b7e7d0 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
@@ -22,6 +22,8 @@
 import static org.apache.asterix.algebra.base.ILangExtension.Language.SQLPP;
 import static org.apache.asterix.api.http.server.ServletConstants.ASTERIX_APP_CONTEXT_INFO_ATTR;
 import static org.apache.asterix.api.http.server.ServletConstants.HYRACKS_CONNECTION_ATTR;
+import static org.apache.asterix.common.api.IClusterManagementWork.ClusterState.ACTIVE;
+import static org.apache.asterix.common.api.IClusterManagementWork.ClusterState.REBALANCE_REQUIRED;
 import static org.apache.asterix.common.api.IClusterManagementWork.ClusterState.SHUTTING_DOWN;
 import static org.apache.hyracks.control.common.controllers.ControllerConfig.Option.CLOUD_DEPLOYMENT;
 
@@ -35,6 +37,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.ServiceLoader;
+import java.util.Set;
 import java.util.concurrent.ConcurrentMap;
 
 import org.apache.asterix.api.http.IQueryWebServerRegistrant;
@@ -69,6 +72,7 @@
 import org.apache.asterix.common.api.INamespaceResolver;
 import org.apache.asterix.common.api.INodeJobTracker;
 import org.apache.asterix.common.api.IReceptionistFactory;
+import org.apache.asterix.common.cluster.IClusterStateManager;
 import org.apache.asterix.common.cluster.IGlobalRecoveryManager;
 import org.apache.asterix.common.cluster.IGlobalTxManager;
 import org.apache.asterix.common.config.AsterixExtension;
@@ -111,6 +115,7 @@
 import org.apache.hyracks.api.control.IGatekeeper;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.IODeviceHandle;
+import org.apache.hyracks.api.job.JobFlag;
 import org.apache.hyracks.api.job.resource.IJobCapacityController;
 import org.apache.hyracks.api.lifecycle.LifeCycleComponentManager;
 import org.apache.hyracks.api.result.IJobResultCallback;
@@ -212,7 +217,7 @@
         ccServiceCtx.addClusterLifecycleListener(nodeJobTracker);
         ccServiceCtx.addJobLifecycleListener(globalTxManager);
 
-        jobCapacityController = new JobCapacityController(controllerService.getResourceManager());
+        jobCapacityController = new JobCapacityController(controllerService.getResourceManager(), this);
     }
 
     protected INamespaceResolver createNamespaceResolver(boolean useDatabaseResolution) {
@@ -441,4 +446,14 @@
     public IJobResultCallback getJobResultCallback() {
         return new JobResultCallback(appCtx);
     }
+
+    @Override
+    public boolean acceptingJobs(Set<JobFlag> flags) {
+        // flags == null should not be needed since currently it's not null (but not enforced)
+        if (flags == null || !flags.contains(JobFlag.ENSURE_RUNNABLE)) {
+            return true;
+        }
+        IClusterStateManager csm = appCtx.getClusterStateManager();
+        return csm.getState() == ACTIVE || csm.getState() == REBALANCE_REQUIRED;
+    }
 }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
index ed9d0c6..08972d4 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
@@ -40,8 +40,6 @@
 import org.apache.asterix.metadata.utils.DatasetUtil;
 import org.apache.hyracks.api.application.ICCServiceContext;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
-import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.util.ExitUtil;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -75,12 +73,6 @@
         return Collections.emptySet();
     }
 
-    private void executeHyracksJob(JobSpecification spec) throws Exception {
-        spec.setMaxReattempts(0);
-        JobId jobId = hcc.startJob(spec);
-        hcc.waitForCompletion(jobId);
-    }
-
     @Override
     public void startGlobalRecovery(ICcApplicationContext appCtx) {
         if (!recoveryCompleted && !recovering) {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FlushDatasetUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FlushDatasetUtil.java
index f38c18b..1c17704 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FlushDatasetUtil.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FlushDatasetUtil.java
@@ -75,7 +75,7 @@
 
         JobEventListenerFactory jobEventListenerFactory = new JobEventListenerFactory(txnId, true);
         spec.setJobletEventListenerFactory(jobEventListenerFactory);
-        JobUtils.runJob(hcc, spec, true);
+        JobUtils.runJobIfActive(hcc, spec, true);
     }
 
 }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
index 180b6cc..cc9f1ad 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
@@ -305,7 +305,7 @@
     private static void createRebalanceTarget(Dataset target, MetadataProvider metadataProvider,
             IHyracksClientConnection hcc) throws Exception {
         JobSpecification spec = DatasetUtil.createDatasetJobSpec(target, metadataProvider);
-        JobUtils.runJob(hcc, spec, true);
+        JobUtils.forceRunJob(hcc, spec, true);
     }
 
     // Populates the data from the source dataset to the rebalance target dataset.
@@ -348,7 +348,7 @@
         spec.connect(new OneToOneConnectorDescriptor(spec), upsertOp, 0, commitOp, 0);
 
         // Executes the job.
-        JobUtils.runJob(hcc, spec, true);
+        JobUtils.forceRunJob(hcc, spec, true);
     }
 
     private static ITupleProjectorFactory createTupleProjectorFactory(Dataset source, MetadataProvider metadataProvider)
@@ -403,7 +403,7 @@
                     EnumSet.of(DropOption.IF_EXISTS, DropOption.WAIT_ON_IN_USE), null));
         }
         for (JobSpecification jobSpec : jobs) {
-            JobUtils.runJob(hcc, jobSpec, true);
+            JobUtils.forceRunJob(hcc, jobSpec, true);
         }
     }
 
@@ -427,12 +427,12 @@
             // Creates the secondary index.
             JobSpecification indexCreationJobSpec =
                     IndexUtil.buildSecondaryIndexCreationJobSpec(target, index, metadataProvider, null);
-            JobUtils.runJob(hcc, indexCreationJobSpec, true);
+            JobUtils.forceRunJob(hcc, indexCreationJobSpec, true);
 
             // Loads the secondary index.
             JobSpecification indexLoadingJobSpec =
                     IndexUtil.buildSecondaryIndexLoadingJobSpec(target, index, metadataProvider, null);
-            JobUtils.runJob(hcc, indexLoadingJobSpec, true);
+            JobUtils.forceRunJob(hcc, indexLoadingJobSpec, true);
         }
     }
 
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/JobUtils.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/JobUtils.java
index c1c6f18..e3f57f3 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/JobUtils.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/JobUtils.java
@@ -36,12 +36,28 @@
         ADDED_PENDINGOP_RECORD_TO_METADATA
     }
 
-    public static JobId runJob(IHyracksClientConnection hcc, JobSpecification spec, boolean waitForCompletion)
+    public static JobId forceRunJob(IHyracksClientConnection hcc, JobSpecification spec, boolean waitForCompletion)
             throws Exception {
         return runJob(hcc, spec, EnumSet.noneOf(JobFlag.class), waitForCompletion);
     }
 
-    public static JobId runJob(IHyracksClientConnection hcc, JobSpecification spec, EnumSet<JobFlag> jobFlags,
+    public static JobId runJobIfActive(IHyracksClientConnection hcc, JobSpecification spec, boolean waitForCompletion)
+            throws Exception {
+        return runJob(hcc, spec, EnumSet.of(JobFlag.ENSURE_RUNNABLE), waitForCompletion);
+    }
+
+    public static JobId runJobIfActive(IHyracksClientConnection hcc, JobSpecification spec, EnumSet<JobFlag> jobFlags,
+            boolean waitForCompletion) throws Exception {
+        if (jobFlags.contains(JobFlag.ENSURE_RUNNABLE)) {
+            return runJob(hcc, spec, jobFlags, waitForCompletion);
+        } else {
+            EnumSet<JobFlag> flags = EnumSet.copyOf(jobFlags);
+            flags.add(JobFlag.ENSURE_RUNNABLE);
+            return runJob(hcc, spec, flags, waitForCompletion);
+        }
+    }
+
+    private static JobId runJob(IHyracksClientConnection hcc, JobSpecification spec, EnumSet<JobFlag> jobFlags,
             boolean waitForCompletion) throws Exception {
         spec.setMaxReattempts(0);
         final JobId jobId = hcc.startJob(spec, jobFlags);
@@ -57,8 +73,12 @@
         return jobId;
     }
 
-    public static Pair<JobId, List<IOperatorStats>> runJob(IHyracksClientConnection hcc, JobSpecification spec,
+    public static Pair<JobId, List<IOperatorStats>> runJobIfActive(IHyracksClientConnection hcc, JobSpecification spec,
             EnumSet<JobFlag> jobFlags, boolean waitForCompletion, List<String> statOperatorNames) throws Exception {
+        if (!jobFlags.contains(JobFlag.ENSURE_RUNNABLE)) {
+            jobFlags = EnumSet.copyOf(jobFlags);
+            jobFlags.add(JobFlag.ENSURE_RUNNABLE);
+        }
         spec.setMaxReattempts(0);
         final JobId jobId = hcc.startJob(spec, jobFlags);
         List<IOperatorStats> opStats = null;
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
index 9aa5d84..132efbc 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
@@ -403,7 +403,7 @@
 
             // #. run the jobs
             for (JobSpecification jobSpec : jobsToExecute) {
-                JobUtils.runJob(hcc, jobSpec, true);
+                JobUtils.runJobIfActive(hcc, jobSpec, true);
             }
 
             mdTxnCtx.setValue(MetadataManager.INSTANCE.beginTransaction());
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/resource/JobCapacityController.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/resource/JobCapacityController.java
index ae903d1..236056c 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/resource/JobCapacityController.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/resource/JobCapacityController.java
@@ -19,8 +19,14 @@
 
 package org.apache.asterix.runtime.job.resource;
 
+import java.util.Set;
+
+import org.apache.hyracks.api.application.ICCApplication;
 import org.apache.hyracks.api.exceptions.ErrorCode;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.job.JobFlag;
+import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.api.job.resource.IClusterCapacity;
 import org.apache.hyracks.api.job.resource.IJobCapacityController;
@@ -36,13 +42,19 @@
 
     private static final Logger LOGGER = LogManager.getLogger();
     private final IResourceManager resourceManager;
+    private final ICCApplication ccApp;
 
-    public JobCapacityController(IResourceManager resourceManager) {
+    public JobCapacityController(IResourceManager resourceManager, ICCApplication ccApp) {
         this.resourceManager = resourceManager;
+        this.ccApp = ccApp;
     }
 
     @Override
-    public JobSubmissionStatus allocate(JobSpecification job) throws HyracksException {
+    public JobSubmissionStatus allocate(JobSpecification job, JobId jobId, Set<JobFlag> jobFlags)
+            throws HyracksException {
+        if (!ccApp.acceptingJobs(jobFlags)) {
+            throw HyracksDataException.create(ErrorCode.JOB_REJECTED, job);
+        }
         IClusterCapacity requiredCapacity = job.getRequiredClusterCapacity();
         long reqAggregatedMemoryByteSize = requiredCapacity.getAggregatedMemoryByteSize();
         int reqAggregatedNumCores = requiredCapacity.getAggregatedCores();
diff --git a/asterixdb/asterix-runtime/src/test/java/org/apache/asterix/runtime/job/resource/JobCapacityControllerTest.java b/asterixdb/asterix-runtime/src/test/java/org/apache/asterix/runtime/job/resource/JobCapacityControllerTest.java
index 48c61b4..e70b306 100644
--- a/asterixdb/asterix-runtime/src/test/java/org/apache/asterix/runtime/job/resource/JobCapacityControllerTest.java
+++ b/asterixdb/asterix-runtime/src/test/java/org/apache/asterix/runtime/job/resource/JobCapacityControllerTest.java
@@ -22,8 +22,13 @@
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+import java.util.EnumSet;
+
+import org.apache.hyracks.api.application.ICCApplication;
 import org.apache.hyracks.api.exceptions.ErrorCode;
 import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.job.JobFlag;
+import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.api.job.resource.ClusterCapacity;
 import org.apache.hyracks.api.job.resource.IClusterCapacity;
@@ -36,31 +41,34 @@
 
 public class JobCapacityControllerTest {
 
+    private static final EnumSet<JobFlag> none = EnumSet.noneOf(JobFlag.class);
+
     @Test
     public void test() throws HyracksException {
+        JobId jobId = new JobId(0);
         IResourceManager resourceManager = makeResourceManagerWithCapacity(4294967296L, 33);
-        JobCapacityController capacityController = new JobCapacityController(resourceManager);
+        JobCapacityController capacityController = new JobCapacityController(resourceManager, makeCCApp());
 
         // Verifies the correctness of the allocate method.
-        Assert.assertTrue(capacityController.allocate(
-                makeJobWithRequiredCapacity(4294967296L, 16)) == IJobCapacityController.JobSubmissionStatus.EXECUTE);
-        Assert.assertTrue(capacityController.allocate(
-                makeJobWithRequiredCapacity(2147483648L, 16)) == IJobCapacityController.JobSubmissionStatus.QUEUE);
-        Assert.assertTrue(capacityController.allocate(
-                makeJobWithRequiredCapacity(2147483648L, 32)) == IJobCapacityController.JobSubmissionStatus.QUEUE);
+        Assert.assertTrue(capacityController.allocate(makeJobWithRequiredCapacity(4294967296L, 16), jobId,
+                none) == IJobCapacityController.JobSubmissionStatus.EXECUTE);
+        Assert.assertTrue(capacityController.allocate(makeJobWithRequiredCapacity(2147483648L, 16), jobId,
+                none) == IJobCapacityController.JobSubmissionStatus.QUEUE);
+        Assert.assertTrue(capacityController.allocate(makeJobWithRequiredCapacity(2147483648L, 32), jobId,
+                none) == IJobCapacityController.JobSubmissionStatus.QUEUE);
 
         boolean exceedCapacity = false;
         try {
-            capacityController.allocate(makeJobWithRequiredCapacity(2147483648L, 64));
+            capacityController.allocate(makeJobWithRequiredCapacity(2147483648L, 64), jobId, none);
         } catch (HyracksException e) {
             exceedCapacity = e.matches(ErrorCode.JOB_REQUIREMENTS_EXCEED_CAPACITY);
         }
         Assert.assertTrue(exceedCapacity);
-        Assert.assertTrue(capacityController.allocate(
-                makeJobWithRequiredCapacity(4294967296L, 32)) == IJobCapacityController.JobSubmissionStatus.QUEUE);
+        Assert.assertTrue(capacityController.allocate(makeJobWithRequiredCapacity(4294967296L, 32), jobId,
+                none) == IJobCapacityController.JobSubmissionStatus.QUEUE);
         exceedCapacity = false;
         try {
-            capacityController.allocate(makeJobWithRequiredCapacity(4294967297L, 33));
+            capacityController.allocate(makeJobWithRequiredCapacity(4294967297L, 33), jobId, none);
         } catch (HyracksException e) {
             exceedCapacity = e.matches(ErrorCode.JOB_REQUIREMENTS_EXCEED_CAPACITY);
         }
@@ -95,4 +103,9 @@
         return clusterCapacity;
     }
 
+    private ICCApplication makeCCApp() {
+        ICCApplication ccApp = mock(ICCApplication.class);
+        when(ccApp.acceptingJobs(none)).thenReturn(true);
+        return ccApp;
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/ICCApplication.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/ICCApplication.java
index 5cc8f69..6c22386 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/ICCApplication.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/ICCApplication.java
@@ -18,8 +18,11 @@
  */
 package org.apache.hyracks.api.application;
 
+import java.util.Set;
+
 import org.apache.hyracks.api.config.IConfigManager;
 import org.apache.hyracks.api.control.IGatekeeper;
+import org.apache.hyracks.api.job.JobFlag;
 import org.apache.hyracks.api.job.resource.IJobCapacityController;
 import org.apache.hyracks.api.result.IJobResultCallback;
 
@@ -37,4 +40,9 @@
      * @return the job result callback
      */
     IJobResultCallback getJobResultCallback();
+
+    /**
+     * @return true if the application is accepting jobs. False, otherwise.
+     */
+    boolean acceptingJobs(Set<JobFlag> jobFlags);
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java
index be965eb..66f4fab 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java
@@ -89,6 +89,66 @@
     JobId startJob(JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) throws Exception;
 
     /**
+     * Used to run a deployed Job Spec by id
+     *
+     * @param deployedJobSpecId
+     *            The id of the deployed job spec
+     * @param jobParameters
+     *            The serialized job parameters
+     * @throws Exception
+     */
+    JobId startJob(DeployedJobSpecId deployedJobSpecId, Map<byte[], byte[]> jobParameters) throws Exception;
+
+    /**
+     * Start the specified Job.
+     *
+     * @param acggf
+     *            Activity Cluster Graph Generator Factory
+     * @param jobFlags
+     *            Flags
+     * @throws Exception
+     */
+    JobId startJob(IActivityClusterGraphGeneratorFactory acggf, EnumSet<JobFlag> jobFlags) throws Exception;
+
+    /**
+     * Start the specified Job.
+     *
+     * @param deploymentId
+     *            the id of the specific deployment
+     * @param jobSpec
+     *            Job Specification
+     * @throws Exception
+     */
+    JobId startJob(DeploymentId deploymentId, JobSpecification jobSpec) throws Exception;
+
+    /**
+     * Start the specified Job.
+     *
+     * @param deploymentId
+     *            the id of the specific deployment
+     * @param jobSpec
+     *            Job Specification
+     * @param jobFlags
+     *            Flags
+     * @throws Exception
+     */
+    JobId startJob(DeploymentId deploymentId, JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) throws Exception;
+
+    /**
+     * Start the specified Job.
+     *
+     * @param deploymentId
+     *            the id of the specific deployment
+     * @param acggf
+     *            Activity Cluster Graph Generator Factory
+     * @param jobFlags
+     *            Flags
+     * @throws Exception
+     */
+    JobId startJob(DeploymentId deploymentId, IActivityClusterGraphGeneratorFactory acggf, EnumSet<JobFlag> jobFlags)
+            throws Exception;
+
+    /**
      * Distribute the specified Job.
      *
      * @param jobSpec
@@ -118,28 +178,6 @@
     void undeployJobSpec(DeployedJobSpecId deployedJobSpecId) throws Exception;
 
     /**
-     * Used to run a deployed Job Spec by id
-     *
-     * @param deployedJobSpecId
-     *            The id of the deployed job spec
-     * @param jobParameters
-     *            The serialized job parameters
-     * @throws Exception
-     */
-    JobId startJob(DeployedJobSpecId deployedJobSpecId, Map<byte[], byte[]> jobParameters) throws Exception;
-
-    /**
-     * Start the specified Job.
-     *
-     * @param acggf
-     *            Activity Cluster Graph Generator Factory
-     * @param jobFlags
-     *            Flags
-     * @throws Exception
-     */
-    JobId startJob(IActivityClusterGraphGeneratorFactory acggf, EnumSet<JobFlag> jobFlags) throws Exception;
-
-    /**
      * Gets the IP Address and port for the ResultDirectoryService wrapped in NetworkAddress
      *
      * @return {@link NetworkAddress}
@@ -196,44 +234,6 @@
     void unDeployBinary(DeploymentId deploymentId) throws Exception;
 
     /**
-     * Start the specified Job.
-     *
-     * @param deploymentId
-     *            the id of the specific deployment
-     * @param jobSpec
-     *            Job Specification
-     * @throws Exception
-     */
-    JobId startJob(DeploymentId deploymentId, JobSpecification jobSpec) throws Exception;
-
-    /**
-     * Start the specified Job.
-     *
-     * @param deploymentId
-     *            the id of the specific deployment
-     * @param jobSpec
-     *            Job Specification
-     * @param jobFlags
-     *            Flags
-     * @throws Exception
-     */
-    JobId startJob(DeploymentId deploymentId, JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) throws Exception;
-
-    /**
-     * Start the specified Job.
-     *
-     * @param deploymentId
-     *            the id of the specific deployment
-     * @param acggf
-     *            Activity Cluster Graph Generator Factory
-     * @param jobFlags
-     *            Flags
-     * @throws Exception
-     */
-    JobId startJob(DeploymentId deploymentId, IActivityClusterGraphGeneratorFactory acggf, EnumSet<JobFlag> jobFlags)
-            throws Exception;
-
-    /**
      * Shuts down all NCs and then the CC.
      *
      * @param terminateNCService
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java
index be47014..f63e0ea 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java
@@ -42,6 +42,8 @@
 
     public JobId startJob(DeployedJobSpecId deployedJobSpecId, Map<byte[], byte[]> jobParameters) throws Exception;
 
+    public JobId startJob(DeploymentId deploymentId, byte[] acggfBytes, EnumSet<JobFlag> jobFlags) throws Exception;
+
     public void cancelJob(JobId jobId) throws Exception;
 
     public DeployedJobSpecId deployJobSpec(byte[] acggfBytes) throws Exception;
@@ -64,8 +66,6 @@
 
     public void unDeployBinary(DeploymentId deploymentId) throws Exception;
 
-    public JobId startJob(DeploymentId deploymentId, byte[] acggfBytes, EnumSet<JobFlag> jobFlags) throws Exception;
-
     public JobInfo getJobInfo(JobId jobId) throws Exception;
 
     public void stopCluster(boolean terminateNCService) throws Exception;
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
index e46c0ef..766fb26 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
@@ -156,6 +156,7 @@
     ILLEGAL_STATE(126),
     INVALID_STRING_UNICODE(127),
     UNSUPPORTED_WRITE_SPEC(128),
+    JOB_REJECTED(129),
 
     // Compilation error codes.
     RULECOLLECTION_NOT_INSTANCE_OF_LIST(10000),
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobFlag.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobFlag.java
index 7225cd4..93848a1 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobFlag.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobFlag.java
@@ -20,5 +20,6 @@
 
 public enum JobFlag {
     PROFILE_RUNTIME,
-    ENFORCE_CONTRACT
+    ENFORCE_CONTRACT,
+    ENSURE_RUNNABLE
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/DefaultJobCapacityController.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/DefaultJobCapacityController.java
index b18bcb1..0bfdb37 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/DefaultJobCapacityController.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/DefaultJobCapacityController.java
@@ -19,6 +19,10 @@
 
 package org.apache.hyracks.api.job.resource;
 
+import java.util.Set;
+
+import org.apache.hyracks.api.job.JobFlag;
+import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobSpecification;
 
 public class DefaultJobCapacityController implements IJobCapacityController {
@@ -34,7 +38,7 @@
     }
 
     @Override
-    public JobSubmissionStatus allocate(JobSpecification job) {
+    public JobSubmissionStatus allocate(JobSpecification job, JobId jobId, Set<JobFlag> jobFlags) {
         return JobSubmissionStatus.EXECUTE;
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/IJobCapacityController.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/IJobCapacityController.java
index f88baa2..f2c03f6 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/IJobCapacityController.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/IJobCapacityController.java
@@ -19,7 +19,11 @@
 
 package org.apache.hyracks.api.job.resource;
 
+import java.util.Set;
+
 import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.job.JobFlag;
+import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobSpecification;
 
 /**
@@ -41,13 +45,14 @@
      * Allocates required cluster capacity for a job.
      *
      * @param job,
-     *            the job specification.
-     * @return EXECUTE, if the job can be executed immediately;
-     *         QUEUE, if the job cannot be executed
+     *         the job specification.
+     * @param jobId
+     *         the job id.
+     * @return EXECUTE, if the job can be executed immediately; QUEUE, if the job cannot be executed
      * @throws HyracksException
-     *             if the job's capacity requirement exceeds the maximum capacity of the cluster.
+     *         if the job's capacity requirement exceeds the maximum capacity of the cluster.
      */
-    JobSubmissionStatus allocate(JobSpecification job) throws HyracksException;
+    JobSubmissionStatus allocate(JobSpecification job, JobId jobId, Set<JobFlag> jobFlags) throws HyracksException;
 
     /**
      * Releases cluster capacity for a job when it completes.
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
index b3c2d7b..fa52bc6 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
@@ -146,6 +146,7 @@
 126 = Illegal state. %1$s
 127 = Decoding error - %1$s
 128 = Unsupported copy to specification: PARTITION BY %1$s, ORDER BY %2$s
+129 = Job %1$s not run. Cluster is not accepting jobs
 
 10000 = The given rule collection %1$s is not an instance of the List class.
 10001 = Cannot compose partition constraint %1$s with %2$s
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/BaseCCApplication.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/BaseCCApplication.java
index efd42c9..22e4bba 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/BaseCCApplication.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/BaseCCApplication.java
@@ -19,6 +19,7 @@
 package org.apache.hyracks.control.cc;
 
 import java.util.Arrays;
+import java.util.Set;
 
 import org.apache.hyracks.api.application.ICCApplication;
 import org.apache.hyracks.api.application.ICCServiceContext;
@@ -26,6 +27,7 @@
 import org.apache.hyracks.api.config.IConfigManager;
 import org.apache.hyracks.api.config.Section;
 import org.apache.hyracks.api.control.IGatekeeper;
+import org.apache.hyracks.api.job.JobFlag;
 import org.apache.hyracks.api.job.resource.DefaultJobCapacityController;
 import org.apache.hyracks.api.job.resource.IJobCapacityController;
 import org.apache.hyracks.api.result.IJobResultCallback;
@@ -125,4 +127,9 @@
             // no op
         };
     }
+
+    @Override
+    public boolean acceptingJobs(Set<JobFlag> flags) {
+        return true;
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
index be3daae..7c7111f 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
@@ -123,7 +123,7 @@
         JobSpecification job = jobRun.getJobSpecification();
         IJobCapacityController.JobSubmissionStatus status;
         try {
-            status = jobCapacityController.allocate(job);
+            status = jobCapacityController.allocate(job, jobRun.getJobId(), jobRun.getFlags());
             CCServiceContext serviceCtx = ccs.getContext();
             serviceCtx.notifyJobCreation(jobRun.getJobId(), job, status);
             switch (status) {
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/FIFOJobQueue.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/FIFOJobQueue.java
index 38277c2..d003853 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/FIFOJobQueue.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/FIFOJobQueue.java
@@ -90,7 +90,8 @@
             // Cluster maximum capacity can change over time, thus we have to re-check if the job should be rejected
             // or not.
             try {
-                IJobCapacityController.JobSubmissionStatus status = jobCapacityController.allocate(job);
+                IJobCapacityController.JobSubmissionStatus status =
+                        jobCapacityController.allocate(job, run.getJobId(), run.getFlags());
                 // Checks if the job can be executed immediately.
                 if (status == IJobCapacityController.JobSubmissionStatus.EXECUTE) {
                     jobRuns.add(run);
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java
index 19340d0..3e87bb3 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java
@@ -30,12 +30,14 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.EnumSet;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
 import org.apache.hyracks.api.exceptions.ErrorCode;
 import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.job.JobFlag;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.api.job.JobStatus;
@@ -57,6 +59,8 @@
 
 public class JobManagerTest {
 
+    private static final EnumSet<JobFlag> none = EnumSet.noneOf(JobFlag.class);
+
     private CCConfig ccConfig;
 
     @Before
@@ -77,7 +81,8 @@
             JobRun run = mockJobRun(id);
             JobSpecification job = mock(JobSpecification.class);
             when(run.getJobSpecification()).thenReturn(job);
-            when(jobCapacityController.allocate(job)).thenReturn(IJobCapacityController.JobSubmissionStatus.EXECUTE);
+            when(jobCapacityController.allocate(job, run.getJobId(), none))
+                    .thenReturn(IJobCapacityController.JobSubmissionStatus.EXECUTE);
 
             // Submits the job.
             acceptedRuns.add(run);
@@ -93,7 +98,8 @@
             JobRun run = mockJobRun(id);
             JobSpecification job = mock(JobSpecification.class);
             when(run.getJobSpecification()).thenReturn(job);
-            when(jobCapacityController.allocate(job)).thenReturn(IJobCapacityController.JobSubmissionStatus.QUEUE)
+            when(jobCapacityController.allocate(job, run.getJobId(), none))
+                    .thenReturn(IJobCapacityController.JobSubmissionStatus.QUEUE)
                     .thenReturn(IJobCapacityController.JobSubmissionStatus.EXECUTE);
 
             // Submits the job.
@@ -109,7 +115,8 @@
             JobRun run = mockJobRun(8193);
             JobSpecification job = mock(JobSpecification.class);
             when(run.getJobSpecification()).thenReturn(job);
-            when(jobCapacityController.allocate(job)).thenReturn(IJobCapacityController.JobSubmissionStatus.QUEUE)
+            when(jobCapacityController.allocate(job, run.getJobId(), none))
+                    .thenReturn(IJobCapacityController.JobSubmissionStatus.QUEUE)
                     .thenReturn(IJobCapacityController.JobSubmissionStatus.EXECUTE);
             jobManager.add(run);
         } catch (HyracksException e) {
@@ -149,7 +156,7 @@
             JobRun run = mockJobRun(1);
             JobSpecification job = mock(JobSpecification.class);
             when(run.getJobSpecification()).thenReturn(job);
-            when(jobCapacityController.allocate(job))
+            when(jobCapacityController.allocate(job, run.getJobId(), none))
                     .thenThrow(HyracksException.create(ErrorCode.JOB_REQUIREMENTS_EXCEED_CAPACITY, "1", "0"));
             jobManager.add(run);
         } catch (HyracksException e) {
@@ -172,14 +179,16 @@
         JobRun run1 = mockJobRun(1);
         JobSpecification job1 = mock(JobSpecification.class);
         when(run1.getJobSpecification()).thenReturn(job1);
-        when(jobCapacityController.allocate(job1)).thenReturn(IJobCapacityController.JobSubmissionStatus.EXECUTE);
+        when(jobCapacityController.allocate(job1, run1.getJobId(), none))
+                .thenReturn(IJobCapacityController.JobSubmissionStatus.EXECUTE);
         jobManager.add(run1);
 
         // A failure run.
         JobRun run2 = mockJobRun(2);
         JobSpecification job2 = mock(JobSpecification.class);
         when(run2.getJobSpecification()).thenReturn(job2);
-        when(jobCapacityController.allocate(job2)).thenReturn(IJobCapacityController.JobSubmissionStatus.QUEUE)
+        when(jobCapacityController.allocate(job2, run2.getJobId(), none))
+                .thenReturn(IJobCapacityController.JobSubmissionStatus.QUEUE)
                 .thenThrow(HyracksException.create(ErrorCode.JOB_REQUIREMENTS_EXCEED_CAPACITY, "1", "0"));
         jobManager.add(run2);
 
@@ -220,7 +229,8 @@
             JobRun run = mockJobRun(id);
             JobSpecification job = mock(JobSpecification.class);
             when(run.getJobSpecification()).thenReturn(job);
-            when(jobCapacityController.allocate(job)).thenReturn(IJobCapacityController.JobSubmissionStatus.EXECUTE);
+            when(jobCapacityController.allocate(job, run.getJobId(), none))
+                    .thenReturn(IJobCapacityController.JobSubmissionStatus.EXECUTE);
 
             // Submits the job.
             acceptedRuns.add(run);
@@ -236,7 +246,8 @@
             JobRun run = mockJobRun(id);
             JobSpecification job = mock(JobSpecification.class);
             when(run.getJobSpecification()).thenReturn(job);
-            when(jobCapacityController.allocate(job)).thenReturn(IJobCapacityController.JobSubmissionStatus.QUEUE)
+            when(jobCapacityController.allocate(job, run.getJobId(), none))
+                    .thenReturn(IJobCapacityController.JobSubmissionStatus.QUEUE)
                     .thenReturn(IJobCapacityController.JobSubmissionStatus.EXECUTE);
 
             // Submits the job.
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
index 7a75a0f..97e1533 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
@@ -23,6 +23,7 @@
 import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.List;
+import java.util.Set;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
@@ -247,7 +248,8 @@
                 private long maxRAM = Runtime.getRuntime().maxMemory();
 
                 @Override
-                public JobSubmissionStatus allocate(JobSpecification job) throws HyracksException {
+                public JobSubmissionStatus allocate(JobSpecification job, JobId jobId, Set<JobFlag> jobFlags)
+                        throws HyracksException {
                     return maxRAM > job.getRequiredClusterCapacity().getAggregatedMemoryByteSize()
                             ? JobSubmissionStatus.EXECUTE : JobSubmissionStatus.QUEUE;
                 }
