ASTERIXDB-1747 Implemented full lifecycle capabilities for pre-distributed jobs

Added distribute and destroy functionality
Removed serialization and bytes when running pred-distributed jobs
Cleaned up methods
Enabled Mockito testing for CCS and NCS
Added Unit Test for Distributed Jobs

Change-Id: I59c3422d5c1ab7756a6a4685ac527dfe50434954
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1377
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Yingyi Bu <buyingyi@gmail.com>
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJobNotificationHandler.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJobNotificationHandler.java
index 5ff02c7..0d1d8ab 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJobNotificationHandler.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJobNotificationHandler.java
@@ -35,13 +35,13 @@
     private static final Logger LOGGER = Logger.getLogger(ActiveJobNotificationHandler.class.getName());
     private static final boolean DEBUG = false;
     private final LinkedBlockingQueue<ActiveEvent> eventInbox;
-    private final Map<EntityId, IActiveEntityEventsListener> entityEventListener;
+    private final Map<EntityId, IActiveEntityEventsListener> entityEventListeners;
     private final Map<JobId, ActiveJob> jobId2ActiveJobInfos;
 
     private ActiveJobNotificationHandler() {
         this.eventInbox = new LinkedBlockingQueue<>();
         this.jobId2ActiveJobInfos = new HashMap<>();
-        this.entityEventListener = new HashMap<>();
+        this.entityEventListeners = new HashMap<>();
     }
 
     @Override
@@ -53,15 +53,14 @@
                 ActiveEvent event = getEventInbox().take();
                 ActiveJob jobInfo = jobId2ActiveJobInfos.get(event.getJobId());
                 EntityId entityId = jobInfo.getEntityId();
-                IActiveEntityEventsListener listener = entityEventListener.get(entityId);
+                IActiveEntityEventsListener listener = entityEventListeners.get(entityId);
                 if (DEBUG) {
                     LOGGER.log(Level.WARNING, "Next event is of type " + event.getEventKind());
                     LOGGER.log(Level.WARNING, "Notifying the listener");
                 }
                 listener.notify(event);
                 if (event.getEventKind() == EventKind.JOB_FINISH) {
-                    removeFinishedJob(event.getJobId());
-                    removeInactiveListener(listener);
+                    removeJob(event.getJobId(), listener);
                 }
             } catch (InterruptedException e) {
                 Thread.currentThread().interrupt();
@@ -72,11 +71,18 @@
         LOGGER.log(Level.INFO, "Stopped " + ActiveJobNotificationHandler.class.getSimpleName());
     }
 
-    private void removeFinishedJob(JobId jobId) {
-        if (DEBUG) {
-            LOGGER.log(Level.WARNING, "Removing the job");
+    public void removeJob(JobId jobId, IActiveEntityEventsListener listener) {
+        removeFinishedJob(jobId, listener);
+        removeInactiveListener(listener);
+    }
+
+    private void removeFinishedJob(JobId jobId, IActiveEntityEventsListener listener) {
+        if (!listener.isEntityActive()) {
+            if (DEBUG) {
+                LOGGER.log(Level.WARNING, "Remove job" + jobId);
+            }
+            jobId2ActiveJobInfos.remove(jobId);
         }
-        jobId2ActiveJobInfos.remove(jobId);
     }
 
     private void removeInactiveListener(IActiveEntityEventsListener listener) {
@@ -84,17 +90,17 @@
             if (DEBUG) {
                 LOGGER.log(Level.WARNING, "Removing the listener since it is not active anymore");
             }
-            entityEventListener.remove(listener.getEntityId());
+            entityEventListeners.remove(listener.getEntityId());
         }
     }
 
     public IActiveEntityEventsListener getActiveEntityListener(EntityId entityId) {
         if (DEBUG) {
             LOGGER.log(Level.WARNING, "getActiveEntityListener(EntityId entityId) was called with entity " + entityId);
-            IActiveEntityEventsListener listener = entityEventListener.get(entityId);
+            IActiveEntityEventsListener listener = entityEventListeners.get(entityId);
             LOGGER.log(Level.WARNING, "Listener found: " + listener);
         }
-        return entityEventListener.get(entityId);
+        return entityEventListeners.get(entityId);
     }
 
     public synchronized ActiveJob[] getActiveJobs() {
@@ -141,7 +147,7 @@
         ActiveJob jobInfo = jobId2ActiveJobInfos.get(jobId);
         if (jobInfo != null) {
             EntityId entityId = jobInfo.getEntityId();
-            IActiveEntityEventsListener listener = entityEventListener.get(entityId);
+            IActiveEntityEventsListener listener = entityEventListeners.get(entityId);
             listener.notifyJobCreation(jobId, jobSpecification);
             if (DEBUG) {
                 LOGGER.log(Level.WARNING, "Listener was notified" + jobId);
@@ -161,9 +167,9 @@
     public synchronized IActiveEntityEventsListener[] getEventListeners() {
         if (DEBUG) {
             LOGGER.log(Level.WARNING, "getEventListeners() was called");
-            LOGGER.log(Level.WARNING, "returning " + entityEventListener.size() + " Listeners");
+            LOGGER.log(Level.WARNING, "returning " + entityEventListeners.size() + " Listeners");
         }
-        return entityEventListener.values().toArray(new IActiveEntityEventsListener[entityEventListener.size()]);
+        return entityEventListeners.values().toArray(new IActiveEntityEventsListener[entityEventListeners.size()]);
     }
 
     public synchronized void registerListener(IActiveEntityEventsListener listener) throws HyracksDataException {
@@ -172,11 +178,11 @@
                     "registerListener(IActiveEntityEventsListener listener) was called for the entity "
                             + listener.getEntityId());
         }
-        if (entityEventListener.containsKey(listener.getEntityId())) {
+        if (entityEventListeners.containsKey(listener.getEntityId())) {
             throw new HyracksDataException(
                     "Active Entity Listener " + listener.getEntityId() + " is already registered");
         }
-        entityEventListener.put(listener.getEntityId(), listener);
+        entityEventListeners.put(listener.getEntityId(), listener);
     }
 
     public synchronized void monitorJob(JobId jobId, ActiveJob activeJob) {
@@ -185,7 +191,7 @@
             boolean found = jobId2ActiveJobInfos.get(jobId) != null;
             LOGGER.log(Level.WARNING, "Job was found to be: " + (found ? "Active" : "Inactive"));
         }
-        if (entityEventListener.containsKey(activeJob.getEntityId())) {
+        if (entityEventListeners.containsKey(activeJob.getEntityId())) {
             if (jobId2ActiveJobInfos.containsKey(jobId)) {
                 LOGGER.severe("Job is already being monitored for job: " + jobId);
                 return;
@@ -205,7 +211,7 @@
                     "unregisterListener(IActiveEntityEventsListener listener) was called for the entity "
                             + listener.getEntityId());
         }
-        IActiveEntityEventsListener registeredListener = entityEventListener.remove(listener.getEntityId());
+        IActiveEntityEventsListener registeredListener = entityEventListeners.remove(listener.getEntityId());
         if (registeredListener == null) {
             throw new HyracksDataException(
                     "Active Entity Listener " + listener.getEntityId() + " hasn't been registered");
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveLifecycleListener.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveLifecycleListener.java
index 06e9ad1..fad30fa 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveLifecycleListener.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveLifecycleListener.java
@@ -26,9 +26,9 @@
 
 import org.apache.asterix.active.message.ActivePartitionMessage;
 import org.apache.hyracks.api.exceptions.HyracksException;
-import org.apache.hyracks.api.job.IActivityClusterGraphGeneratorFactory;
 import org.apache.hyracks.api.job.IJobLifecycleListener;
 import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobSpecification;
 
 public class ActiveLifecycleListener implements IJobLifecycleListener {
 
@@ -65,8 +65,8 @@
     }
 
     @Override
-    public void notifyJobCreation(JobId jobId, IActivityClusterGraphGeneratorFactory acggf) throws HyracksException {
-        ActiveJobNotificationHandler.INSTANCE.notifyJobCreation(jobId, acggf.getJobSpecification());
+    public void notifyJobCreation(JobId jobId, JobSpecification spec) throws HyracksException {
+        ActiveJobNotificationHandler.INSTANCE.notifyJobCreation(jobId, spec);
     }
 
     public void receive(ActivePartitionMessage message) {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 3c69d83..978c2eb 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -233,6 +233,10 @@
         this.executorService = executorService;
     }
 
+    public SessionConfig getSessionConfig() {
+        return sessionConfig;
+    }
+
     protected List<FunctionDecl> getDeclaredFunctions(List<Statement> statements) {
         List<FunctionDecl> functionDecls = new ArrayList<>();
         for (Statement st : statements) {
@@ -343,7 +347,7 @@
                         handleInsertUpsertStatement(metadataProvider, stmt, hcc, hdc, resultDelivery, stats, false);
                         break;
                     case Statement.Kind.DELETE:
-                        handleDeleteStatement(metadataProvider, stmt, hcc);
+                        handleDeleteStatement(metadataProvider, stmt, hcc, false);
                         break;
                     case Statement.Kind.CREATE_PRIMARY_FEED:
                     case Statement.Kind.CREATE_SECONDARY_FEED:
@@ -1403,7 +1407,7 @@
             // prepare job spec(s) that would disconnect any active feeds involving the dataset.
             IActiveEntityEventsListener[] activeListeners = ActiveJobNotificationHandler.INSTANCE.getEventListeners();
             for (IActiveEntityEventsListener listener : activeListeners) {
-                if (listener.isEntityUsingDataset(dataverseName, datasetName)) {
+                if (listener.isEntityUsingDataset(dataverseName, datasetName) && listener.isEntityActive()) {
                     throw new CompilationException(
                             "Can't drop dataset since it is connected to active entity: " + listener.getEntityId());
                 }
@@ -1824,7 +1828,7 @@
         }
     }
 
-    public void handleInsertUpsertStatement(MetadataProvider metadataProvider, Statement stmt,
+    public JobSpecification handleInsertUpsertStatement(MetadataProvider metadataProvider, Statement stmt,
             IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery,
             IStatementExecutor.Stats stats, boolean compileOnly) throws Exception {
 
@@ -1856,7 +1860,7 @@
                 final JobSpecification jobSpec = rewriteCompileInsertUpsert(hcc, metadataProvider, stmtInsertUpsert);
                 MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
                 bActiveTxn = false;
-                return compileOnly ? null : jobSpec;
+                return jobSpec;
             } catch (Exception e) {
                 if (bActiveTxn) {
                     abort(e, e, mdTxnCtx);
@@ -1864,6 +1868,9 @@
                 throw e;
             }
         };
+        if (compileOnly) {
+            return compiler.compile();
+        }
 
         if (stmtInsertUpsert.getReturnExpression() != null) {
             deliverResult(hcc, hdc, compiler, metadataProvider, locker, resultDelivery, stats);
@@ -1872,17 +1879,18 @@
             try {
                 final JobSpecification jobSpec = compiler.compile();
                 if (jobSpec == null) {
-                    return;
+                    return jobSpec;
                 }
                 JobUtils.runJob(hcc, jobSpec, true);
             } finally {
                 locker.unlock();
             }
         }
+        return null;
     }
 
-    public void handleDeleteStatement(MetadataProvider metadataProvider, Statement stmt, IHyracksClientConnection hcc)
-            throws Exception {
+    public JobSpecification handleDeleteStatement(MetadataProvider metadataProvider, Statement stmt,
+            IHyracksClientConnection hcc, boolean compileOnly) throws Exception {
 
         DeleteStatement stmtDelete = (DeleteStatement) stmt;
         String dataverseName = getActiveDataverse(stmtDelete.getDataverseName());
@@ -1903,9 +1911,10 @@
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
             bActiveTxn = false;
 
-            if (jobSpec != null) {
+            if (jobSpec != null && !compileOnly) {
                 JobUtils.runJob(hcc, jobSpec, true);
             }
+            return jobSpec;
 
         } catch (Exception e) {
             if (bActiveTxn) {
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
index 55cd304..d55fde5 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
@@ -278,7 +278,7 @@
             // prepare job spec(s) that would disconnect any active feeds involving the dataset.
             IActiveEntityEventsListener[] activeListeners = ActiveJobNotificationHandler.INSTANCE.getEventListeners();
             for (IActiveEntityEventsListener listener : activeListeners) {
-                if (listener.isEntityUsingDataset(dataverseName, datasetName)) {
+                if (listener.isEntityActive() && listener.isEntityUsingDataset(dataverseName, datasetName)) {
                     throw new CompilationException(ErrorCode.COMPILATION_CANT_DROP_ACTIVE_DATASET,
                             RecordUtil.toFullyQualifiedName(dataverseName, datasetName),
                             listener.getEntityId().toString());
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java
index 780e205..aa292f6 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java
@@ -36,6 +36,8 @@
         GET_JOB_STATUS,
         GET_JOB_INFO,
         START_JOB,
+        DISTRIBUTE_JOB,
+        DESTROY_JOB,
         GET_DATASET_DIRECTORY_SERIVICE_INFO,
         GET_DATASET_RESULT_STATUS,
         GET_DATASET_RESULT_LOCATIONS,
@@ -101,6 +103,44 @@
         }
     }
 
+    public static class DistributeJobFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        private final byte[] acggfBytes;
+
+        public DistributeJobFunction(byte[] acggfBytes) {
+            this.acggfBytes = acggfBytes;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.DISTRIBUTE_JOB;
+        }
+
+        public byte[] getACGGFBytes() {
+            return acggfBytes;
+        }
+    }
+
+    public static class DestroyJobFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        private final JobId jobId;
+
+        public DestroyJobFunction(JobId jobId) {
+            this.jobId = jobId;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.DESTROY_JOB;
+        }
+
+        public JobId getJobId() {
+            return jobId;
+        }
+    }
+
     public static class StartJobFunction extends Function {
         private static final long serialVersionUID = 1L;
 
@@ -116,8 +156,8 @@
             this.jobId = jobId;
         }
 
-        public StartJobFunction(byte[] acggfBytes, EnumSet<JobFlag> jobFlags, JobId jobId) {
-            this(null, acggfBytes, jobFlags, jobId);
+        public StartJobFunction(JobId jobId) {
+            this(null, null, null, jobId);
         }
 
         public StartJobFunction(byte[] acggfBytes, EnumSet<JobFlag> jobFlags) {
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
index c049007..8e7affb 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
@@ -69,9 +69,9 @@
     }
 
     @Override
-    public JobId startJob(byte[] acggfBytes, EnumSet<JobFlag> jobFlags, JobId jobId) throws Exception {
+    public JobId startJob(JobId jobId) throws Exception {
         HyracksClientInterfaceFunctions.StartJobFunction sjf =
-                new HyracksClientInterfaceFunctions.StartJobFunction(acggfBytes, jobFlags, jobId);
+                new HyracksClientInterfaceFunctions.StartJobFunction(jobId);
         return (JobId) rpci.call(ipcHandle, sjf);
     }
 
@@ -83,6 +83,20 @@
     }
 
     @Override
+    public JobId distributeJob(byte[] acggfBytes) throws Exception {
+        HyracksClientInterfaceFunctions.DistributeJobFunction sjf =
+                new HyracksClientInterfaceFunctions.DistributeJobFunction(acggfBytes);
+        return (JobId) rpci.call(ipcHandle, sjf);
+    }
+
+    @Override
+    public JobId destroyJob(JobId jobId) throws Exception {
+        HyracksClientInterfaceFunctions.DestroyJobFunction sjf =
+                new HyracksClientInterfaceFunctions.DestroyJobFunction(jobId);
+        return (JobId) rpci.call(ipcHandle, sjf);
+    }
+
+    @Override
     public NetworkAddress getDatasetDirectoryServiceInfo() throws Exception {
         HyracksClientInterfaceFunctions.GetDatasetDirectoryServiceInfoFunction gddsf =
                 new HyracksClientInterfaceFunctions.GetDatasetDirectoryServiceInfoFunction();
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
index 008a640..5da1f34 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
@@ -103,19 +103,28 @@
     }
 
     @Override
-    public JobId startJob(JobSpecification jobSpec, EnumSet<JobFlag> jobFlags, JobId jobId) throws Exception {
+    public JobId distributeJob(JobSpecification jobSpec) throws Exception {
         JobSpecificationActivityClusterGraphGeneratorFactory jsacggf =
                 new JobSpecificationActivityClusterGraphGeneratorFactory(jobSpec);
-        return startJob(jsacggf, jobFlags, jobId);
+        return distributeJob(jsacggf);
+    }
+
+    @Override
+    public JobId destroyJob(JobId jobId) throws Exception {
+        return hci.destroyJob(jobId);
+    }
+
+    @Override
+    public JobId startJob(JobId jobId) throws Exception {
+        return hci.startJob(jobId);
     }
 
     public JobId startJob(IActivityClusterGraphGeneratorFactory acggf, EnumSet<JobFlag> jobFlags) throws Exception {
         return hci.startJob(JavaSerializationUtils.serialize(acggf), jobFlags);
     }
 
-    public JobId startJob(IActivityClusterGraphGeneratorFactory acggf, EnumSet<JobFlag> jobFlags, JobId jobId)
-            throws Exception {
-        return hci.startJob(JavaSerializationUtils.serialize(acggf), jobFlags, jobId);
+    public JobId distributeJob(IActivityClusterGraphGeneratorFactory acggf) throws Exception {
+        return hci.distributeJob(JavaSerializationUtils.serialize(acggf));
     }
 
     public NetworkAddress getDatasetDirectoryServiceInfo() throws Exception {
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java
index c4eba3d..e65cacd 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java
@@ -77,17 +77,33 @@
     public JobId startJob(JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) throws Exception;
 
     /**
-     * Start the specified Job.
+     * Distribute the specified Job.
      *
      * @param jobSpec
      *            Job Specification
      * @param jobFlags
      *            Flags
-     * @param jobId
-     *            Used to run a pre-distributed job by id (the same value will be returned)
      * @throws Exception
      */
-    public JobId startJob(JobSpecification jobSpec, EnumSet<JobFlag> jobFlags, JobId jobId) throws Exception;
+    public JobId distributeJob(JobSpecification jobSpec) throws Exception;
+
+    /**
+     * Destroy the distributed graph for a pre-distributed job
+     *
+     * @param jobId
+     *            The id of the predistributed job
+     * @throws Exception
+     */
+    public JobId destroyJob(JobId jobId) throws Exception;
+
+    /**
+     * Used to run a pre-distributed job by id (the same JobId will be returned)
+     *
+     * @param jobId
+     *            The id of the predistributed job
+     * @throws Exception
+     */
+    public JobId startJob(JobId jobId) throws Exception;
 
     /**
      * Start the specified Job.
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java
index 39063c6..f7995d7 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java
@@ -38,7 +38,11 @@
 
     public JobId startJob(byte[] acggfBytes, EnumSet<JobFlag> jobFlags) throws Exception;
 
-    public JobId startJob(byte[] acggfBytes, EnumSet<JobFlag> jobFlags, JobId jobId) throws Exception;
+    public JobId startJob(JobId jobId) throws Exception;
+
+    public JobId distributeJob(byte[] acggfBytes) throws Exception;
+
+    public JobId destroyJob(JobId jobId) throws Exception;
 
     public NetworkAddress getDatasetDirectoryServiceInfo() throws Exception;
 
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
index 8b24cc2..3d99cdb 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
@@ -55,6 +55,9 @@
     public static final int INCONSISTENT_RESULT_METADATA = 18;
     public static final int CANNOT_TRUNCATE_OR_DELETE_FILE = 19;
     public static final int NOT_A_JOBID = 20;
+    public static final int ERROR_FINDING_DISTRIBUTED_JOB = 21;
+    public static final int DUPLICATE_DISTRIBUTED_JOB = 22;
+    public static final int DISTRIBUTED_JOB_FAILURE = 23;
 
     // Compilation error codes.
     public static final int RULECOLLECTION_NOT_INSTANCE_OF_LIST = 10001;
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/IJobLifecycleListener.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/IJobLifecycleListener.java
index cca4a13..30ffebe 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/IJobLifecycleListener.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/IJobLifecycleListener.java
@@ -21,7 +21,7 @@
 import org.apache.hyracks.api.exceptions.HyracksException;
 
 public interface IJobLifecycleListener {
-    public void notifyJobCreation(JobId jobId, IActivityClusterGraphGeneratorFactory acggf) throws HyracksException;
+    public void notifyJobCreation(JobId jobId, JobSpecification spec) throws HyracksException;
 
     public void notifyJobStart(JobId jobId) throws HyracksException;
 
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobFlag.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobFlag.java
index 1656c51..a33c6c9a 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobFlag.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobFlag.java
@@ -19,6 +19,5 @@
 package org.apache.hyracks.api.job;
 
 public enum JobFlag {
-    PROFILE_RUNTIME,
-    STORE_JOB
+    PROFILE_RUNTIME
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
index 2abca66..7f90c35 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
@@ -39,5 +39,8 @@
 18 = Inconsistent metadata for result set %1$s"
 19 = Can't truncate or delete the file: %1$s
 20 = '%1$s' is not a valid job id.
+21 = The distributed job %1$s was not found
+22 = The distributed job %1$s already exists
+23 = The distributed work failed for %1$s at %2$s
 
 10000 = The given rule collection %1$s is not an instance of the List class.
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java
index 7ea5f70..265d3ef 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java
@@ -27,10 +27,11 @@
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobIdFactory;
 import org.apache.hyracks.api.job.JobInfo;
-import org.apache.hyracks.api.job.JobStatus;
 import org.apache.hyracks.control.cc.work.CliDeployBinaryWork;
 import org.apache.hyracks.control.cc.work.CliUnDeployBinaryWork;
 import org.apache.hyracks.control.cc.work.ClusterShutdownWork;
+import org.apache.hyracks.control.cc.work.DestroyJobWork;
+import org.apache.hyracks.control.cc.work.DistributeJobWork;
 import org.apache.hyracks.control.cc.work.GetDatasetDirectoryServiceInfoWork;
 import org.apache.hyracks.control.cc.work.GetJobInfoWork;
 import org.apache.hyracks.control.cc.work.GetJobStatusWork;
@@ -81,18 +82,34 @@
                 ccs.getWorkQueue().schedule(new GetJobInfoWork(ccs.getJobManager(), gjif.getJobId(),
                         new IPCResponder<JobInfo>(handle, mid)));
                 break;
+            case DISTRIBUTE_JOB:
+                HyracksClientInterfaceFunctions.DistributeJobFunction djf =
+                        (HyracksClientInterfaceFunctions.DistributeJobFunction) fn;
+                ccs.getWorkQueue().schedule(new DistributeJobWork(ccs, djf.getACGGFBytes(), jobIdFactory.create(),
+                        new IPCResponder<JobId>(handle, mid)));
+                break;
+            case DESTROY_JOB:
+                HyracksClientInterfaceFunctions.DestroyJobFunction dsjf =
+                        (HyracksClientInterfaceFunctions.DestroyJobFunction) fn;
+                ccs.getWorkQueue()
+                        .schedule(new DestroyJobWork(ccs, dsjf.getJobId(), new IPCResponder<JobId>(handle, mid)));
+                break;
             case START_JOB:
                 HyracksClientInterfaceFunctions.StartJobFunction sjf =
                         (HyracksClientInterfaceFunctions.StartJobFunction) fn;
                 JobId jobId = sjf.getJobId();
                 byte[] acggfBytes = null;
+                boolean predistributed = false;
                 if (jobId == null) {
+                    //The job is new
                     jobId = jobIdFactory.create();
+                    acggfBytes = sjf.getACGGFBytes();
+                } else {
+                    //The job has been predistributed. We don't need to send an ActivityClusterGraph
+                    predistributed = true;
                 }
-                //TODO: only send these when the jobId is null
-                acggfBytes = sjf.getACGGFBytes();
                 ccs.getWorkQueue().schedule(new JobStartWork(ccs, sjf.getDeploymentId(), acggfBytes, sjf.getJobFlags(),
-                        jobId, new IPCResponder<JobId>(handle, mid)));
+                        jobId, new IPCResponder<JobId>(handle, mid), predistributed));
                 break;
             case GET_DATASET_DIRECTORY_SERIVICE_INFO:
                 ccs.getWorkQueue().schedule(new GetDatasetDirectoryServiceInfoWork(ccs,
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
index 21fcf92..53d7620 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
@@ -23,6 +23,7 @@
 
 import org.apache.hyracks.api.client.NodeControllerInfo;
 import org.apache.hyracks.control.cc.work.ApplicationMessageWork;
+import org.apache.hyracks.control.cc.work.DistributedJobFailureWork;
 import org.apache.hyracks.control.cc.work.GetNodeControllersInfoWork;
 import org.apache.hyracks.control.cc.work.JobletCleanupNotificationWork;
 import org.apache.hyracks.control.cc.work.NodeHeartbeatWork;
@@ -99,6 +100,11 @@
                 ccs.getWorkQueue().schedule(new TaskFailureWork(ccs, ntff.getJobId(),
                         ntff.getTaskId(), ntff.getNodeId(), ntff.getExceptions()));
                 break;
+            case DISTRIBUTED_JOB_FAILURE:
+                CCNCFunctions.ReportDistributedJobFailureFunction rdjf =
+                        (CCNCFunctions.ReportDistributedJobFailureFunction) fn;
+                ccs.getWorkQueue().schedule(new DistributedJobFailureWork(rdjf.getJobId(), rdjf.getNodeId()));
+                break;
             case REGISTER_PARTITION_PROVIDER:
                 CCNCFunctions.RegisterPartitionProviderFunction rppf =
                         (CCNCFunctions.RegisterPartitionProviderFunction) fn;
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
index 37c4177..346f934 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
@@ -44,6 +44,7 @@
 import org.apache.hyracks.api.context.ICCContext;
 import org.apache.hyracks.api.deployment.DeploymentId;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.HyracksException;
 import org.apache.hyracks.api.job.resource.DefaultJobCapacityController;
 import org.apache.hyracks.api.job.resource.IJobCapacityController;
 import org.apache.hyracks.api.service.IControllerService;
@@ -90,14 +91,16 @@
 
     private final LogFile jobLog;
 
-    private final ServerContext serverCtx;
+    private ServerContext serverCtx;
 
-    private final WebServer webServer;
+    private WebServer webServer;
 
     private ClusterControllerInfo info;
 
     private CCApplicationContext appCtx;
 
+    private final PreDistributedJobStore preDistributedJobStore = new PreDistributedJobStore();
+
     private final WorkQueue workQueue;
 
     private ExecutorService executor;
@@ -130,14 +133,6 @@
         this.ccConfig = ccConfig;
         File jobLogFolder = new File(ccConfig.ccRoot, "logs/jobs");
         jobLog = new LogFile(jobLogFolder);
-        serverCtx = new ServerContext(ServerContext.ServerType.CLUSTER_CONTROLLER, new File(ccConfig.ccRoot));
-        IIPCI ccIPCI = new ClusterControllerIPCI(this);
-        clusterIPC = new IPCSystem(new InetSocketAddress(ccConfig.clusterNetPort), ccIPCI,
-                new CCNCFunctions.SerializerDeserializer());
-        IIPCI ciIPCI = new ClientInterfaceIPCI(this);
-        clientIPC = new IPCSystem(new InetSocketAddress(ccConfig.clientNetIpAddress, ccConfig.clientNetPort), ciIPCI,
-                new JavaSerializationBasedPayloadSerializerDeserializer());
-        webServer = new WebServer(this);
 
         // WorkQueue is in charge of heartbeat as well as other events.
         workQueue = new WorkQueue("ClusterController", Thread.MAX_PRIORITY);
@@ -171,6 +166,14 @@
     @Override
     public void start() throws Exception {
         LOGGER.log(Level.INFO, "Starting ClusterControllerService: " + this);
+        serverCtx = new ServerContext(ServerContext.ServerType.CLUSTER_CONTROLLER, new File(ccConfig.ccRoot));
+        IIPCI ccIPCI = new ClusterControllerIPCI(this);
+        clusterIPC = new IPCSystem(new InetSocketAddress(ccConfig.clusterNetPort), ccIPCI,
+                new CCNCFunctions.SerializerDeserializer());
+        IIPCI ciIPCI = new ClientInterfaceIPCI(this);
+        clientIPC = new IPCSystem(new InetSocketAddress(ccConfig.clientNetIpAddress, ccConfig.clientNetPort), ciIPCI,
+                new JavaSerializationBasedPayloadSerializerDeserializer());
+        webServer = new WebServer(this);
         clusterIPC.start();
         clientIPC.start();
         webServer.setPort(ccConfig.httpPort);
@@ -313,6 +316,10 @@
         return nodeManager;
     }
 
+    public PreDistributedJobStore getPreDistributedJobStore() throws HyracksException {
+        return preDistributedJobStore;
+    }
+
     public IResourceManager getResourceManager() {
         return resourceManager;
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/PreDistributedJobStore.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/PreDistributedJobStore.java
new file mode 100644
index 0000000..c573ae8
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/PreDistributedJobStore.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.cc;
+
+import java.util.Hashtable;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hyracks.api.constraints.Constraint;
+import org.apache.hyracks.api.exceptions.ErrorCode;
+import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.job.ActivityClusterGraph;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobSpecification;
+
+public class PreDistributedJobStore {
+
+    private final Map<JobId, PreDistributedJobDescriptor> preDistributedJobDescriptorMap;
+
+    public PreDistributedJobStore() {
+        preDistributedJobDescriptorMap = new Hashtable<>();
+    }
+
+    public void addDistributedJobDescriptor(JobId jobId, ActivityClusterGraph activityClusterGraph,
+            JobSpecification jobSpecification, Set<Constraint> activityClusterGraphConstraints)
+                    throws HyracksException {
+        if (preDistributedJobDescriptorMap.get(jobId) != null) {
+            throw HyracksException.create(ErrorCode.DUPLICATE_DISTRIBUTED_JOB, jobId);
+        }
+        PreDistributedJobDescriptor descriptor =
+                new PreDistributedJobDescriptor(activityClusterGraph, jobSpecification, activityClusterGraphConstraints);
+        preDistributedJobDescriptorMap.put(jobId, descriptor);
+    }
+
+    public void checkForExistingDistributedJobDescriptor(JobId jobId) throws HyracksException {
+        if (preDistributedJobDescriptorMap.get(jobId) != null) {
+            throw HyracksException.create(ErrorCode.DUPLICATE_DISTRIBUTED_JOB, jobId);
+        }
+    }
+
+    public PreDistributedJobDescriptor getDistributedJobDescriptor(JobId jobId) throws HyracksException {
+        PreDistributedJobDescriptor descriptor = preDistributedJobDescriptorMap.get(jobId);
+        if (descriptor == null) {
+            throw HyracksException.create(ErrorCode.ERROR_FINDING_DISTRIBUTED_JOB, jobId);
+        }
+        return descriptor;
+    }
+
+    public void removeDistributedJobDescriptor(JobId jobId) throws HyracksException {
+        PreDistributedJobDescriptor descriptor = preDistributedJobDescriptorMap.get(jobId);
+        if (descriptor == null) {
+            throw HyracksException.create(ErrorCode.ERROR_FINDING_DISTRIBUTED_JOB, jobId);
+        }
+        preDistributedJobDescriptorMap.remove(jobId);
+    }
+
+    public class PreDistributedJobDescriptor {
+
+        private final ActivityClusterGraph activityClusterGraph;
+
+        private final JobSpecification jobSpecification;
+
+        private final Set<Constraint> activityClusterGraphConstraints;
+
+        private PreDistributedJobDescriptor(ActivityClusterGraph activityClusterGraph,
+                JobSpecification jobSpecification, Set<Constraint> activityClusterGraphConstraints) {
+            this.activityClusterGraph = activityClusterGraph;
+            this.jobSpecification = jobSpecification;
+            this.activityClusterGraphConstraints = activityClusterGraphConstraints;
+        }
+
+        public ActivityClusterGraph getActivityClusterGraph() {
+            return activityClusterGraph;
+        }
+
+        public JobSpecification getJobSpecification() {
+            return jobSpecification;
+        }
+
+        public Set<Constraint> getActivityClusterGraphConstraints() {
+            return activityClusterGraphConstraints;
+        }
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCApplicationContext.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCApplicationContext.java
index e43a59d..77b9b17 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCApplicationContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCApplicationContext.java
@@ -32,9 +32,9 @@
 import org.apache.hyracks.api.application.IClusterLifecycleListener;
 import org.apache.hyracks.api.context.ICCContext;
 import org.apache.hyracks.api.exceptions.HyracksException;
-import org.apache.hyracks.api.job.IActivityClusterGraphGeneratorFactory;
 import org.apache.hyracks.api.job.IJobLifecycleListener;
 import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.api.service.IControllerService;
 import org.apache.hyracks.control.cc.ClusterControllerService;
 import org.apache.hyracks.control.common.application.ApplicationContext;
@@ -93,10 +93,10 @@
         }
     }
 
-    public synchronized void notifyJobCreation(JobId jobId, IActivityClusterGraphGeneratorFactory acggf)
+    public synchronized void notifyJobCreation(JobId jobId, JobSpecification spec)
             throws HyracksException {
         for (IJobLifecycleListener l : jobLifecycleListeners) {
-            l.notifyJobCreation(jobId, acggf);
+            l.notifyJobCreation(jobId, spec);
         }
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java
index 46a173e..c4cf38d 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java
@@ -38,8 +38,8 @@
 import org.apache.hyracks.api.exceptions.ErrorCode;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.HyracksException;
-import org.apache.hyracks.api.job.IActivityClusterGraphGeneratorFactory;
 import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.control.common.dataset.ResultStateSweeper;
 import org.apache.hyracks.control.common.work.IResultCallback;
 
@@ -72,7 +72,7 @@
     }
 
     @Override
-    public synchronized void notifyJobCreation(JobId jobId, IActivityClusterGraphGeneratorFactory acggf)
+    public synchronized void notifyJobCreation(JobId jobId, JobSpecification spec)
             throws HyracksException {
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info(getClass().getSimpleName() + " notified of new job " + jobId);
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
index 3eece52..8f7b0cb 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
@@ -31,8 +31,6 @@
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import org.apache.hyracks.control.cc.cluster.INodeManager;
-import org.apache.hyracks.control.cc.job.IJobManager;
 import org.apache.hyracks.api.comm.NetworkAddress;
 import org.apache.hyracks.api.constraints.Constraint;
 import org.apache.hyracks.api.constraints.expressions.LValueConstraintExpression;
@@ -54,7 +52,9 @@
 import org.apache.hyracks.api.util.JavaSerializationUtils;
 import org.apache.hyracks.control.cc.ClusterControllerService;
 import org.apache.hyracks.control.cc.NodeControllerState;
+import org.apache.hyracks.control.cc.cluster.INodeManager;
 import org.apache.hyracks.control.cc.job.ActivityClusterPlan;
+import org.apache.hyracks.control.cc.job.IJobManager;
 import org.apache.hyracks.control.cc.job.JobRun;
 import org.apache.hyracks.control.cc.job.Task;
 import org.apache.hyracks.control.cc.job.TaskAttempt;
@@ -65,6 +65,7 @@
 import org.apache.hyracks.control.common.job.PartitionState;
 import org.apache.hyracks.control.common.job.TaskAttemptDescriptor;
 
+
 public class JobExecutor {
     private static final Logger LOGGER = Logger.getLogger(JobExecutor.class.getName());
 
@@ -74,15 +75,19 @@
 
     private final PartitionConstraintSolver solver;
 
+    private final boolean predistributed;
+
     private final Map<PartitionId, TaskCluster> partitionProducingTaskClusterMap;
 
     private final Set<TaskCluster> inProgressTaskClusters;
 
     private final Random random;
 
-    public JobExecutor(ClusterControllerService ccs, JobRun jobRun, Collection<Constraint> constraints) {
+    public JobExecutor(ClusterControllerService ccs, JobRun jobRun, Collection<Constraint> constraints,
+            boolean predistributed) {
         this.ccs = ccs;
         this.jobRun = jobRun;
+        this.predistributed = predistributed;
         solver = new PartitionConstraintSolver();
         partitionProducingTaskClusterMap = new HashMap<PartitionId, TaskCluster>();
         inProgressTaskClusters = new HashSet<TaskCluster>();
@@ -90,6 +95,10 @@
         random = new Random();
     }
 
+    public boolean isPredistributed() {
+        return predistributed;
+    }
+
     public JobRun getJobRun() {
         return jobRun;
     }
@@ -475,7 +484,7 @@
                 jobRun.getConnectorPolicyMap());
         INodeManager nodeManager = ccs.getNodeManager();
         try {
-            byte[] acgBytes = JavaSerializationUtils.serialize(acg);
+            byte[] acgBytes = predistributed ? null : JavaSerializationUtils.serialize(acg);
             for (Map.Entry<String, List<TaskAttemptDescriptor>> entry : taskAttemptMap.entrySet()) {
                 String nodeId = entry.getKey();
                 final List<TaskAttemptDescriptor> taskDescriptors = entry.getValue();
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
index e3f9557..741e3db 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
@@ -35,7 +35,6 @@
 import org.apache.hyracks.api.exceptions.ErrorCode;
 import org.apache.hyracks.api.exceptions.HyracksException;
 import org.apache.hyracks.api.job.ActivityClusterGraph;
-import org.apache.hyracks.api.job.IActivityClusterGraphGeneratorFactory;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.api.job.JobStatus;
@@ -105,7 +104,7 @@
     @Override
     public void add(JobRun jobRun) throws HyracksException {
         checkJob(jobRun);
-        JobSpecification job = jobRun.getActivityClusterGraphFactory().getJobSpecification();
+        JobSpecification job = jobRun.getJobSpecification();
         IJobCapacityController.JobSubmissionStatus status = jobCapacityController.allocate(job);
         switch (status) {
             case QUEUE:
@@ -214,7 +213,7 @@
         }
 
         // Releases cluster capacitys occupied by the job.
-        JobSpecification job = run.getActivityClusterGraphFactory().getJobSpecification();
+        JobSpecification job = run.getJobSpecification();
         jobCapacityController.release(job);
 
         // Picks the next job to execute.
@@ -273,8 +272,10 @@
             activeRunMap.put(jobId, run);
 
             CCApplicationContext appCtx = ccs.getApplicationContext();
-            IActivityClusterGraphGeneratorFactory acggf = run.getActivityClusterGraphFactory();
-            appCtx.notifyJobCreation(jobId, acggf);
+            JobSpecification spec = run.getJobSpecification();
+            if (!run.getExecutor().isPredistributed()) {
+                appCtx.notifyJobCreation(jobId, spec);
+            }
             run.setStatus(JobStatus.RUNNING, null);
             executeJobInternal(run);
             callback.setValue(jobId);
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java
index 5682194..3aa9043 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java
@@ -20,12 +20,14 @@
 
 import java.io.PrintWriter;
 import java.io.StringWriter;
+import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.hyracks.api.constraints.Constraint;
 import org.apache.hyracks.api.dataflow.ActivityId;
 import org.apache.hyracks.api.dataflow.ConnectorDescriptorId;
 import org.apache.hyracks.api.dataflow.OperatorDescriptorId;
@@ -40,9 +42,11 @@
 import org.apache.hyracks.api.job.IActivityClusterGraphGeneratorFactory;
 import org.apache.hyracks.api.job.JobFlag;
 import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.api.job.JobStatus;
 import org.apache.hyracks.api.partitions.PartitionId;
 import org.apache.hyracks.control.cc.ClusterControllerService;
+import org.apache.hyracks.control.cc.PreDistributedJobStore.PreDistributedJobDescriptor;
 import org.apache.hyracks.control.cc.executor.ActivityPartitionDetails;
 import org.apache.hyracks.control.cc.executor.JobExecutor;
 import org.apache.hyracks.control.cc.partitions.PartitionMatchMaker;
@@ -59,13 +63,11 @@
 
     private final JobId jobId;
 
-    private final IActivityClusterGraphGeneratorFactory acggf;
-
-    private final IActivityClusterGraphGenerator acgg;
+    private final JobSpecification spec;
 
     private final ActivityClusterGraph acg;
 
-    private final JobExecutor scheduler;
+    private JobExecutor scheduler;
 
     private final Set<JobFlag> jobFlags;
 
@@ -99,17 +101,13 @@
 
     private final IResultCallback<JobId> callback;
 
-    public JobRun(ClusterControllerService ccs, DeploymentId deploymentId, JobId jobId,
-            IActivityClusterGraphGeneratorFactory acggf, IActivityClusterGraphGenerator acgg, Set<JobFlag> jobFlags,
-            IResultCallback<JobId> callback) {
+    private JobRun(DeploymentId deploymentId, JobId jobId, Set<JobFlag> jobFlags, IResultCallback<JobId> callback,
+            JobSpecification spec, ActivityClusterGraph acg) {
         this.deploymentId = deploymentId;
         this.jobId = jobId;
-        this.acggf = acggf;
-        this.acgg = acgg;
-        this.acg = acgg.initialize();
-        this.scheduler = new JobExecutor(ccs, this, acgg.getConstraints());
         this.jobFlags = jobFlags;
-        this.callback = callback;
+        this.spec = spec;
+        this.acg = acg;
         activityClusterPlanMap = new HashMap<>();
         pmm = new PartitionMatchMaker();
         participatingNodeIds = new HashSet<>();
@@ -118,18 +116,37 @@
         connectorPolicyMap = new HashMap<>();
         operatorLocations = new HashMap<>();
         createTime = System.currentTimeMillis();
+        this.callback = callback;
+    }
+
+    //Run a Pre-distributed job by passing the JobId
+    public JobRun(ClusterControllerService ccs, DeploymentId deploymentId, JobId jobId, IResultCallback<JobId> callback,
+            PreDistributedJobDescriptor distributedJobDescriptor)
+            throws HyracksException {
+        this(deploymentId, jobId, EnumSet.noneOf(JobFlag.class), callback,
+                distributedJobDescriptor.getJobSpecification(), distributedJobDescriptor.getActivityClusterGraph());
+        Set<Constraint> constaints = distributedJobDescriptor.getActivityClusterGraphConstraints();
+        this.scheduler = new JobExecutor(ccs, this, constaints, true);
+    }
+
+    //Run a new job by creating an ActivityClusterGraph
+    public JobRun(ClusterControllerService ccs, DeploymentId deploymentId, JobId jobId,
+            IActivityClusterGraphGeneratorFactory acggf, IActivityClusterGraphGenerator acgg, Set<JobFlag> jobFlags,
+            IResultCallback<JobId> callback) {
+        this(deploymentId, jobId, jobFlags, callback, acggf.getJobSpecification(), acgg.initialize());
+        this.scheduler = new JobExecutor(ccs, this, acgg.getConstraints(), false);
     }
 
     public DeploymentId getDeploymentId() {
         return deploymentId;
     }
 
-    public JobId getJobId() {
-        return jobId;
+    public JobSpecification getJobSpecification() {
+        return spec;
     }
 
-    public IActivityClusterGraphGeneratorFactory getActivityClusterGraphFactory() {
-        return acggf;
+    public JobId getJobId() {
+        return jobId;
     }
 
     public ActivityClusterGraph getActivityClusterGraph() {
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/FIFOJobQueue.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/FIFOJobQueue.java
index eac9800..6cf75bb 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/FIFOJobQueue.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/FIFOJobQueue.java
@@ -68,7 +68,7 @@
         Iterator<JobRun> runIterator = jobQueue.iterator();
         while (runIterator.hasNext()) {
             JobRun run = runIterator.next();
-            JobSpecification job = run.getActivityClusterGraphFactory().getJobSpecification();
+            JobSpecification job = run.getJobSpecification();
             // Cluster maximum capacity can change over time, thus we have to re-check if the job should be rejected
             // or not.
             try {
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DestroyJobWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DestroyJobWork.java
new file mode 100644
index 0000000..df98252
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DestroyJobWork.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.cc.work;
+
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.control.cc.ClusterControllerService;
+import org.apache.hyracks.control.cc.NodeControllerState;
+import org.apache.hyracks.control.cc.cluster.INodeManager;
+import org.apache.hyracks.control.common.work.IResultCallback;
+import org.apache.hyracks.control.common.work.SynchronizableWork;
+
+public class DestroyJobWork extends SynchronizableWork {
+    private final ClusterControllerService ccs;
+    private final JobId jobId;
+    private final IResultCallback<JobId> callback;
+
+    public DestroyJobWork(ClusterControllerService ccs, JobId jobId, IResultCallback<JobId> callback) {
+        this.jobId = jobId;
+        this.ccs = ccs;
+        this.callback = callback;
+    }
+
+    @Override
+    protected void doRun() throws Exception {
+        try {
+            ccs.getPreDistributedJobStore().removeDistributedJobDescriptor(jobId);
+            INodeManager nodeManager = ccs.getNodeManager();
+            for (NodeControllerState node : nodeManager.getAllNodeControllerStates()) {
+                node.getNodeController().destroyJob(jobId);
+            }
+            callback.setValue(jobId);
+        } catch (Exception e) {
+            callback.setException(e);
+        }
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DistributeJobWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DistributeJobWork.java
new file mode 100644
index 0000000..f0c3303
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DistributeJobWork.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.cc.work;
+
+import java.util.EnumSet;
+
+import org.apache.hyracks.api.job.ActivityClusterGraph;
+import org.apache.hyracks.api.job.IActivityClusterGraphGenerator;
+import org.apache.hyracks.api.job.IActivityClusterGraphGeneratorFactory;
+import org.apache.hyracks.api.job.JobFlag;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.util.JavaSerializationUtils;
+import org.apache.hyracks.control.cc.ClusterControllerService;
+import org.apache.hyracks.control.cc.NodeControllerState;
+import org.apache.hyracks.control.cc.application.CCApplicationContext;
+import org.apache.hyracks.control.cc.cluster.INodeManager;
+import org.apache.hyracks.control.common.deployment.DeploymentUtils;
+import org.apache.hyracks.control.common.work.IResultCallback;
+import org.apache.hyracks.control.common.work.SynchronizableWork;
+
+public class DistributeJobWork extends SynchronizableWork {
+    private final ClusterControllerService ccs;
+    private final byte[] acggfBytes;
+    private final JobId jobId;
+    private final IResultCallback<JobId> callback;
+
+    public DistributeJobWork(ClusterControllerService ccs, byte[] acggfBytes, JobId jobId,
+            IResultCallback<JobId> callback) {
+        this.jobId = jobId;
+        this.ccs = ccs;
+        this.acggfBytes = acggfBytes;
+        this.callback = callback;
+    }
+
+    @Override
+    protected void doRun() throws Exception {
+        try {
+            final CCApplicationContext appCtx = ccs.getApplicationContext();
+            ccs.getPreDistributedJobStore().checkForExistingDistributedJobDescriptor(jobId);
+            IActivityClusterGraphGeneratorFactory acggf =
+                    (IActivityClusterGraphGeneratorFactory) DeploymentUtils.deserialize(acggfBytes, null, appCtx);
+            IActivityClusterGraphGenerator acgg =
+                    acggf.createActivityClusterGraphGenerator(jobId, appCtx, EnumSet.noneOf(JobFlag.class));
+            ActivityClusterGraph acg = acgg.initialize();
+            ccs.getPreDistributedJobStore().addDistributedJobDescriptor(jobId, acg, acggf.getJobSpecification(),
+                    acgg.getConstraints());
+
+            appCtx.notifyJobCreation(jobId, acggf.getJobSpecification());
+
+            byte[] acgBytes = JavaSerializationUtils.serialize(acg);
+
+            INodeManager nodeManager = ccs.getNodeManager();
+            for (NodeControllerState node : nodeManager.getAllNodeControllerStates()) {
+                node.getNodeController().distributeJob(jobId, acgBytes);
+            }
+
+            callback.setValue(jobId);
+        } catch (Exception e) {
+            callback.setException(e);
+        }
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DistributedJobFailureWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DistributedJobFailureWork.java
new file mode 100644
index 0000000..f7fa2a4
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DistributedJobFailureWork.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.cc.work;
+
+import org.apache.hyracks.api.exceptions.ErrorCode;
+import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.control.common.work.SynchronizableWork;
+
+public class DistributedJobFailureWork extends SynchronizableWork {
+    protected final JobId jobId;
+    protected final String nodeId;
+
+    public DistributedJobFailureWork(JobId jobId, String nodeId) {
+        this.jobId = jobId;
+        this.nodeId = nodeId;
+    }
+
+    @Override
+    public void doRun() throws HyracksException {
+        throw HyracksException.create(ErrorCode.DISTRIBUTED_JOB_FAILURE, jobId, nodeId);
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java
index fefd3b6..c608712 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java
@@ -40,15 +40,17 @@
     private final DeploymentId deploymentId;
     private final JobId jobId;
     private final IResultCallback<JobId> callback;
+    private final boolean predestributed;
 
     public JobStartWork(ClusterControllerService ccs, DeploymentId deploymentId, byte[] acggfBytes,
-            EnumSet<JobFlag> jobFlags, JobId jobId, IResultCallback<JobId> callback) {
+            EnumSet<JobFlag> jobFlags, JobId jobId, IResultCallback<JobId> callback, boolean predestributed) {
         this.deploymentId = deploymentId;
         this.jobId = jobId;
         this.ccs = ccs;
         this.acggfBytes = acggfBytes;
         this.jobFlags = jobFlags;
         this.callback = callback;
+        this.predestributed = predestributed;
     }
 
     @Override
@@ -56,11 +58,21 @@
         IJobManager jobManager = ccs.getJobManager();
         try {
             final CCApplicationContext appCtx = ccs.getApplicationContext();
-            IActivityClusterGraphGeneratorFactory acggf = (IActivityClusterGraphGeneratorFactory) DeploymentUtils
-                    .deserialize(acggfBytes, deploymentId, appCtx);
-            IActivityClusterGraphGenerator acgg = acggf.createActivityClusterGraphGenerator(jobId, appCtx, jobFlags);
-            JobRun run = new JobRun(ccs, deploymentId, jobId, acggf, acgg, jobFlags, callback);
+            JobRun run;
+            if (!predestributed) {
+                //Need to create the ActivityClusterGraph
+                IActivityClusterGraphGeneratorFactory acggf = (IActivityClusterGraphGeneratorFactory) DeploymentUtils
+                        .deserialize(acggfBytes, deploymentId, appCtx);
+                IActivityClusterGraphGenerator acgg =
+                        acggf.createActivityClusterGraphGenerator(jobId, appCtx, jobFlags);
+                run = new JobRun(ccs, deploymentId, jobId, acggf, acgg, jobFlags, callback);
+            } else {
+                //ActivityClusterGraph has already been distributed
+                run = new JobRun(ccs, deploymentId, jobId, callback,
+                        ccs.getPreDistributedJobStore().getDistributedJobDescriptor(jobId));
+            }
             jobManager.add(run);
+
         } catch (Exception e) {
             callback.setException(e);
         }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java
index 5e1b856..88b8939 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java
@@ -65,7 +65,7 @@
             // Mocks an immediately executable job.
             JobRun run = mockJobRun(id);
             JobSpecification job = mock(JobSpecification.class);
-            when(run.getActivityClusterGraphFactory().getJobSpecification()).thenReturn(job);
+            when(run.getJobSpecification()).thenReturn(job);
             when(jobCapacityController.allocate(job)).thenReturn(IJobCapacityController.JobSubmissionStatus.EXECUTE);
 
             // Submits the job.
@@ -81,7 +81,7 @@
             // Mocks a deferred job.
             JobRun run = mockJobRun(id);
             JobSpecification job = mock(JobSpecification.class);
-            when(run.getActivityClusterGraphFactory().getJobSpecification()).thenReturn(job);
+            when(run.getJobSpecification()).thenReturn(job);
             when(jobCapacityController.allocate(job)).thenReturn(IJobCapacityController.JobSubmissionStatus.QUEUE)
                     .thenReturn(IJobCapacityController.JobSubmissionStatus.EXECUTE);
 
@@ -97,7 +97,7 @@
         try {
             JobRun run = mockJobRun(8193);
             JobSpecification job = mock(JobSpecification.class);
-            when(run.getActivityClusterGraphFactory().getJobSpecification()).thenReturn(job);
+            when(run.getJobSpecification()).thenReturn(job);
             when(jobCapacityController.allocate(job)).thenReturn(IJobCapacityController.JobSubmissionStatus.QUEUE)
                     .thenReturn(IJobCapacityController.JobSubmissionStatus.EXECUTE);
             jobManager.add(run);
@@ -138,7 +138,7 @@
         try {
             JobRun run = mockJobRun(1);
             JobSpecification job = mock(JobSpecification.class);
-            when(run.getActivityClusterGraphFactory().getJobSpecification()).thenReturn(job);
+            when(run.getJobSpecification()).thenReturn(job);
             when(jobCapacityController.allocate(job))
                     .thenThrow(HyracksException.create(ErrorCode.JOB_REQUIREMENTS_EXCEED_CAPACITY, "1", "0"));
             jobManager.add(run);
@@ -162,14 +162,14 @@
         // A normal run.
         JobRun run1 = mockJobRun(1);
         JobSpecification job1 = mock(JobSpecification.class);
-        when(run1.getActivityClusterGraphFactory().getJobSpecification()).thenReturn(job1);
+        when(run1.getJobSpecification()).thenReturn(job1);
         when(jobCapacityController.allocate(job1)).thenReturn(IJobCapacityController.JobSubmissionStatus.EXECUTE);
         jobManager.add(run1);
 
         // A failure run.
         JobRun run2 = mockJobRun(2);
         JobSpecification job2 = mock(JobSpecification.class);
-        when(run2.getActivityClusterGraphFactory().getJobSpecification()).thenReturn(job2);
+        when(run2.getJobSpecification()).thenReturn(job2);
         when(jobCapacityController.allocate(job2)).thenReturn(IJobCapacityController.JobSubmissionStatus.QUEUE)
                 .thenThrow(HyracksException.create(ErrorCode.JOB_REQUIREMENTS_EXCEED_CAPACITY, "1", "0"));
         jobManager.add(run2);
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
index a0c0f95..4159594 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
@@ -44,6 +44,8 @@
     public void notifyTaskFailure(JobId jobId, TaskAttemptId taskId, String nodeId, List<Exception> exceptions)
             throws Exception;
 
+    public void notifyDistributedJobFailure(JobId jobId, String nodeId) throws Exception;
+
     public void notifyJobletCleanup(JobId jobId, String nodeId) throws Exception;
 
     public void notifyDeployBinary(DeploymentId deploymentId, String nodeId, DeploymentStatus status) throws Exception;
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
index 5c27a6f..a10f8f0 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
@@ -50,6 +50,10 @@
 
     public void undeployBinary(DeploymentId deploymentId) throws Exception;
 
+    public void distributeJob(JobId jobId, byte[] planBytes) throws Exception;
+
+    public void destroyJob(JobId jobId) throws Exception;
+
     public void dumpState(String stateDumpId) throws Exception;
 
     public void shutdown(boolean terminateNCService) throws Exception;
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
index 4ee34ca..4eb1732 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
@@ -100,6 +100,10 @@
         SHUTDOWN_REQUEST,
         SHUTDOWN_RESPONSE,
 
+        DISTRIBUTE_JOB,
+        DESTROY_JOB,
+        DISTRIBUTED_JOB_FAILURE,
+
         STATE_DUMP_REQUEST,
         STATE_DUMP_RESPONSE,
 
@@ -282,6 +286,31 @@
         }
     }
 
+    public static class ReportDistributedJobFailureFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        private final JobId jobId;
+        private final String nodeId;
+
+        public ReportDistributedJobFailureFunction(JobId jobId, String nodeId) {
+            this.jobId = jobId;
+            this.nodeId = nodeId;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.DISTRIBUTED_JOB_FAILURE;
+        }
+
+        public JobId getJobId() {
+            return jobId;
+        }
+
+        public String getNodeId() {
+            return nodeId;
+        }
+    }
+
     public static class NotifyJobletCleanupFunction extends Function {
         private static final long serialVersionUID = 1L;
 
@@ -670,6 +699,51 @@
         }
     }
 
+    public static class DistributeJobFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        private final JobId jobId;
+
+        private final byte[] acgBytes;
+
+        public DistributeJobFunction(JobId jobId, byte[] acgBytes) {
+            this.jobId = jobId;
+            this.acgBytes = acgBytes;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.DISTRIBUTE_JOB;
+        }
+
+        public JobId getJobId() {
+            return jobId;
+        }
+
+        public byte[] getacgBytes() {
+            return acgBytes;
+        }
+    }
+
+    public static class DestroyJobFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        private final JobId jobId;
+
+        public DestroyJobFunction(JobId jobId) {
+            this.jobId = jobId;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.DESTROY_JOB;
+        }
+
+        public JobId getJobId() {
+            return jobId;
+        }
+    }
+
     public static class StartTasksFunction extends Function {
         private static final long serialVersionUID = 1L;
 
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
index ac6fc2c..83ef32b 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
@@ -141,6 +141,13 @@
     }
 
     @Override
+    public void notifyDistributedJobFailure(JobId jobId, String nodeId) throws Exception {
+        CCNCFunctions.ReportDistributedJobFailureFunction fn =
+                new CCNCFunctions.ReportDistributedJobFailureFunction(jobId, nodeId);
+        ipcHandle.send(-1, fn, null);
+    }
+
+    @Override
     public void getNodeControllerInfos() throws Exception {
         ipcHandle.send(-1, new CCNCFunctions.GetNodeControllersInfoFunction(), null);
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
index 0d59b8d..2a8464e 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
@@ -84,6 +84,18 @@
     }
 
     @Override
+    public void distributeJob(JobId jobId, byte[] planBytes) throws Exception {
+        CCNCFunctions.DistributeJobFunction fn = new CCNCFunctions.DistributeJobFunction(jobId, planBytes);
+        ipcHandle.send(-1, fn, null);
+    }
+
+    @Override
+    public void destroyJob(JobId jobId) throws Exception {
+        CCNCFunctions.DestroyJobFunction fn = new CCNCFunctions.DestroyJobFunction(jobId);
+        ipcHandle.send(-1, fn, null);
+    }
+
+    @Override
     public void dumpState(String stateDumpId) throws Exception {
         CCNCFunctions.StateDumpRequestFunction dsf = new CCNCFunctions.StateDumpRequestFunction(stateDumpId);
         ipcHandle.send(-1, dsf, null);
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
index 93ccaa4..e6278be 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
@@ -26,6 +26,8 @@
 import org.apache.hyracks.control.nc.work.ApplicationMessageWork;
 import org.apache.hyracks.control.nc.work.CleanupJobletWork;
 import org.apache.hyracks.control.nc.work.DeployBinaryWork;
+import org.apache.hyracks.control.nc.work.DestroyJobWork;
+import org.apache.hyracks.control.nc.work.DistributeJobWork;
 import org.apache.hyracks.control.nc.work.ReportPartitionAvailabilityWork;
 import org.apache.hyracks.control.nc.work.StartTasksWork;
 import org.apache.hyracks.control.nc.work.StateDumpWork;
@@ -99,6 +101,16 @@
                 ncs.getWorkQueue().schedule(new UnDeployBinaryWork(ncs, ndbf.getDeploymentId()));
                 return;
 
+            case DISTRIBUTE_JOB:
+                CCNCFunctions.DistributeJobFunction djf = (CCNCFunctions.DistributeJobFunction) fn;
+                ncs.getWorkQueue().schedule(new DistributeJobWork(ncs, djf.getJobId(), djf.getacgBytes()));
+                return;
+
+            case DESTROY_JOB:
+                CCNCFunctions.DestroyJobFunction dsjf = (CCNCFunctions.DestroyJobFunction) fn;
+                ncs.getWorkQueue().schedule(new DestroyJobWork(ncs, dsjf.getJobId()));
+                return;
+
             case STATE_DUMP_REQUEST:
                 final CCNCFunctions.StateDumpRequestFunction dsrf = (StateDumpRequestFunction) fn;
                 ncs.getWorkQueue().schedule(new StateDumpWork(ncs, dsrf.getStateDumpId()));
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index 19f01c1..bf0ddb6 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -46,6 +46,8 @@
 import org.apache.hyracks.api.comm.NetworkAddress;
 import org.apache.hyracks.api.dataset.IDatasetPartitionManager;
 import org.apache.hyracks.api.deployment.DeploymentId;
+import org.apache.hyracks.api.exceptions.ErrorCode;
+import org.apache.hyracks.api.exceptions.HyracksException;
 import org.apache.hyracks.api.io.IODeviceHandle;
 import org.apache.hyracks.api.job.ActivityClusterGraph;
 import org.apache.hyracks.api.job.JobId;
@@ -94,11 +96,11 @@
 
     private final IOManager ioManager;
 
-    private final IPCSystem ipc;
+    private IPCSystem ipc;
 
-    private final PartitionManager partitionManager;
+    private PartitionManager partitionManager;
 
-    private final NetworkManager netManager;
+    private NetworkManager netManager;
 
     private IDatasetPartitionManager datasetPartitionManager;
 
@@ -155,18 +157,11 @@
     public NodeControllerService(NCConfig ncConfig) throws Exception {
         this.ncConfig = ncConfig;
         id = ncConfig.nodeId;
-        ipc = new IPCSystem(new InetSocketAddress(ncConfig.clusterNetIPAddress, ncConfig.clusterNetPort),
-                new NodeControllerIPCI(this),
-                new CCNCFunctions.SerializerDeserializer());
 
         ioManager = new IOManager(IODeviceHandle.getDevices(ncConfig.ioDevices));
         if (id == null) {
             throw new Exception("id not set");
         }
-        partitionManager = new PartitionManager(this);
-        netManager = new NetworkManager(ncConfig.dataIPAddress, ncConfig.dataPort, partitionManager,
-                ncConfig.nNetThreads, ncConfig.nNetBuffers, ncConfig.dataPublicIPAddress, ncConfig.dataPublicPort,
-                FullFrameChannelInterfaceFactory.INSTANCE);
 
         lccm = new LifeCycleComponentManager();
         workQueue = new WorkQueue(id, Thread.NORM_PRIORITY); // Reserves MAX_PRIORITY of the heartbeat thread.
@@ -244,7 +239,13 @@
     @Override
     public void start() throws Exception {
         LOGGER.log(Level.INFO, "Starting NodeControllerService");
+        ipc = new IPCSystem(new InetSocketAddress(ncConfig.clusterNetIPAddress, ncConfig.clusterNetPort),
+                new NodeControllerIPCI(this), new CCNCFunctions.SerializerDeserializer());
         ipc.start();
+        partitionManager = new PartitionManager(this);
+        netManager = new NetworkManager(ncConfig.dataIPAddress, ncConfig.dataPort, partitionManager,
+                ncConfig.nNetThreads, ncConfig.nNetBuffers, ncConfig.dataPublicIPAddress, ncConfig.dataPublicPort,
+                FullFrameChannelInterfaceFactory.INSTANCE);
         netManager.start();
 
         startApplication();
@@ -365,8 +366,28 @@
         return jobletMap;
     }
 
-    public Map<JobId, ActivityClusterGraph> getActivityClusterGraphMap() {
-        return activityClusterGraphMap;
+    public void storeActivityClusterGraph(JobId jobId, ActivityClusterGraph acg) throws HyracksException {
+        if (activityClusterGraphMap.get(jobId) != null) {
+            throw HyracksException.create(ErrorCode.DUPLICATE_DISTRIBUTED_JOB, jobId);
+        }
+        activityClusterGraphMap.put(jobId, acg);
+    }
+
+    public void removeActivityClusterGraph(JobId jobId) throws HyracksException {
+        if (activityClusterGraphMap.get(jobId) == null) {
+            throw HyracksException.create(ErrorCode.ERROR_FINDING_DISTRIBUTED_JOB, jobId);
+        }
+        activityClusterGraphMap.remove(jobId);
+    }
+
+    public void checkForDuplicateDistributedJob(JobId jobId) throws HyracksException {
+        if (activityClusterGraphMap.get(jobId) != null) {
+            throw HyracksException.create(ErrorCode.DUPLICATE_DISTRIBUTED_JOB, jobId);
+        }
+    }
+
+    public ActivityClusterGraph getActivityClusterGraph(JobId jobId) throws HyracksException {
+        return activityClusterGraphMap.get(jobId);
     }
 
     public NetworkManager getNetworkManager() {
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DestroyJobWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DestroyJobWork.java
new file mode 100644
index 0000000..55dd01e
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DestroyJobWork.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.control.nc.work;
+
+import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.control.common.work.AbstractWork;
+import org.apache.hyracks.control.nc.NodeControllerService;
+
+/**
+ * destroy a pre-distributed job
+ *
+ */
+public class DestroyJobWork extends AbstractWork {
+
+    private final NodeControllerService ncs;
+    private final JobId jobId;
+
+    public DestroyJobWork(NodeControllerService ncs, JobId jobId) {
+        this.ncs = ncs;
+        this.jobId = jobId;
+    }
+
+    @Override
+    public void run() {
+        try {
+            ncs.removeActivityClusterGraph(jobId);
+        } catch (HyracksException e) {
+            try {
+                ncs.getClusterController().notifyDistributedJobFailure(jobId, ncs.getId());
+            } catch (Exception e1) {
+                e1.printStackTrace();
+            }
+        }
+    }
+
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DistributeJobWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DistributeJobWork.java
new file mode 100644
index 0000000..3a4f6ac
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DistributeJobWork.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.control.nc.work;
+
+import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.job.ActivityClusterGraph;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.control.common.deployment.DeploymentUtils;
+import org.apache.hyracks.control.common.work.AbstractWork;
+import org.apache.hyracks.control.nc.NodeControllerService;
+
+/**
+ * pre-distribute a job that can be executed later
+ *
+ */
+public class DistributeJobWork extends AbstractWork {
+
+    private final NodeControllerService ncs;
+    private final byte[] acgBytes;
+    private final JobId jobId;
+
+    public DistributeJobWork(NodeControllerService ncs, JobId jobId, byte[] acgBytes) {
+        this.ncs = ncs;
+        this.jobId = jobId;
+        this.acgBytes = acgBytes;
+    }
+
+    @Override
+    public void run() {
+        try {
+            ncs.checkForDuplicateDistributedJob(jobId);
+            ActivityClusterGraph acg =
+                    (ActivityClusterGraph) DeploymentUtils.deserialize(acgBytes, null, ncs.getApplicationContext());
+            ncs.storeActivityClusterGraph(jobId, acg);
+        } catch (HyracksException e) {
+            try {
+                ncs.getClusterController().notifyDistributedJobFailure(jobId, ncs.getId());
+            } catch (Exception e1) {
+                e1.printStackTrace();
+            }
+        }
+
+    }
+
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
index 803f15a..6cd9fa2 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
@@ -46,6 +46,7 @@
 import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.deployment.DeploymentId;
+import org.apache.hyracks.api.exceptions.ErrorCode;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.HyracksException;
 import org.apache.hyracks.api.job.ActivityCluster;
@@ -186,17 +187,12 @@
         Map<JobId, Joblet> jobletMap = ncs.getJobletMap();
         Joblet ji = jobletMap.get(jobId);
         if (ji == null) {
-            Map<JobId, ActivityClusterGraph> acgMap = ncs.getActivityClusterGraphMap();
-            ActivityClusterGraph acg = acgMap.get(jobId);
+            ActivityClusterGraph acg = ncs.getActivityClusterGraph(jobId);
             if (acg == null) {
                 if (acgBytes == null) {
-                    throw new HyracksException("Joblet was not found. This job was most likely aborted.");
+                    throw HyracksException.create(ErrorCode.ERROR_FINDING_DISTRIBUTED_JOB, jobId);
                 }
                 acg = (ActivityClusterGraph) DeploymentUtils.deserialize(acgBytes, deploymentId, appCtx);
-                if (flags.contains(JobFlag.STORE_JOB)) {
-                    //TODO: Right now the map is append-only
-                    acgMap.put(jobId, acg);
-                }
             }
             ji = new Joblet(ncs, deploymentId, jobId, appCtx, acg);
             jobletMap.put(jobId, ji);
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractIntegrationTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractIntegrationTest.java
index b51a578..a7677f8 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractIntegrationTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractIntegrationTest.java
@@ -63,10 +63,10 @@
     public static final String NC1_ID = "nc1";
     public static final String NC2_ID = "nc2";
 
-    private static ClusterControllerService cc;
+    protected static ClusterControllerService cc;
     protected static NodeControllerService nc1;
     protected static NodeControllerService nc2;
-    private static IHyracksClientConnection hcc;
+    protected static IHyracksClientConnection hcc;
 
     private final List<File> outputFiles;
     private static AtomicInteger aInteger = new AtomicInteger(0);
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/HeapSortMergeTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/HeapSortMergeTest.java
index efbb9d2..160336a 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/HeapSortMergeTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/HeapSortMergeTest.java
@@ -114,6 +114,11 @@
 
     @Test
     public void optimizedSortMergeTest02() throws Exception {
+        JobSpecification spec = createSortMergeJobSpec();
+        runTest(spec);
+    }
+
+    public static JobSpecification createSortMergeJobSpec() throws Exception {
         JobSpecification spec = new JobSpecification();
 
         FileSplit[] ordersSplits = new FileSplit[] {
@@ -156,19 +161,17 @@
 
         spec.connect(new OneToOneConnectorDescriptor(spec), ordScanner, 0, sorter, 0);
 
-        spec.connect(
-                new MToNPartitioningMergingConnectorDescriptor(spec, new FieldHashPartitionComputerFactory(new int[] {
-                        1, 0 }, new IBinaryHashFunctionFactory[] {
-                        PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY),
-                        PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }), new int[] { 1, 0 },
-                        new IBinaryComparatorFactory[] {
-                                PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
-                                PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
-                        new UTF8StringNormalizedKeyComputerFactory()), sorter, 0, filter, 0);
+        spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, new FieldHashPartitionComputerFactory(
+                new int[] { 1, 0 },
+                new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY),
+                        PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }),
+                new int[] { 1, 0 },
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
+                        PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+                new UTF8StringNormalizedKeyComputerFactory()), sorter, 0, filter, 0);
 
         spec.connect(new OneToOneConnectorDescriptor(spec), filter, 0, printer, 0);
-
-        runTest(spec);
+        return spec;
     }
 
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/PredistributedJobsTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/PredistributedJobsTest.java
new file mode 100644
index 0000000..2509515
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/PredistributedJobsTest.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.tests.integration;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.verify;
+
+import java.io.File;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hyracks.api.client.HyracksConnection;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.control.cc.ClusterControllerService;
+import org.apache.hyracks.control.common.controllers.CCConfig;
+import org.apache.hyracks.control.common.controllers.NCConfig;
+import org.apache.hyracks.control.nc.NodeControllerService;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class PredistributedJobsTest {
+    private static final Logger LOGGER = Logger.getLogger(PredistributedJobsTest.class.getName());
+
+    private static final String NC1_ID = "nc1";
+    private static final String NC2_ID = "nc2";
+
+    private static ClusterControllerService cc;
+    private static NodeControllerService nc1;
+    private static NodeControllerService nc2;
+    private static IHyracksClientConnection hcc;
+
+    @BeforeClass
+    public static void init() throws Exception {
+        CCConfig ccConfig = new CCConfig();
+        ccConfig.clientNetIpAddress = "127.0.0.1";
+        ccConfig.clientNetPort = 39000;
+        ccConfig.clusterNetIpAddress = "127.0.0.1";
+        ccConfig.clusterNetPort = 39001;
+        ccConfig.profileDumpPeriod = 10000;
+        FileUtils.deleteQuietly(new File("target" + File.separator + "data"));
+        FileUtils.copyDirectory(new File("data"), new File("target" + File.separator + "data"));
+        File outDir = new File("target" + File.separator + "ClusterController");
+        outDir.mkdirs();
+        File ccRoot = File.createTempFile(AbstractIntegrationTest.class.getName(), ".data", outDir);
+        ccRoot.delete();
+        ccRoot.mkdir();
+        ccConfig.ccRoot = ccRoot.getAbsolutePath();
+        ClusterControllerService ccBase = new ClusterControllerService(ccConfig);
+        cc = Mockito.spy(ccBase);
+        cc.start();
+
+        NCConfig ncConfig1 = new NCConfig();
+        ncConfig1.ccHost = "localhost";
+        ncConfig1.ccPort = 39001;
+        ncConfig1.clusterNetIPAddress = "127.0.0.1";
+        ncConfig1.dataIPAddress = "127.0.0.1";
+        ncConfig1.resultIPAddress = "127.0.0.1";
+        ncConfig1.nodeId = NC1_ID;
+        ncConfig1.ioDevices = System.getProperty("user.dir") + File.separator + "target" + File.separator + "data"
+                + File.separator + "device0";
+        NodeControllerService nc1Base = new NodeControllerService(ncConfig1);
+        nc1 = Mockito.spy(nc1Base);
+        nc1.start();
+
+        NCConfig ncConfig2 = new NCConfig();
+        ncConfig2.ccHost = "localhost";
+        ncConfig2.ccPort = 39001;
+        ncConfig2.clusterNetIPAddress = "127.0.0.1";
+        ncConfig2.dataIPAddress = "127.0.0.1";
+        ncConfig2.resultIPAddress = "127.0.0.1";
+        ncConfig2.nodeId = NC2_ID;
+        ncConfig2.ioDevices = System.getProperty("user.dir") + File.separator + "target" + File.separator + "data"
+                + File.separator + "device1";
+        NodeControllerService nc2Base = new NodeControllerService(ncConfig2);
+        nc2 = Mockito.spy(nc2Base);
+        nc2.start();
+
+        hcc = new HyracksConnection(ccConfig.clientNetIpAddress, ccConfig.clientNetPort);
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Starting CC in " + ccRoot.getAbsolutePath());
+        }
+    }
+
+    @Test
+    public void DistributedTest() throws Exception {
+        JobSpecification spec1 = UnionTest.createUnionJobSpec();
+        JobSpecification spec2 = HeapSortMergeTest.createSortMergeJobSpec();
+
+        //distribute both jobs
+        JobId jobId1 = hcc.distributeJob(spec1);
+        JobId jobId2 = hcc.distributeJob(spec2);
+
+        //make sure it finished
+        //cc will get the store once to check for duplicate insertion and once to insert per job
+        verify(cc, Mockito.timeout(5000).times(4)).getPreDistributedJobStore();
+        verify(nc1, Mockito.timeout(5000).times(2)).storeActivityClusterGraph(any(), any());
+        verify(nc2, Mockito.timeout(5000).times(2)).storeActivityClusterGraph(any(), any());
+        verify(nc1, Mockito.timeout(5000).times(2)).checkForDuplicateDistributedJob(any());
+        verify(nc2, Mockito.timeout(5000).times(2)).checkForDuplicateDistributedJob(any());
+
+        //confirm that both jobs are distributed
+        Assert.assertTrue(nc1.getActivityClusterGraph(jobId1) != null && nc2.getActivityClusterGraph(jobId1) != null);
+        Assert.assertTrue(nc1.getActivityClusterGraph(jobId2) != null && nc2.getActivityClusterGraph(jobId2) != null);
+        Assert.assertTrue(cc.getPreDistributedJobStore().getDistributedJobDescriptor(jobId1) != null);
+        Assert.assertTrue(cc.getPreDistributedJobStore().getDistributedJobDescriptor(jobId2) != null);
+
+        //run the first job
+        hcc.startJob(jobId1);
+        hcc.waitForCompletion(jobId1);
+
+        //destroy the first job
+        hcc.destroyJob(jobId1);
+
+        //make sure it finished
+        verify(cc, Mockito.timeout(5000).times(8)).getPreDistributedJobStore();
+        verify(nc1, Mockito.timeout(5000).times(1)).removeActivityClusterGraph(any());
+        verify(nc2, Mockito.timeout(5000).times(1)).removeActivityClusterGraph(any());
+
+        //confirm the first job is destroyed
+        Assert.assertTrue(nc1.getActivityClusterGraph(jobId1) == null && nc2.getActivityClusterGraph(jobId1) == null);
+        cc.getPreDistributedJobStore().checkForExistingDistributedJobDescriptor(jobId1);
+
+        //run the second job
+        hcc.startJob(jobId2);
+        hcc.waitForCompletion(jobId2);
+
+        //run the second job again
+        hcc.startJob(jobId2);
+        hcc.waitForCompletion(jobId2);
+
+        //destroy the second job
+        hcc.destroyJob(jobId2);
+
+        //make sure it finished
+        verify(cc, Mockito.timeout(5000).times(12)).getPreDistributedJobStore();
+        verify(nc1, Mockito.timeout(5000).times(2)).removeActivityClusterGraph(any());
+        verify(nc2, Mockito.timeout(5000).times(2)).removeActivityClusterGraph(any());
+
+        //confirm the second job is destroyed
+        Assert.assertTrue(nc1.getActivityClusterGraph(jobId2) == null && nc2.getActivityClusterGraph(jobId2) == null);
+        cc.getPreDistributedJobStore().checkForExistingDistributedJobDescriptor(jobId2);
+    }
+
+    @AfterClass
+    public static void deinit() throws Exception {
+        nc2.stop();
+        nc1.stop();
+        cc.stop();
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/UnionTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/UnionTest.java
index 02cab8f..542f037 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/UnionTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/UnionTest.java
@@ -44,6 +44,11 @@
 public class UnionTest extends AbstractIntegrationTest {
     @Test
     public void union01() throws Exception {
+        JobSpecification spec = createUnionJobSpec();
+        runTest(spec);
+    }
+
+    public static JobSpecification createUnionJobSpec() throws Exception {
         JobSpecification spec = new JobSpecification();
 
         IFileSplitProvider splitProvider = new ConstantFileSplitProvider(new FileSplit[] {
@@ -82,6 +87,6 @@
         spec.connect(new OneToOneConnectorDescriptor(spec), unionAll, 0, printer, 0);
 
         spec.addRoot(printer);
-        runTest(spec);
+        return spec;
     }
 }