[ASTERIXDB-1911][HYR,RT,CLUS] Fixes and Improvements for Deployed Jobs

Rename Predistributed Jobs to Deployed Jobs
Enable job executions to have a map of job parameters
Add an Asterix function to retrieve these parameters
which are can be assigned when the job is run, e.g. for Deployed jobs
Allow Deployed jobs to have new TxnIds and JobIds for each execution
Allow simultaneous execution of one Deployed Job

Change-Id: I8f493c1fa977d07dfe8a875f9ebe9515d01d1473
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2045
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Xikui Wang <xkkwww@gmail.com>
diff --git a/asterixdb/asterix-active/pom.xml b/asterixdb/asterix-active/pom.xml
index 5865cd1..6b0c381 100644
--- a/asterixdb/asterix-active/pom.xml
+++ b/asterixdb/asterix-active/pom.xml
@@ -31,6 +31,11 @@
       <version>${project.version}</version>
     </dependency>
     <dependency>
+      <groupId>org.apache.asterix</groupId>
+      <artifactId>asterix-transactions</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
       <groupId>org.apache.hyracks</groupId>
       <artifactId>hyracks-api</artifactId>
     </dependency>
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/DeployedJobService.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/DeployedJobService.java
new file mode 100644
index 0000000..070d838
--- /dev/null
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/DeployedJobService.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.active;
+
+import java.time.Instant;
+import java.util.Date;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.transaction.management.service.transaction.TxnIdFactory;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.job.DeployedJobSpecId;
+import org.apache.hyracks.api.job.JobId;
+
+/**
+ * Provides functionality for running DeployedJobSpecs
+ */
+public class DeployedJobService {
+
+    private static final Logger LOGGER = Logger.getLogger(DeployedJobService.class.getName());
+
+    //To enable new Asterix TxnId for separate deployed job spec invocations
+    private static final byte[] TRANSACTION_ID_PARAMETER_NAME = "TxnIdParameter".getBytes();
+
+    //pool size one (only running one thread at a time)
+    private static final int POOL_SIZE = 1;
+
+    //Starts running a deployed job specification periodically with an interval of "duration" seconds
+    public static ScheduledExecutorService startRepetitiveDeployedJobSpec(DeployedJobSpecId distributedId,
+            IHyracksClientConnection hcc, long duration, Map<byte[], byte[]> jobParameters, EntityId entityId) {
+        ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(POOL_SIZE);
+        scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    if (!runRepetitiveDeployedJobSpec(distributedId, hcc, jobParameters, duration, entityId)) {
+                        scheduledExecutorService.shutdown();
+                    }
+                } catch (Exception e) {
+                    LOGGER.log(Level.SEVERE, "Job Failed to run for " + entityId.getExtensionName() + " "
+                            + entityId.getDataverse() + "." + entityId.getEntityName() + ".", e);
+                }
+            }
+        }, duration, duration, TimeUnit.MILLISECONDS);
+        return scheduledExecutorService;
+    }
+
+    public static boolean runRepetitiveDeployedJobSpec(DeployedJobSpecId distributedId, IHyracksClientConnection hcc,
+            Map<byte[], byte[]> jobParameters, long duration, EntityId entityId) throws Exception {
+        long executionMilliseconds = runDeployedJobSpec(distributedId, hcc, jobParameters, entityId);
+        if (executionMilliseconds > duration && LOGGER.isLoggable(Level.SEVERE)) {
+            LOGGER.log(Level.SEVERE,
+                    "Periodic job for " + entityId.getExtensionName() + " " + entityId.getDataverse() + "."
+                            + entityId.getEntityName() + " was unable to meet the required period of " + duration
+                            + " milliseconds. Actually took " + executionMilliseconds + " execution will shutdown"
+                            + new Date());
+            return false;
+        }
+        return true;
+    }
+
+    public synchronized static long runDeployedJobSpec(DeployedJobSpecId distributedId, IHyracksClientConnection hcc,
+            Map<byte[], byte[]> jobParameters, EntityId entityId) throws Exception {
+        JobId jobId;
+        long startTime = Instant.now().toEpochMilli();
+
+        //Add the Asterix Transaction Id to the map
+        jobParameters.put(TRANSACTION_ID_PARAMETER_NAME, String.valueOf(TxnIdFactory.create().getId()).getBytes());
+        jobId = hcc.startJob(distributedId, jobParameters);
+
+        hcc.waitForCompletion(jobId);
+        long executionMilliseconds = Instant.now().toEpochMilli() - startTime;
+
+        LOGGER.log(Level.INFO,
+                "Deployed Job execution completed for " + entityId.getExtensionName() + " " + entityId.getDataverse()
+                        + "."
+                        + entityId.getEntityName() + ". Took " + executionMilliseconds + " milliseconds ");
+
+        return executionMilliseconds;
+
+    }
+
+    @Override
+    public String toString() {
+        return "DeployedJobSpecService";
+    }
+
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index a811454..a52d765 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -744,7 +744,7 @@
         return DatasetUtil.createNodeGroupForNewDataset(dataverseName, datasetName, selectedNodes, metadataProvider);
     }
 
-    protected void handleCreateIndexStatement(MetadataProvider metadataProvider, Statement stmt,
+    public void handleCreateIndexStatement(MetadataProvider metadataProvider, Statement stmt,
             IHyracksClientConnection hcc, IRequestParameters requestParameters) throws Exception {
         CreateIndexStatement stmtCreateIndex = (CreateIndexStatement) stmt;
         String dataverseName = getActiveDataverse(stmtCreateIndex.getDataverseName());
@@ -1672,9 +1672,9 @@
 
     protected void handleCreateFunctionStatement(MetadataProvider metadataProvider, Statement stmt) throws Exception {
         CreateFunctionStatement cfs = (CreateFunctionStatement) stmt;
-        String dataverse = getActiveDataverseName(cfs.getSignature().getNamespace());
-        cfs.getSignature().setNamespace(dataverse);
-        String functionName = cfs.getaAterixFunction().getName();
+        String dataverse = getActiveDataverseName(cfs.getFunctionSignature().getNamespace());
+        cfs.getFunctionSignature().setNamespace(dataverse);
+        String functionName = cfs.getFunctionSignature().getName();
 
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
@@ -1685,7 +1685,7 @@
             if (dv == null) {
                 throw new AlgebricksException("There is no dataverse with this name " + dataverse + ".");
             }
-            Function function = new Function(dataverse, functionName, cfs.getaAterixFunction().getArity(),
+            Function function = new Function(dataverse, functionName, cfs.getFunctionSignature().getArity(),
                     cfs.getParamList(), Function.RETURNTYPE_VOID, cfs.getFunctionBody(),
                     rewriterFactory instanceof SqlppRewriterFactory ? Function.LANGUAGE_SQLPP : Function.LANGUAGE_AQL,
                     FunctionKind.SCALAR.toString());
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
index df8eef7..90b9a1e 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
@@ -58,6 +58,7 @@
 import org.apache.asterix.om.types.IAType;
 import org.apache.asterix.runtime.formats.FormatUtils;
 import org.apache.asterix.runtime.formats.NonTaggedDataFormat;
+import org.apache.asterix.runtime.job.listener.JobEventListenerFactory;
 import org.apache.asterix.runtime.operators.LSMPrimaryUpsertOperatorNodePushable;
 import org.apache.asterix.runtime.utils.CcApplicationContext;
 import org.apache.asterix.test.runtime.ExecutionTestUtil;
@@ -298,6 +299,8 @@
         }
         JobId jobId = newJobId();
         IHyracksJobletContext jobletCtx = Mockito.mock(IHyracksJobletContext.class);
+        JobEventListenerFactory factory = new JobEventListenerFactory(new TxnId(jobId.getId()), true);
+        Mockito.when(jobletCtx.getJobletEventListenerFactory()).thenReturn(factory);
         Mockito.when(jobletCtx.getServiceContext()).thenReturn(ExecutionTestUtil.integrationUtil.ncs[0].getContext());
         Mockito.when(jobletCtx.getJobId()).thenReturn(jobId);
         ctx = Mockito.spy(ctx);
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IJobEventListenerFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IJobEventListenerFactory.java
new file mode 100644
index 0000000..0f37b13
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IJobEventListenerFactory.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.api;
+
+import org.apache.asterix.common.transactions.TxnId;
+import org.apache.hyracks.api.job.IJobletEventListenerFactory;
+
+/**
+ * an interface for JobEventListenerFactories to add Asterix transaction JobId getter
+ */
+public interface IJobEventListenerFactory extends IJobletEventListenerFactory {
+
+    TxnId getTxnId(TxnId compiledTxnId);
+}
diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateFunctionStatement.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateFunctionStatement.java
index 6d74957..84d66ce 100644
--- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateFunctionStatement.java
+++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateFunctionStatement.java
@@ -34,10 +34,6 @@
     private final boolean ifNotExists;
     private final List<String> paramList;
 
-    public FunctionSignature getaAterixFunction() {
-        return signature;
-    }
-
     public String getFunctionBody() {
         return functionBody;
     }
@@ -66,7 +62,7 @@
         return paramList;
     }
 
-    public FunctionSignature getSignature() {
+    public FunctionSignature getFunctionSignature() {
         return signature;
     }
 
diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/FormatPrintVisitor.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/FormatPrintVisitor.java
index a2e7341..c74ee5d 100644
--- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/FormatPrintVisitor.java
+++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/FormatPrintVisitor.java
@@ -232,7 +232,7 @@
             out.print("(");
             exprList.get(0).accept(this, step + 1);
             for (int i = 1; i < exprList.size(); i++) {
-                OperatorType opType = opList.get(i - 1);;
+                OperatorType opType = opList.get(i - 1);
                 if (i == 1) {
                     printHints(operatorExpr.getHints(), step + 1);
                 }
@@ -657,7 +657,7 @@
         out.print(skip(step) + CREATE + " index ");
         out.print(normalize(cis.getIndexName().getValue()) + " ");
         out.print(generateIfNotExists(cis.getIfNotExists()));
-        out.print(" on ");;
+        out.print(" on ");
         out.print(generateFullName(cis.getDataverseName(), cis.getDatasetName()));
 
         out.print(" (");
@@ -827,7 +827,8 @@
     public Void visit(CreateFunctionStatement cfs, Integer step) throws CompilationException {
         out.print(skip(step) + CREATE + " function ");
         out.print(generateIfNotExists(cfs.getIfNotExists()));
-        out.print(this.generateFullName(cfs.getSignature().getNamespace(), cfs.getSignature().getName()));
+        out.print(
+                this.generateFullName(cfs.getFunctionSignature().getNamespace(), cfs.getFunctionSignature().getName()));
         out.print("(");
         printDelimitedStrings(cfs.getParamList(), COMMA);
         out.println(") {");
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
index 07d3c69..e4c4860 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
@@ -205,7 +205,6 @@
         return new Pair<>(mergePolicyFactory, properties);
     }
 
-    @SuppressWarnings("unchecked")
     public static void writePropertyTypeRecord(String name, String value, DataOutput out, ARecordType recordType)
             throws HyracksDataException {
         IARecordBuilder propertyRecordBuilder = new RecordBuilder();
@@ -399,7 +398,7 @@
                 metadataProvider.getSplitProviderAndConstraints(dataset);
 
         // prepare callback
-        TxnId txnId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getTxnId();
+        TxnId txnId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getTxnId(null);
         int[] primaryKeyFields = new int[numKeys];
         for (int i = 0; i < numKeys; i++) {
             primaryKeyFields[i] = i;
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java
index cdb0dc0..e59c600 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java
@@ -864,6 +864,9 @@
     public static final FunctionIdentifier EXTERNAL_LOOKUP = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
             "external-lookup", FunctionIdentifier.VARARGS);
 
+    public static final FunctionIdentifier GET_JOB_PARAMETER =
+            new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "get-job-param", 1);
+
     public static final FunctionIdentifier META = new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "meta",
             FunctionIdentifier.VARARGS);
     public static final FunctionIdentifier META_KEY = new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "meta-key",
@@ -1274,6 +1277,9 @@
         // external lookup
         addPrivateFunction(EXTERNAL_LOOKUP, AnyTypeComputer.INSTANCE, false);
 
+        // get job parameter
+        addFunction(GET_JOB_PARAMETER, AnyTypeComputer.INSTANCE, false);
+
         // unnesting function
         addPrivateFunction(SCAN_COLLECTION, CollectionMemberResultType.INSTANCE, true);
 
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/AbstractUnaryStringStringEval.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/AbstractUnaryStringStringEval.java
index 1f9909c..95b6ef6 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/AbstractUnaryStringStringEval.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/AbstractUnaryStringStringEval.java
@@ -22,8 +22,8 @@
 import java.io.DataOutput;
 import java.io.IOException;
 
-import org.apache.asterix.runtime.exceptions.TypeMismatchException;
 import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.runtime.exceptions.TypeMismatchException;
 import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
 import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
 import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
@@ -57,7 +57,6 @@
         this.funcID = funcID;
     }
 
-    @SuppressWarnings("unchecked")
     @Override
     public void evaluate(IFrameTupleReference tuple, IPointable resultPointable) throws HyracksDataException {
         resultStorage.reset();
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/GetJobParameterByNameDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/GetJobParameterByNameDescriptor.java
new file mode 100644
index 0000000..17f7a96
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/GetJobParameterByNameDescriptor.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.runtime.evaluators.functions;
+
+import java.io.IOException;
+
+import org.apache.asterix.om.functions.BuiltinFunctions;
+import org.apache.asterix.om.functions.IFunctionDescriptor;
+import org.apache.asterix.om.functions.IFunctionDescriptorFactory;
+import org.apache.asterix.runtime.evaluators.base.AbstractScalarFunctionDynamicDescriptor;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.data.std.primitive.UTF8StringPointable;
+
+public class GetJobParameterByNameDescriptor extends AbstractScalarFunctionDynamicDescriptor {
+    private static final long serialVersionUID = 1L;
+    public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+        @Override
+        public IFunctionDescriptor createFunctionDescriptor() {
+            return new GetJobParameterByNameDescriptor();
+        }
+    };
+
+    @Override
+    public IScalarEvaluatorFactory createEvaluatorFactory(IScalarEvaluatorFactory[] args) {
+        return new IScalarEvaluatorFactory() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public IScalarEvaluator createScalarEvaluator(IHyracksTaskContext ctx) throws HyracksDataException {
+                return new AbstractUnaryStringStringEval(ctx, args[0],
+                        GetJobParameterByNameDescriptor.this.getIdentifier()) {
+                    private byte[] result;
+
+                    @Override
+                    protected void process(UTF8StringPointable inputString, IPointable resultPointable)
+                            throws IOException {
+                        result = ctx.getJobParameter(inputString.getByteArray(), inputString.getStartOffset(),
+                                inputString.getLength());
+                    }
+
+                    @Override
+                    void writeResult(IPointable resultPointable) throws IOException {
+                        resultPointable.set(result, 0, result.length);
+                    }
+                };
+            }
+        };
+    }
+
+    @Override
+    public FunctionIdentifier getIdentifier() {
+        return BuiltinFunctions.GET_JOB_PARAMETER;
+    }
+
+}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/functions/FunctionCollection.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/functions/FunctionCollection.java
index 5acbeb9..f3eb6c9 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/functions/FunctionCollection.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/functions/FunctionCollection.java
@@ -158,6 +158,7 @@
 import org.apache.asterix.runtime.evaluators.functions.FullTextContainsDescriptor;
 import org.apache.asterix.runtime.evaluators.functions.FullTextContainsWithoutOptionDescriptor;
 import org.apache.asterix.runtime.evaluators.functions.GetItemDescriptor;
+import org.apache.asterix.runtime.evaluators.functions.GetJobParameterByNameDescriptor;
 import org.apache.asterix.runtime.evaluators.functions.GramTokensDescriptor;
 import org.apache.asterix.runtime.evaluators.functions.HashedGramTokensDescriptor;
 import org.apache.asterix.runtime.evaluators.functions.HashedWordTokensDescriptor;
@@ -437,6 +438,9 @@
         // Inject failure function
         fc.add(InjectFailureDescriptor.FACTORY);
 
+        // Get Job Parameter function
+        fc.add(GetJobParameterByNameDescriptor.FACTORY);
+
         // Switch case
         fc.add(SwitchCaseDescriptor.FACTORY);
 
@@ -737,8 +741,8 @@
      */
     private static IFunctionDescriptorFactory getGeneratedFunctionDescriptorFactory(Class<?> cl) {
         try {
-            String className = CodeGenHelper.getGeneratedClassName(cl.getName(),
-                    CodeGenHelper.DEFAULT_SUFFIX_FOR_GENERATED_CLASS);
+            String className =
+                    CodeGenHelper.getGeneratedClassName(cl.getName(), CodeGenHelper.DEFAULT_SUFFIX_FOR_GENERATED_CLASS);
             Class<?> generatedCl = cl.getClassLoader().loadClass(className);
             Field factory = generatedCl.getDeclaredField(FACTORY);
             return (IFunctionDescriptorFactory) factory.get(null);
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/JobEventListenerFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/JobEventListenerFactory.java
index 1422e42..23ad1e1 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/JobEventListenerFactory.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/JobEventListenerFactory.java
@@ -18,6 +18,7 @@
  */
 package org.apache.asterix.runtime.job.listener;
 
+import org.apache.asterix.common.api.IJobEventListenerFactory;
 import org.apache.asterix.common.api.INcApplicationContext;
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.transactions.DatasetId;
@@ -27,14 +28,19 @@
 import org.apache.hyracks.api.context.IHyracksJobletContext;
 import org.apache.hyracks.api.job.IJobletEventListener;
 import org.apache.hyracks.api.job.IJobletEventListenerFactory;
+import org.apache.hyracks.api.job.JobParameterByteStore;
 import org.apache.hyracks.api.job.JobStatus;
 
-public class JobEventListenerFactory implements IJobletEventListenerFactory {
+public class JobEventListenerFactory implements IJobEventListenerFactory {
 
     private static final long serialVersionUID = 1L;
-    private final TxnId txnId;
+
+    private TxnId txnId;
     private final boolean transactionalWrite;
 
+    //To enable new Asterix TxnId for separate deployed job spec invocations
+    private static final byte[] TRANSACTION_ID_PARAMETER_NAME = "TxnIdParameter".getBytes();
+
     public JobEventListenerFactory(TxnId txnId, boolean transactionalWrite) {
         this.txnId = txnId;
         this.transactionalWrite = transactionalWrite;
@@ -45,6 +51,26 @@
     }
 
     @Override
+    public TxnId getTxnId(TxnId compiledTxnId) {
+        return txnId;
+    }
+
+    @Override
+    public IJobletEventListenerFactory copyFactory() {
+        return new JobEventListenerFactory(txnId, transactionalWrite);
+    }
+
+    @Override
+    public void updateListenerJobParameters(JobParameterByteStore jobParameterByteStore) {
+        String AsterixTransactionIdString =
+                new String(jobParameterByteStore.getParameterValue(TRANSACTION_ID_PARAMETER_NAME, 0,
+                        TRANSACTION_ID_PARAMETER_NAME.length));
+        if (AsterixTransactionIdString.length() > 0) {
+            this.txnId = new TxnId(Integer.parseInt(AsterixTransactionIdString));
+        }
+    }
+
+    @Override
     public IJobletEventListener createListener(final IHyracksJobletContext jobletContext) {
 
         return new IJobletEventListener() {
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/MultiTransactionJobletEventListenerFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/MultiTransactionJobletEventListenerFactory.java
index a63f3ca..23c86f3 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/MultiTransactionJobletEventListenerFactory.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/MultiTransactionJobletEventListenerFactory.java
@@ -20,6 +20,7 @@
 
 import java.util.List;
 
+import org.apache.asterix.common.api.IJobEventListenerFactory;
 import org.apache.asterix.common.api.INcApplicationContext;
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.transactions.DatasetId;
@@ -29,13 +30,14 @@
 import org.apache.hyracks.api.context.IHyracksJobletContext;
 import org.apache.hyracks.api.job.IJobletEventListener;
 import org.apache.hyracks.api.job.IJobletEventListenerFactory;
+import org.apache.hyracks.api.job.JobParameterByteStore;
 import org.apache.hyracks.api.job.JobStatus;
 
 /**
  * This Joblet enable transactions on multiple datasets to take place in the same Hyracks Job
  * It takes a list of Transaction job ids instead of a single job Id
  */
-public class MultiTransactionJobletEventListenerFactory implements IJobletEventListenerFactory {
+public class MultiTransactionJobletEventListenerFactory implements IJobEventListenerFactory {
 
     private static final long serialVersionUID = 1L;
     private final List<TxnId> txnIds;
@@ -46,6 +48,22 @@
         this.transactionalWrite = transactionalWrite;
     }
 
+    //TODO: Enable this factory to be usable for Deployed Jobs
+    @Override
+    public TxnId getTxnId(TxnId compiledTxnId) {
+        return compiledTxnId;
+    }
+
+    @Override
+    public IJobletEventListenerFactory copyFactory() {
+        return new MultiTransactionJobletEventListenerFactory(txnIds, transactionalWrite);
+    }
+
+    @Override
+    public void updateListenerJobParameters(JobParameterByteStore jobParameterByteStore) {
+        //no op
+    }
+
     @Override
     public IJobletEventListener createListener(final IHyracksJobletContext jobletContext) {
 
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallbackFactory.java
index 73b9b41..3f3dbd9 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallbackFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallbackFactory.java
@@ -18,6 +18,7 @@
  */
 package org.apache.asterix.transaction.management.opcallbacks;
 
+import org.apache.asterix.common.api.IJobEventListenerFactory;
 import org.apache.asterix.common.context.ITransactionSubsystemProvider;
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.transactions.AbstractOperationCallbackFactory;
@@ -28,6 +29,7 @@
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.IJobletEventListenerFactory;
 import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
 
 public class LockThenSearchOperationCallbackFactory extends AbstractOperationCallbackFactory
@@ -45,7 +47,9 @@
             IOperatorNodePushable operatorNodePushable) throws HyracksDataException {
         ITransactionSubsystem txnSubsystem = txnSubsystemProvider.getTransactionSubsystem(ctx);
         try {
-            ITransactionContext txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(txnId, false);
+            IJobletEventListenerFactory fact = ctx.getJobletContext().getJobletEventListenerFactory();
+            ITransactionContext txnCtx = txnSubsystem.getTransactionManager()
+                    .getTransactionContext(((IJobEventListenerFactory) fact).getTxnId(txnId), false);
             return new LockThenSearchOperationCallback(new DatasetId(datasetId), primaryKeyFields, txnSubsystem, txnCtx,
                     operatorNodePushable);
         } catch (ACIDException e) {
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallbackFactory.java
index dbf58e4..93108f9 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallbackFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallbackFactory.java
@@ -19,6 +19,7 @@
 
 package org.apache.asterix.transaction.management.opcallbacks;
 
+import org.apache.asterix.common.api.IJobEventListenerFactory;
 import org.apache.asterix.common.context.ITransactionSubsystemProvider;
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.transactions.AbstractOperationCallbackFactory;
@@ -29,6 +30,7 @@
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.IJobletEventListenerFactory;
 import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
 import org.apache.hyracks.storage.common.ISearchOperationCallback;
 
@@ -47,7 +49,9 @@
             throws HyracksDataException {
         ITransactionSubsystem txnSubsystem = txnSubsystemProvider.getTransactionSubsystem(ctx);
         try {
-            ITransactionContext txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(txnId, false);
+            IJobletEventListenerFactory fact = ctx.getJobletContext().getJobletEventListenerFactory();
+            ITransactionContext txnCtx = txnSubsystem.getTransactionManager()
+                    .getTransactionContext(((IJobEventListenerFactory) fact).getTxnId(txnId), false);
             return new PrimaryIndexInstantSearchOperationCallback(new DatasetId(datasetId), primaryKeyFields,
                     txnSubsystem.getLockManager(), txnCtx);
         } catch (ACIDException e) {
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
index b8c6084..fb01952 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
@@ -19,6 +19,7 @@
 
 package org.apache.asterix.transaction.management.opcallbacks;
 
+import org.apache.asterix.common.api.IJobEventListenerFactory;
 import org.apache.asterix.common.context.ITransactionSubsystemProvider;
 import org.apache.asterix.common.dataflow.DatasetLocalResource;
 import org.apache.asterix.common.exceptions.ACIDException;
@@ -32,6 +33,7 @@
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.IJobletEventListenerFactory;
 import org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import org.apache.hyracks.storage.common.IIndex;
@@ -66,7 +68,9 @@
         }
 
         try {
-            ITransactionContext txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(txnId, false);
+            IJobletEventListenerFactory fact = ctx.getJobletContext().getJobletEventListenerFactory();
+            ITransactionContext txnCtx = txnSubsystem.getTransactionManager()
+                    .getTransactionContext(((IJobEventListenerFactory) fact).getTxnId(txnId), false);
             DatasetLocalResource aResource = (DatasetLocalResource) resource.getResource();
             IModificationOperationCallback modCallback = new PrimaryIndexModificationOperationCallback(
                     new DatasetId(datasetId), primaryKeyFields, txnCtx, txnSubsystem.getLockManager(), txnSubsystem,
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallbackFactory.java
index 49490a1..076b0d9 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallbackFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallbackFactory.java
@@ -19,6 +19,7 @@
 
 package org.apache.asterix.transaction.management.opcallbacks;
 
+import org.apache.asterix.common.api.IJobEventListenerFactory;
 import org.apache.asterix.common.context.ITransactionSubsystemProvider;
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.transactions.AbstractOperationCallbackFactory;
@@ -29,6 +30,7 @@
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.IJobletEventListenerFactory;
 import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
 import org.apache.hyracks.storage.common.ISearchOperationCallback;
 
@@ -47,7 +49,9 @@
             IOperatorNodePushable operatorNodePushable) throws HyracksDataException {
         ITransactionSubsystem txnSubsystem = txnSubsystemProvider.getTransactionSubsystem(ctx);
         try {
-            ITransactionContext txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(txnId, false);
+            IJobletEventListenerFactory fact = ctx.getJobletContext().getJobletEventListenerFactory();
+            ITransactionContext txnCtx = txnSubsystem.getTransactionManager()
+                    .getTransactionContext(((IJobEventListenerFactory) fact).getTxnId(txnId), false);
             return new PrimaryIndexSearchOperationCallback(new DatasetId(datasetId), primaryKeyFields,
                     txnSubsystem.getLockManager(), txnCtx);
         } catch (ACIDException e) {
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
index 1847e80..5882046 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
@@ -19,6 +19,7 @@
 
 package org.apache.asterix.transaction.management.opcallbacks;
 
+import org.apache.asterix.common.api.IJobEventListenerFactory;
 import org.apache.asterix.common.context.ITransactionSubsystemProvider;
 import org.apache.asterix.common.dataflow.DatasetLocalResource;
 import org.apache.asterix.common.exceptions.ACIDException;
@@ -32,6 +33,7 @@
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.IJobletEventListenerFactory;
 import org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import org.apache.hyracks.storage.common.IModificationOperationCallback;
@@ -62,7 +64,9 @@
         }
 
         try {
-            ITransactionContext txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(txnId, false);
+            IJobletEventListenerFactory fact = ctx.getJobletContext().getJobletEventListenerFactory();
+            ITransactionContext txnCtx = txnSubsystem.getTransactionManager()
+                    .getTransactionContext(((IJobEventListenerFactory) fact).getTxnId(txnId), false);
             DatasetLocalResource aResource = (DatasetLocalResource) resource.getResource();
             IModificationOperationCallback modCallback = new SecondaryIndexModificationOperationCallback(
                     new DatasetId(datasetId), primaryKeyFields, txnCtx, txnSubsystem.getLockManager(), txnSubsystem,
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetPrimaryIndexModificationOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetPrimaryIndexModificationOperationCallbackFactory.java
index 38adf2b..79ce788 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetPrimaryIndexModificationOperationCallbackFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetPrimaryIndexModificationOperationCallbackFactory.java
@@ -19,6 +19,7 @@
 
 package org.apache.asterix.transaction.management.opcallbacks;
 
+import org.apache.asterix.common.api.IJobEventListenerFactory;
 import org.apache.asterix.common.context.ITransactionSubsystemProvider;
 import org.apache.asterix.common.dataflow.DatasetLocalResource;
 import org.apache.asterix.common.exceptions.ACIDException;
@@ -32,6 +33,7 @@
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.IJobletEventListenerFactory;
 import org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import org.apache.hyracks.storage.common.IModificationOperationCallback;
@@ -63,7 +65,9 @@
         }
 
         try {
-            ITransactionContext txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(txnId, false);
+            IJobletEventListenerFactory fact = ctx.getJobletContext().getJobletEventListenerFactory();
+            ITransactionContext txnCtx = txnSubsystem.getTransactionManager()
+                    .getTransactionContext(((IJobEventListenerFactory) fact).getTxnId(txnId), false);
             DatasetLocalResource aResource = (DatasetLocalResource) resource.getResource();
             IModificationOperationCallback modCallback = new TempDatasetIndexModificationOperationCallback(
                     new DatasetId(datasetId), primaryKeyFields, txnCtx, txnSubsystem.getLockManager(), txnSubsystem,
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetSecondaryIndexModificationOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetSecondaryIndexModificationOperationCallbackFactory.java
index 5e499fc..8a27914 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetSecondaryIndexModificationOperationCallbackFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetSecondaryIndexModificationOperationCallbackFactory.java
@@ -19,6 +19,7 @@
 
 package org.apache.asterix.transaction.management.opcallbacks;
 
+import org.apache.asterix.common.api.IJobEventListenerFactory;
 import org.apache.asterix.common.context.ITransactionSubsystemProvider;
 import org.apache.asterix.common.dataflow.DatasetLocalResource;
 import org.apache.asterix.common.exceptions.ACIDException;
@@ -32,6 +33,7 @@
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.IJobletEventListenerFactory;
 import org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import org.apache.hyracks.storage.common.IIndex;
@@ -65,7 +67,9 @@
         }
 
         try {
-            ITransactionContext txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(txnId, false);
+            IJobletEventListenerFactory fact = ctx.getJobletContext().getJobletEventListenerFactory();
+            ITransactionContext txnCtx = txnSubsystem.getTransactionManager()
+                    .getTransactionContext(((IJobEventListenerFactory) fact).getTxnId(txnId), false);
             IModificationOperationCallback modCallback = new TempDatasetIndexModificationOperationCallback(
                     new DatasetId(datasetId), primaryKeyFields, txnCtx, txnSubsystem.getLockManager(), txnSubsystem,
                     resource.getId(), aResource.getPartition(), resourceType, indexOp);
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java
index be75ab9..dfd3eb1 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java
@@ -18,6 +18,7 @@
  */
 package org.apache.asterix.transaction.management.opcallbacks;
 
+import org.apache.asterix.common.api.IJobEventListenerFactory;
 import org.apache.asterix.common.context.ITransactionSubsystemProvider;
 import org.apache.asterix.common.dataflow.DatasetLocalResource;
 import org.apache.asterix.common.exceptions.ACIDException;
@@ -31,6 +32,7 @@
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.IJobletEventListenerFactory;
 import org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import org.apache.hyracks.storage.common.IModificationOperationCallback;
@@ -62,7 +64,9 @@
         }
 
         try {
-            ITransactionContext txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(txnId, false);
+            IJobletEventListenerFactory fact = ctx.getJobletContext().getJobletEventListenerFactory();
+            ITransactionContext txnCtx = txnSubsystem.getTransactionManager()
+                    .getTransactionContext(((IJobEventListenerFactory) fact).getTxnId(txnId), false);
             IModificationOperationCallback modCallback = new UpsertOperationCallback(new DatasetId(datasetId),
                     primaryKeyFields, txnCtx, txnSubsystem.getLockManager(), txnSubsystem, resource.getId(),
                     aResource.getPartition(), resourceType, indexOp);
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java
index 2e43957..58f7e69 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java
@@ -19,11 +19,13 @@
 
 package org.apache.asterix.transaction.management.runtime;
 
+import org.apache.asterix.common.api.IJobEventListenerFactory;
 import org.apache.asterix.common.transactions.TxnId;
 import org.apache.hyracks.algebricks.runtime.base.IPushRuntime;
 import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.IJobletEventListenerFactory;
 
 public class CommitRuntimeFactory implements IPushRuntimeFactory {
 
@@ -55,7 +57,9 @@
 
     @Override
     public IPushRuntime createPushRuntime(IHyracksTaskContext ctx) throws HyracksDataException {
-            return new CommitRuntime(ctx, txnId, datasetId, primaryKeyFields, isTemporaryDatasetWriteJob,
-                    isWriteTransaction, datasetPartitions[ctx.getTaskAttemptId().getTaskId().getPartition()], isSink);
+        IJobletEventListenerFactory fact = ctx.getJobletContext().getJobletEventListenerFactory();
+        return new CommitRuntime(ctx, ((IJobEventListenerFactory) fact).getTxnId(txnId), datasetId,
+                primaryKeyFields, isTemporaryDatasetWriteJob, isWriteTransaction,
+                datasetPartitions[ctx.getTaskAttemptId().getTaskId().getPartition()], isSink);
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java
index 95479c1..23c41fe 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java
@@ -22,10 +22,13 @@
 import java.net.URL;
 import java.util.EnumSet;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
 import org.apache.hyracks.api.dataset.DatasetDirectoryRecord;
 import org.apache.hyracks.api.dataset.ResultSetId;
 import org.apache.hyracks.api.deployment.DeploymentId;
+import org.apache.hyracks.api.job.DeployedJobSpecId;
 import org.apache.hyracks.api.job.JobFlag;
 import org.apache.hyracks.api.job.JobId;
 
@@ -104,12 +107,12 @@
         }
     }
 
-    public static class DistributeJobFunction extends Function {
+    public static class DeployJobSpecFunction extends Function {
         private static final long serialVersionUID = 1L;
 
         private final byte[] acggfBytes;
 
-        public DistributeJobFunction(byte[] acggfBytes) {
+        public DeployJobSpecFunction(byte[] acggfBytes) {
             this.acggfBytes = acggfBytes;
         }
 
@@ -145,13 +148,13 @@
         }
     }
 
-    public static class DestroyJobFunction extends Function {
+    public static class UndeployJobSpecFunction extends Function {
         private static final long serialVersionUID = 1L;
 
-        private final JobId jobId;
+        private final DeployedJobSpecId deployedJobSpecId;
 
-        public DestroyJobFunction(JobId jobId) {
-            this.jobId = jobId;
+        public UndeployJobSpecFunction(DeployedJobSpecId deployedJobSpecId) {
+            this.deployedJobSpecId = deployedJobSpecId;
         }
 
         @Override
@@ -159,8 +162,8 @@
             return FunctionId.DESTROY_JOB;
         }
 
-        public JobId getJobId() {
-            return jobId;
+        public DeployedJobSpecId getDeployedJobSpecId() {
+            return deployedJobSpecId;
         }
     }
 
@@ -168,27 +171,30 @@
         private static final long serialVersionUID = 1L;
 
         private final byte[] acggfBytes;
-        private final EnumSet<JobFlag> jobFlags;
+        private final Set<JobFlag> jobFlags;
         private final DeploymentId deploymentId;
-        private final JobId jobId;
+        private final DeployedJobSpecId deployedJobSpecId;
+        private final Map<byte[], byte[]> jobParameters;
 
-        public StartJobFunction(DeploymentId deploymentId, byte[] acggfBytes, EnumSet<JobFlag> jobFlags, JobId jobId) {
+        public StartJobFunction(DeploymentId deploymentId, byte[] acggfBytes, Set<JobFlag> jobFlags,
+                DeployedJobSpecId deployedJobSpecId, Map<byte[], byte[]> jobParameters) {
             this.acggfBytes = acggfBytes;
             this.jobFlags = jobFlags;
             this.deploymentId = deploymentId;
-            this.jobId = jobId;
+            this.deployedJobSpecId = deployedJobSpecId;
+            this.jobParameters = jobParameters;
         }
 
-        public StartJobFunction(JobId jobId) {
-            this(null, null, EnumSet.noneOf(JobFlag.class), jobId);
+        public StartJobFunction(DeployedJobSpecId deployedJobSpecId, Map<byte[], byte[]> jobParameters) {
+            this(null, null, EnumSet.noneOf(JobFlag.class), deployedJobSpecId, jobParameters);
         }
 
-        public StartJobFunction(byte[] acggfBytes, EnumSet<JobFlag> jobFlags) {
-            this(null, acggfBytes, jobFlags, null);
+        public StartJobFunction(byte[] acggfBytes, Set<JobFlag> jobFlags) {
+            this(null, acggfBytes, jobFlags, null, null);
         }
 
-        public StartJobFunction(DeploymentId deploymentId, byte[] acggfBytes, EnumSet<JobFlag> jobFlags) {
-            this(deploymentId, acggfBytes, jobFlags, null);
+        public StartJobFunction(DeploymentId deploymentId, byte[] acggfBytes, Set<JobFlag> jobFlags) {
+            this(deploymentId, acggfBytes, jobFlags, null, null);
         }
 
         @Override
@@ -196,15 +202,19 @@
             return FunctionId.START_JOB;
         }
 
-        public JobId getJobId() {
-            return jobId;
+        public Map<byte[], byte[]> getJobParameters() {
+            return jobParameters;
+        }
+
+        public DeployedJobSpecId getDeployedJobSpecId() {
+            return deployedJobSpecId;
         }
 
         public byte[] getACGGFBytes() {
             return acggfBytes;
         }
 
-        public EnumSet<JobFlag> getJobFlags() {
+        public Set<JobFlag> getJobFlags() {
             return jobFlags;
         }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
index 0ded84f..eddcaa5 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
@@ -26,6 +26,7 @@
 
 import org.apache.hyracks.api.comm.NetworkAddress;
 import org.apache.hyracks.api.deployment.DeploymentId;
+import org.apache.hyracks.api.job.DeployedJobSpecId;
 import org.apache.hyracks.api.job.JobFlag;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobInfo;
@@ -76,9 +77,9 @@
     }
 
     @Override
-    public JobId startJob(JobId jobId) throws Exception {
+    public JobId startJob(DeployedJobSpecId deployedJobSpecId, Map<byte[], byte[]> jobParameters) throws Exception {
         HyracksClientInterfaceFunctions.StartJobFunction sjf =
-                new HyracksClientInterfaceFunctions.StartJobFunction(jobId);
+                new HyracksClientInterfaceFunctions.StartJobFunction(deployedJobSpecId, jobParameters);
         return (JobId) rpci.call(ipcHandle, sjf);
     }
 
@@ -90,17 +91,17 @@
     }
 
     @Override
-    public JobId distributeJob(byte[] acggfBytes) throws Exception {
-        HyracksClientInterfaceFunctions.DistributeJobFunction sjf =
-                new HyracksClientInterfaceFunctions.DistributeJobFunction(acggfBytes);
-        return (JobId) rpci.call(ipcHandle, sjf);
+    public DeployedJobSpecId deployJobSpec(byte[] acggfBytes) throws Exception {
+        HyracksClientInterfaceFunctions.DeployJobSpecFunction sjf =
+                new HyracksClientInterfaceFunctions.DeployJobSpecFunction(acggfBytes);
+        return (DeployedJobSpecId) rpci.call(ipcHandle, sjf);
     }
 
     @Override
-    public JobId destroyJob(JobId jobId) throws Exception {
-        HyracksClientInterfaceFunctions.DestroyJobFunction sjf =
-                new HyracksClientInterfaceFunctions.DestroyJobFunction(jobId);
-        return (JobId) rpci.call(ipcHandle, sjf);
+    public DeployedJobSpecId undeployJobSpec(DeployedJobSpecId deployedJobSpecId) throws Exception {
+        HyracksClientInterfaceFunctions.UndeployJobSpecFunction sjf =
+                new HyracksClientInterfaceFunctions.UndeployJobSpecFunction(deployedJobSpecId);
+        return (DeployedJobSpecId) rpci.call(ipcHandle, sjf);
     }
 
     @Override
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
index e979da6..85ef927 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
@@ -36,6 +36,7 @@
 import org.apache.hyracks.api.comm.NetworkAddress;
 import org.apache.hyracks.api.deployment.DeploymentId;
 import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.job.DeployedJobSpecId;
 import org.apache.hyracks.api.job.IActivityClusterGraphGeneratorFactory;
 import org.apache.hyracks.api.job.JobFlag;
 import org.apache.hyracks.api.job.JobId;
@@ -109,20 +110,20 @@
     }
 
     @Override
-    public JobId distributeJob(JobSpecification jobSpec) throws Exception {
-        IActivityClusterGraphGeneratorFactory jsacggf =
+    public DeployedJobSpecId deployJobSpec(JobSpecification jobSpec) throws Exception {
+        JobSpecificationActivityClusterGraphGeneratorFactory jsacggf =
                 new JobSpecificationActivityClusterGraphGeneratorFactory(jobSpec);
-        return distributeJob(jsacggf);
+        return deployJobSpec(jsacggf);
     }
 
     @Override
-    public JobId destroyJob(JobId jobId) throws Exception {
-        return hci.destroyJob(jobId);
+    public DeployedJobSpecId undeployJobSpec(DeployedJobSpecId deployedJobSpecId) throws Exception {
+        return hci.undeployJobSpec(deployedJobSpecId);
     }
 
     @Override
-    public JobId startJob(JobId jobId) throws Exception {
-        return hci.startJob(jobId);
+    public JobId startJob(DeployedJobSpecId deployedJobSpecId, Map<byte[], byte[]> jobParameters) throws Exception {
+        return hci.startJob(deployedJobSpecId, jobParameters);
     }
 
     @Override
@@ -130,8 +131,8 @@
         return hci.startJob(JavaSerializationUtils.serialize(acggf), jobFlags);
     }
 
-    public JobId distributeJob(IActivityClusterGraphGeneratorFactory acggf) throws Exception {
-        return hci.distributeJob(JavaSerializationUtils.serialize(acggf));
+    public DeployedJobSpecId deployJobSpec(IActivityClusterGraphGeneratorFactory acggf) throws Exception {
+        return hci.deployJobSpec(JavaSerializationUtils.serialize(acggf));
     }
 
     @Override
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java
index a7c1d75..510a6b6 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java
@@ -20,9 +20,11 @@
 
 import java.util.EnumSet;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.hyracks.api.comm.NetworkAddress;
 import org.apache.hyracks.api.deployment.DeploymentId;
+import org.apache.hyracks.api.job.DeployedJobSpecId;
 import org.apache.hyracks.api.job.IActivityClusterGraphGeneratorFactory;
 import org.apache.hyracks.api.job.JobFlag;
 import org.apache.hyracks.api.job.JobId;
@@ -94,25 +96,27 @@
      *            Flags
      * @throws Exception
      */
-    JobId distributeJob(JobSpecification jobSpec) throws Exception;
+    DeployedJobSpecId deployJobSpec(JobSpecification jobSpec) throws Exception;
 
     /**
-     * Destroy the distributed graph for a pre-distributed job
+     * Remove the deployed Job Spec
      *
-     * @param jobId
-     *            The id of the predistributed job
+     * @param deployedJobSpecId
+     *            The id of the deployed job spec
      * @throws Exception
      */
-    JobId destroyJob(JobId jobId) throws Exception;
+    DeployedJobSpecId undeployJobSpec(DeployedJobSpecId deployedJobSpecId) throws Exception;
 
     /**
-     * Used to run a pre-distributed job by id (the same JobId will be returned)
+     * Used to run a deployed Job Spec by id
      *
-     * @param jobId
-     *            The id of the predistributed job
+     * @param deployedJobSpecId
+     *            The id of the deployed job spec
+     * @param jobParameters
+     *            The serialized job parameters
      * @throws Exception
      */
-    JobId startJob(JobId jobId) throws Exception;
+    JobId startJob(DeployedJobSpecId deployedJobSpecId, Map<byte[], byte[]> jobParameters) throws Exception;
 
     /**
      * Start the specified Job.
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java
index 9cebd3e..f0c7872 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java
@@ -25,6 +25,7 @@
 
 import org.apache.hyracks.api.comm.NetworkAddress;
 import org.apache.hyracks.api.deployment.DeploymentId;
+import org.apache.hyracks.api.job.DeployedJobSpecId;
 import org.apache.hyracks.api.job.JobFlag;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobInfo;
@@ -38,13 +39,13 @@
 
     public JobId startJob(byte[] acggfBytes, EnumSet<JobFlag> jobFlags) throws Exception;
 
-    public JobId startJob(JobId jobId) throws Exception;
+    public JobId startJob(DeployedJobSpecId deployedJobSpecId, Map<byte[], byte[]> jobParameters) throws Exception;
 
     public void cancelJob(JobId jobId) throws Exception;
 
-    public JobId distributeJob(byte[] acggfBytes) throws Exception;
+    public DeployedJobSpecId deployJobSpec(byte[] acggfBytes) throws Exception;
 
-    public JobId destroyJob(JobId jobId) throws Exception;
+    public DeployedJobSpecId undeployJobSpec(DeployedJobSpecId deployedJobSpecId) throws Exception;
 
     public NetworkAddress getDatasetDirectoryServiceInfo() throws Exception;
 
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/impl/ActivityClusterGraphBuilder.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/impl/ActivityClusterGraphBuilder.java
index 7dd5fe9..0b2cc9b 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/impl/ActivityClusterGraphBuilder.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/impl/ActivityClusterGraphBuilder.java
@@ -28,7 +28,6 @@
 import java.util.logging.Logger;
 
 import org.apache.commons.lang3.tuple.Pair;
-
 import org.apache.hyracks.api.dataflow.ActivityId;
 import org.apache.hyracks.api.dataflow.IActivity;
 import org.apache.hyracks.api.dataflow.IConnectorDescriptor;
@@ -36,7 +35,6 @@
 import org.apache.hyracks.api.job.ActivityClusterGraph;
 import org.apache.hyracks.api.job.ActivityClusterId;
 import org.apache.hyracks.api.job.JobActivityGraph;
-import org.apache.hyracks.api.job.JobId;
 
 public class ActivityClusterGraphBuilder {
     private static final Logger LOGGER = Logger.getLogger(ActivityClusterGraphBuilder.class.getName());
@@ -70,7 +68,7 @@
         return null;
     }
 
-    public ActivityClusterGraph inferActivityClusters(JobId jobId, JobActivityGraph jag) {
+    public ActivityClusterGraph inferActivityClusters(JobActivityGraph jag) {
         /*
          * Build initial equivalence sets map. We create a map such that for each IOperatorTask, t -> { t }
          */
@@ -99,7 +97,7 @@
         Map<ActivityId, IActivity> activityNodeMap = jag.getActivityMap();
         List<ActivityCluster> acList = new ArrayList<ActivityCluster>();
         for (Set<ActivityId> stage : stages) {
-            ActivityCluster ac = new ActivityCluster(acg, new ActivityClusterId(jobId, acCounter++));
+            ActivityCluster ac = new ActivityCluster(acg, new ActivityClusterId(acCounter++));
             acList.add(ac);
             for (ActivityId aid : stage) {
                 IActivity activity = activityNodeMap.get(aid);
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/impl/JobSpecificationActivityClusterGraphGeneratorFactory.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/impl/JobSpecificationActivityClusterGraphGeneratorFactory.java
index c712b36..ddf0ce8 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/impl/JobSpecificationActivityClusterGraphGeneratorFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/impl/JobSpecificationActivityClusterGraphGeneratorFactory.java
@@ -32,7 +32,6 @@
 import org.apache.hyracks.api.job.IActivityClusterGraphGeneratorFactory;
 import org.apache.hyracks.api.job.JobActivityGraph;
 import org.apache.hyracks.api.job.JobFlag;
-import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.api.rewriter.ActivityClusterGraphRewriter;
 
@@ -51,8 +50,8 @@
     }
 
     @Override
-    public IActivityClusterGraphGenerator createActivityClusterGraphGenerator(JobId jobId,
-            final ICCServiceContext ccServiceCtx, Set<JobFlag> jobFlags) throws HyracksException {
+    public IActivityClusterGraphGenerator createActivityClusterGraphGenerator(final ICCServiceContext ccServiceCtx,
+            Set<JobFlag> jobFlags) throws HyracksException {
         final JobActivityGraphBuilder builder = new JobActivityGraphBuilder(spec, jobFlags);
         PlanUtils.visit(spec, new IConnectorDescriptorVisitor() {
             @Override
@@ -70,7 +69,7 @@
         final JobActivityGraph jag = builder.getActivityGraph();
         ActivityClusterGraphBuilder acgb = new ActivityClusterGraphBuilder();
 
-        final ActivityClusterGraph acg = acgb.inferActivityClusters(jobId, jag);
+        final ActivityClusterGraph acg = acgb.inferActivityClusters(jag);
         acg.setFrameSize(spec.getFrameSize());
         acg.setMaxReattempts(spec.getMaxReattempts());
         acg.setJobletEventListenerFactory(spec.getJobletEventListenerFactory());
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksJobletContext.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksJobletContext.java
index 6eb0adb..1100335 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksJobletContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksJobletContext.java
@@ -21,6 +21,7 @@
 import org.apache.hyracks.api.application.INCServiceContext;
 import org.apache.hyracks.api.exceptions.HyracksException;
 import org.apache.hyracks.api.io.IWorkspaceFileFactory;
+import org.apache.hyracks.api.job.IJobletEventListenerFactory;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.profiling.counters.ICounterContext;
 import org.apache.hyracks.api.resources.IDeallocatableRegistry;
@@ -34,6 +35,8 @@
 
     Object getGlobalJobData();
 
+    IJobletEventListenerFactory getJobletEventListenerFactory();
+
     Class<?> loadClass(String className) throws HyracksException;
 
     ClassLoader getClassLoader() throws HyracksException;
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java
index 10bb336..bf42d0c 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java
@@ -25,6 +25,7 @@
 import org.apache.hyracks.api.dataflow.TaskAttemptId;
 import org.apache.hyracks.api.dataset.IDatasetPartitionManager;
 import org.apache.hyracks.api.deployment.DeploymentId;
+import org.apache.hyracks.api.exceptions.HyracksException;
 import org.apache.hyracks.api.io.IWorkspaceFileFactory;
 import org.apache.hyracks.api.job.IOperatorEnvironment;
 import org.apache.hyracks.api.job.JobFlag;
@@ -52,6 +53,8 @@
 
     Object getSharedObject();
 
+    public byte[] getJobParameter(byte[] name, int start, int length) throws HyracksException;
+
     Set<JobFlag> getJobFlags();
 
     IStatsCollector getStatsCollector();
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
index 7926469..eaf9bbf 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
@@ -54,9 +54,9 @@
     public static final int INCONSISTENT_RESULT_METADATA = 18;
     public static final int CANNOT_DELETE_FILE = 19;
     public static final int NOT_A_JOBID = 20;
-    public static final int ERROR_FINDING_DISTRIBUTED_JOB = 21;
-    public static final int DUPLICATE_DISTRIBUTED_JOB = 22;
-    public static final int DISTRIBUTED_JOB_FAILURE = 23;
+    public static final int ERROR_FINDING_DEPLOYED_JOB = 21;
+    public static final int DUPLICATE_DEPLOYED_JOB = 22;
+    public static final int DEPLOYED_JOB_FAILURE = 23;
     public static final int NO_RESULT_SET = 24;
     public static final int JOB_CANCELED = 25;
     public static final int NODE_FAILED = 26;
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/ActivityClusterId.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/ActivityClusterId.java
index e0c5279..84a754a 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/ActivityClusterId.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/ActivityClusterId.java
@@ -23,19 +23,12 @@
 public final class ActivityClusterId implements Serializable {
     private static final long serialVersionUID = 1L;
 
-    private final JobId jobId;
-
     private final int id;
 
-    public ActivityClusterId(JobId jobId, int id) {
-        this.jobId = jobId;
+    public ActivityClusterId(int id) {
         this.id = id;
     }
 
-    public JobId getJobId() {
-        return jobId;
-    }
-
     public int getId() {
         return id;
     }
@@ -45,7 +38,6 @@
         final int prime = 31;
         int result = 1;
         result = prime * result + id;
-        result = prime * result + ((jobId == null) ? 0 : jobId.hashCode());
         return result;
     }
 
@@ -64,18 +56,11 @@
         if (id != other.id) {
             return false;
         }
-        if (jobId == null) {
-            if (other.jobId != null) {
-                return false;
-            }
-        } else if (!jobId.equals(other.jobId)) {
-            return false;
-        }
         return true;
     }
 
     @Override
     public String toString() {
-        return "ACID:" + jobId + ":" + id;
+        return "ACID:" + id;
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/DeployedJobSpecId.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/DeployedJobSpecId.java
new file mode 100644
index 0000000..8cbfb1a
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/DeployedJobSpecId.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.job;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.Serializable;
+
+import org.apache.hyracks.api.exceptions.ErrorCode;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.IWritable;
+
+public final class DeployedJobSpecId implements IWritable, Serializable {
+
+    public static final DeployedJobSpecId INVALID = new DeployedJobSpecId(-1l);
+
+    private static final long serialVersionUID = 1L;
+    private long id;
+
+    public static DeployedJobSpecId create(DataInput dis) throws IOException {
+        DeployedJobSpecId deployedJobSpecId = new DeployedJobSpecId();
+        deployedJobSpecId.readFields(dis);
+        return deployedJobSpecId;
+    }
+
+    private DeployedJobSpecId() {
+    }
+
+    public DeployedJobSpecId(long id) {
+        this.id = id;
+    }
+
+    public long getId() {
+        return id;
+    }
+
+    @Override
+    public int hashCode() {
+        return (int) id;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (o == this) {
+            return true;
+        }
+        if (!(o instanceof DeployedJobSpecId)) {
+            return false;
+        }
+        return ((DeployedJobSpecId) o).id == id;
+    }
+
+    @Override
+    public String toString() {
+        return "PDJID:" + id;
+    }
+
+    public static DeployedJobSpecId parse(String str) throws HyracksDataException {
+        if (str.startsWith("PDJID:")) {
+            return new DeployedJobSpecId(Long.parseLong(str.substring(4)));
+        }
+        throw HyracksDataException.create(ErrorCode.NOT_A_JOBID, str);
+    }
+
+    @Override
+    public void writeFields(DataOutput output) throws IOException {
+        output.writeLong(id);
+    }
+
+    @Override
+    public void readFields(DataInput input) throws IOException {
+        id = input.readLong();
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/DeployedJobSpecIdFactory.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/DeployedJobSpecIdFactory.java
new file mode 100644
index 0000000..24caa9b
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/DeployedJobSpecIdFactory.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.job;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+public class DeployedJobSpecIdFactory {
+    private final AtomicLong id = new AtomicLong(0);
+
+    public DeployedJobSpecId create() {
+        return new DeployedJobSpecId(id.getAndIncrement());
+    }
+
+    public long maxDeployedJobSpecId() {
+        return id.get();
+    }
+
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/IActivityClusterGraphGeneratorFactory.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/IActivityClusterGraphGeneratorFactory.java
index 133e342..d23b944 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/IActivityClusterGraphGeneratorFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/IActivityClusterGraphGeneratorFactory.java
@@ -25,7 +25,7 @@
 import org.apache.hyracks.api.exceptions.HyracksException;
 
 public interface IActivityClusterGraphGeneratorFactory extends Serializable {
-    public IActivityClusterGraphGenerator createActivityClusterGraphGenerator(JobId jobId,
+    public IActivityClusterGraphGenerator createActivityClusterGraphGenerator(
             ICCServiceContext ccServiceCtx, Set<JobFlag> jobFlags) throws HyracksException;
 
     public JobSpecification getJobSpecification();
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/IJobletEventListenerFactory.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/IJobletEventListenerFactory.java
index d523cccc..bd2f189 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/IJobletEventListenerFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/IJobletEventListenerFactory.java
@@ -23,5 +23,10 @@
 import org.apache.hyracks.api.context.IHyracksJobletContext;
 
 public interface IJobletEventListenerFactory extends Serializable {
-    public IJobletEventListener createListener(IHyracksJobletContext ctx);
+    IJobletEventListener createListener(IHyracksJobletContext ctx);
+
+    IJobletEventListenerFactory copyFactory();
+
+    //Allows job parameters to change listener settings
+    void updateListenerJobParameters(JobParameterByteStore jobParameterByteStore);
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobParameterByteStore.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobParameterByteStore.java
new file mode 100644
index 0000000..551b3d7
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobParameterByteStore.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.job;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+public class JobParameterByteStore implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private Map<byte[], byte[]> runtimeValues;
+    private final byte[] empty = new byte[0];
+
+    public JobParameterByteStore() {
+        runtimeValues = new HashMap<>();
+    }
+
+    public Map<byte[], byte[]> getParameterMap() {
+        return runtimeValues;
+    }
+
+    public void setParameters(Map<byte[], byte[]> map) {
+        runtimeValues = map;
+    }
+
+    public byte[] getParameterValue(byte[] name, int start, int length) {
+        for (Entry<byte[], byte[]> entry : runtimeValues.entrySet()) {
+            byte[] key = entry.getKey();
+            if (key.length == length) {
+                boolean matched = true;
+                for (int j = 0; j < length; j++) {
+                    if (key[j] != name[j + start]) {
+                        matched = false;
+                        break;
+                    }
+                }
+                if (matched) {
+                    return entry.getValue();
+                }
+            }
+        }
+        return empty;
+    }
+
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java
index 327c422..4e3c0f5 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java
@@ -24,15 +24,15 @@
 import org.apache.hyracks.api.client.HyracksClientInterfaceFunctions;
 import org.apache.hyracks.api.comm.NetworkAddress;
 import org.apache.hyracks.api.dataset.DatasetJobRecord.Status;
-import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.DeployedJobSpecId;
+import org.apache.hyracks.api.job.DeployedJobSpecIdFactory;
 import org.apache.hyracks.api.job.JobIdFactory;
 import org.apache.hyracks.api.job.JobInfo;
 import org.apache.hyracks.control.cc.work.CancelJobWork;
 import org.apache.hyracks.control.cc.work.CliDeployBinaryWork;
 import org.apache.hyracks.control.cc.work.CliUnDeployBinaryWork;
 import org.apache.hyracks.control.cc.work.ClusterShutdownWork;
-import org.apache.hyracks.control.cc.work.DestroyJobWork;
-import org.apache.hyracks.control.cc.work.DistributeJobWork;
+import org.apache.hyracks.control.cc.work.DeployJobSpecWork;
 import org.apache.hyracks.control.cc.work.GetDatasetDirectoryServiceInfoWork;
 import org.apache.hyracks.control.cc.work.GetJobInfoWork;
 import org.apache.hyracks.control.cc.work.GetJobStatusWork;
@@ -42,6 +42,7 @@
 import org.apache.hyracks.control.cc.work.GetResultStatusWork;
 import org.apache.hyracks.control.cc.work.GetThreadDumpWork;
 import org.apache.hyracks.control.cc.work.JobStartWork;
+import org.apache.hyracks.control.cc.work.UndeployJobSpecWork;
 import org.apache.hyracks.control.cc.work.WaitForJobCompletionWork;
 import org.apache.hyracks.control.common.work.IPCResponder;
 import org.apache.hyracks.ipc.api.IIPCHandle;
@@ -53,10 +54,12 @@
     private static final Logger LOGGER = Logger.getLogger(ClientInterfaceIPCI.class.getName());
     private final ClusterControllerService ccs;
     private final JobIdFactory jobIdFactory;
+    private final DeployedJobSpecIdFactory deployedJobSpecIdFactory;
 
-    ClientInterfaceIPCI(ClusterControllerService ccs, JobIdFactory jobIdFactory) {
+    ClientInterfaceIPCI(final ClusterControllerService ccs, final JobIdFactory jobIdFactory) {
         this.ccs = ccs;
         this.jobIdFactory = jobIdFactory;
+        this.deployedJobSpecIdFactory = ccs.getDeployedJobSpecIdFactory();
     }
 
     @Override
@@ -83,16 +86,17 @@
                         new IPCResponder<JobInfo>(handle, mid)));
                 break;
             case DISTRIBUTE_JOB:
-                HyracksClientInterfaceFunctions.DistributeJobFunction djf =
-                        (HyracksClientInterfaceFunctions.DistributeJobFunction) fn;
-                ccs.getWorkQueue().schedule(new DistributeJobWork(ccs, djf.getACGGFBytes(), jobIdFactory,
-                        new IPCResponder<JobId>(handle, mid)));
+                HyracksClientInterfaceFunctions.DeployJobSpecFunction djf =
+                        (HyracksClientInterfaceFunctions.DeployJobSpecFunction) fn;
+                ccs.getWorkQueue().schedule(new DeployJobSpecWork(ccs, djf.getACGGFBytes(),
+                        deployedJobSpecIdFactory.create(), new IPCResponder<>(handle, mid)));
                 break;
             case DESTROY_JOB:
-                HyracksClientInterfaceFunctions.DestroyJobFunction dsjf =
-                        (HyracksClientInterfaceFunctions.DestroyJobFunction) fn;
+                HyracksClientInterfaceFunctions.UndeployJobSpecFunction dsjf =
+                        (HyracksClientInterfaceFunctions.UndeployJobSpecFunction) fn;
                 ccs.getWorkQueue()
-                        .schedule(new DestroyJobWork(ccs, dsjf.getJobId(), new IPCResponder<JobId>(handle, mid)));
+                        .schedule(new UndeployJobSpecWork(ccs, dsjf.getDeployedJobSpecId(),
+                                new IPCResponder<>(handle, mid)));
                 break;
             case CANCEL_JOB:
                 HyracksClientInterfaceFunctions.CancelJobFunction cjf =
@@ -103,8 +107,14 @@
             case START_JOB:
                 HyracksClientInterfaceFunctions.StartJobFunction sjf =
                         (HyracksClientInterfaceFunctions.StartJobFunction) fn;
-                ccs.getWorkQueue().schedule(new JobStartWork(ccs, sjf.getDeploymentId(), sjf.getACGGFBytes(),
-                        sjf.getJobFlags(), sjf.getJobId(), new IPCResponder<JobId>(handle, mid), jobIdFactory));
+                DeployedJobSpecId id = sjf.getDeployedJobSpecId();
+                byte[] acggfBytes = null;
+                if (id == null) {
+                    //The job is new
+                    acggfBytes = sjf.getACGGFBytes();
+                }
+                ccs.getWorkQueue().schedule(new JobStartWork(ccs, sjf.getDeploymentId(), acggfBytes, sjf.getJobFlags(),
+                        jobIdFactory, sjf.getJobParameters(), new IPCResponder<>(handle, mid), id));
                 break;
             case GET_DATASET_DIRECTORY_SERIVICE_INFO:
                 ccs.getWorkQueue().schedule(
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
index af5c102..5a53fce 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
@@ -23,7 +23,7 @@
 
 import org.apache.hyracks.api.client.NodeControllerInfo;
 import org.apache.hyracks.control.cc.work.ApplicationMessageWork;
-import org.apache.hyracks.control.cc.work.DistributedJobFailureWork;
+import org.apache.hyracks.control.cc.work.DeployedJobFailureWork;
 import org.apache.hyracks.control.cc.work.GetNodeControllersInfoWork;
 import org.apache.hyracks.control.cc.work.JobletCleanupNotificationWork;
 import org.apache.hyracks.control.cc.work.NodeHeartbeatWork;
@@ -76,13 +76,12 @@
                 break;
             case NOTIFY_JOBLET_CLEANUP:
                 CCNCFunctions.NotifyJobletCleanupFunction njcf = (CCNCFunctions.NotifyJobletCleanupFunction) fn;
-                ccs.getWorkQueue().schedule(new JobletCleanupNotificationWork(ccs, njcf.getJobId(),
-                        njcf.getNodeId()));
+                ccs.getWorkQueue().schedule(new JobletCleanupNotificationWork(ccs, njcf.getJobId(), njcf.getNodeId()));
                 break;
             case NOTIFY_DEPLOY_BINARY:
                 CCNCFunctions.NotifyDeployBinaryFunction ndbf = (CCNCFunctions.NotifyDeployBinaryFunction) fn;
-                ccs.getWorkQueue().schedule(new NotifyDeployBinaryWork(ccs, ndbf.getDeploymentId(),
-                        ndbf.getNodeId(), ndbf.getDeploymentStatus()));
+                ccs.getWorkQueue().schedule(new NotifyDeployBinaryWork(ccs, ndbf.getDeploymentId(), ndbf.getNodeId(),
+                        ndbf.getDeploymentStatus()));
                 break;
             case REPORT_PROFILE:
                 CCNCFunctions.ReportProfileFunction rpf = (CCNCFunctions.ReportProfileFunction) fn;
@@ -90,49 +89,48 @@
                 break;
             case NOTIFY_TASK_COMPLETE:
                 CCNCFunctions.NotifyTaskCompleteFunction ntcf = (CCNCFunctions.NotifyTaskCompleteFunction) fn;
-                ccs.getWorkQueue().schedule(new TaskCompleteWork(ccs, ntcf.getJobId(),
-                        ntcf.getTaskId(), ntcf.getNodeId(), ntcf.getStatistics()));
+                ccs.getWorkQueue().schedule(new TaskCompleteWork(ccs, ntcf.getJobId(), ntcf.getTaskId(),
+                        ntcf.getNodeId(), ntcf.getStatistics()));
                 break;
             case NOTIFY_TASK_FAILURE:
                 CCNCFunctions.NotifyTaskFailureFunction ntff = (CCNCFunctions.NotifyTaskFailureFunction) fn;
-                ccs.getWorkQueue().schedule(new TaskFailureWork(ccs, ntff.getJobId(),
-                        ntff.getTaskId(), ntff.getNodeId(), ntff.getExceptions()));
+                ccs.getWorkQueue().schedule(new TaskFailureWork(ccs, ntff.getJobId(), ntff.getTaskId(),
+                        ntff.getNodeId(), ntff.getExceptions()));
                 break;
-            case DISTRIBUTED_JOB_FAILURE:
-                CCNCFunctions.ReportDistributedJobFailureFunction rdjf =
-                        (CCNCFunctions.ReportDistributedJobFailureFunction) fn;
-                ccs.getWorkQueue().schedule(new DistributedJobFailureWork(rdjf.getJobId(), rdjf.getNodeId()));
+            case DEPLOYED_JOB_FAILURE:
+                CCNCFunctions.ReportDeployedJobSpecFailureFunction rdjf =
+                        (CCNCFunctions.ReportDeployedJobSpecFailureFunction) fn;
+                ccs.getWorkQueue()
+                        .schedule(new DeployedJobFailureWork(rdjf.getDeployedJobSpecId(), rdjf.getNodeId()));
                 break;
             case REGISTER_PARTITION_PROVIDER:
                 CCNCFunctions.RegisterPartitionProviderFunction rppf =
                         (CCNCFunctions.RegisterPartitionProviderFunction) fn;
-                ccs.getWorkQueue().schedule(new RegisterPartitionAvailibilityWork(ccs,
-                        rppf.getPartitionDescriptor()));
+                ccs.getWorkQueue().schedule(new RegisterPartitionAvailibilityWork(ccs, rppf.getPartitionDescriptor()));
                 break;
             case REGISTER_PARTITION_REQUEST:
                 CCNCFunctions.RegisterPartitionRequestFunction rprf =
                         (CCNCFunctions.RegisterPartitionRequestFunction) fn;
-                ccs.getWorkQueue().schedule(new RegisterPartitionRequestWork(ccs,
-                        rprf.getPartitionRequest()));
+                ccs.getWorkQueue().schedule(new RegisterPartitionRequestWork(ccs, rprf.getPartitionRequest()));
                 break;
             case REGISTER_RESULT_PARTITION_LOCATION:
                 CCNCFunctions.RegisterResultPartitionLocationFunction rrplf =
                         (CCNCFunctions.RegisterResultPartitionLocationFunction) fn;
-                ccs.getWorkQueue().schedule(new RegisterResultPartitionLocationWork(ccs,
-                        rrplf.getJobId(), rrplf.getResultSetId(), rrplf.getOrderedResult(), rrplf.getEmptyResult(),
-                        rrplf.getPartition(), rrplf.getNPartitions(), rrplf.getNetworkAddress()));
+                ccs.getWorkQueue()
+                        .schedule(new RegisterResultPartitionLocationWork(ccs, rrplf.getJobId(), rrplf.getResultSetId(),
+                                rrplf.getOrderedResult(), rrplf.getEmptyResult(), rrplf.getPartition(),
+                                rrplf.getNPartitions(), rrplf.getNetworkAddress()));
                 break;
             case REPORT_RESULT_PARTITION_WRITE_COMPLETION:
                 CCNCFunctions.ReportResultPartitionWriteCompletionFunction rrpwc =
                         (CCNCFunctions.ReportResultPartitionWriteCompletionFunction) fn;
-                ccs.getWorkQueue().schedule(new ReportResultPartitionWriteCompletionWork(ccs,
-                        rrpwc.getJobId(), rrpwc.getResultSetId(), rrpwc.getPartition()));
+                ccs.getWorkQueue().schedule(new ReportResultPartitionWriteCompletionWork(ccs, rrpwc.getJobId(),
+                        rrpwc.getResultSetId(), rrpwc.getPartition()));
                 break;
             case SEND_APPLICATION_MESSAGE:
-                CCNCFunctions.SendApplicationMessageFunction rsf =
-                        (CCNCFunctions.SendApplicationMessageFunction) fn;
-                ccs.getWorkQueue().schedule(new ApplicationMessageWork(ccs, rsf.getMessage(),
-                        rsf.getDeploymentId(), rsf.getNodeId()));
+                CCNCFunctions.SendApplicationMessageFunction rsf = (CCNCFunctions.SendApplicationMessageFunction) fn;
+                ccs.getWorkQueue().schedule(
+                        new ApplicationMessageWork(ccs, rsf.getMessage(), rsf.getDeploymentId(), rsf.getNodeId()));
                 break;
             case GET_NODE_CONTROLLERS_INFO:
                 ccs.getWorkQueue().schedule(new GetNodeControllersInfoWork(ccs.getNodeManager(),
@@ -150,18 +148,17 @@
                 break;
             case STATE_DUMP_RESPONSE:
                 CCNCFunctions.StateDumpResponseFunction dsrf = (StateDumpResponseFunction) fn;
-                ccs.getWorkQueue().schedule(new NotifyStateDumpResponse(ccs, dsrf.getNodeId(),
-                        dsrf.getStateDumpId(), dsrf.getState()));
+                ccs.getWorkQueue().schedule(
+                        new NotifyStateDumpResponse(ccs, dsrf.getNodeId(), dsrf.getStateDumpId(), dsrf.getState()));
                 break;
             case SHUTDOWN_RESPONSE:
                 CCNCFunctions.ShutdownResponseFunction sdrf = (ShutdownResponseFunction) fn;
                 ccs.getWorkQueue().schedule(new NotifyShutdownWork(ccs, sdrf.getNodeId()));
                 break;
             case THREAD_DUMP_RESPONSE:
-                CCNCFunctions.ThreadDumpResponseFunction tdrf =
-                        (CCNCFunctions.ThreadDumpResponseFunction)fn;
-                ccs.getWorkQueue().schedule(new NotifyThreadDumpResponse(ccs,
-                        tdrf.getRequestId(), tdrf.getThreadDumpJSON()));
+                CCNCFunctions.ThreadDumpResponseFunction tdrf = (CCNCFunctions.ThreadDumpResponseFunction) fn;
+                ccs.getWorkQueue()
+                        .schedule(new NotifyThreadDumpResponse(ccs, tdrf.getRequestId(), tdrf.getThreadDumpJSON()));
                 break;
             default:
                 LOGGER.warning("Unknown function: " + fn.getFunctionId());
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
index 7b99df2..713bddd 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
@@ -50,7 +50,10 @@
 import org.apache.hyracks.api.deployment.DeploymentId;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.job.DeployedJobSpecIdFactory;
+import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobIdFactory;
+import org.apache.hyracks.api.job.JobParameterByteStore;
 import org.apache.hyracks.api.job.resource.IJobCapacityController;
 import org.apache.hyracks.api.service.IControllerService;
 import org.apache.hyracks.api.topology.ClusterTopology;
@@ -107,7 +110,9 @@
 
     private CCServiceContext serviceCtx;
 
-    private final PreDistributedJobStore preDistributedJobStore = new PreDistributedJobStore();
+    private final DeployedJobSpecStore deployedJobSpecStore = new DeployedJobSpecStore();
+
+    private final Map<JobId, JobParameterByteStore> jobParameterByteStoreMap = new HashMap<>();
 
     private final WorkQueue workQueue;
 
@@ -135,6 +140,8 @@
 
     private final JobIdFactory jobIdFactory;
 
+    private final DeployedJobSpecIdFactory deployedJobSpecIdFactory;
+
     private IJobManager jobManager;
 
     private ShutdownRun shutdownCallback;
@@ -164,8 +171,8 @@
         final ClusterTopology topology = computeClusterTopology(ccConfig);
         ccContext = new ClusterControllerContext(topology);
         sweeper = new DeadNodeSweeper();
-        datasetDirectoryService = new DatasetDirectoryService(ccConfig.getResultTTL(),
-                ccConfig.getResultSweepThreshold(), preDistributedJobStore);
+        datasetDirectoryService =
+                new DatasetDirectoryService(ccConfig.getResultTTL(), ccConfig.getResultSweepThreshold());
 
         deploymentRunMap = new HashMap<>();
         stateDumpRunMap = new HashMap<>();
@@ -175,6 +182,8 @@
         nodeManager = new NodeManager(this, ccConfig, resourceManager);
 
         jobIdFactory = new JobIdFactory();
+
+        deployedJobSpecIdFactory = new DeployedJobSpecIdFactory();
     }
 
     private static ClusterTopology computeClusterTopology(CCConfig ccConfig) throws Exception {
@@ -347,8 +356,21 @@
         return nodeManager;
     }
 
-    public PreDistributedJobStore getPreDistributedJobStore() throws HyracksException {
-        return preDistributedJobStore;
+    public DeployedJobSpecStore getDeployedJobSpecStore() throws HyracksException {
+        return deployedJobSpecStore;
+    }
+
+    public void removeJobParameterByteStore(JobId jobId) throws HyracksException {
+        jobParameterByteStoreMap.remove(jobId);
+    }
+
+    public JobParameterByteStore createOrGetJobParameterByteStore(JobId jobId) throws HyracksException {
+        JobParameterByteStore jpbs = jobParameterByteStoreMap.get(jobId);
+        if (jpbs == null) {
+            jpbs = new JobParameterByteStore();
+            jobParameterByteStoreMap.put(jobId, jpbs);
+        }
+        return jpbs;
     }
 
     public IResourceManager getResourceManager() {
@@ -397,6 +419,10 @@
         return jobIdFactory;
     }
 
+    public DeployedJobSpecIdFactory getDeployedJobSpecIdFactory() {
+        return deployedJobSpecIdFactory;
+    }
+
     private final class ClusterControllerContext implements ICCContext {
         private final ClusterTopology topology;
 
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/DeployedJobSpecStore.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/DeployedJobSpecStore.java
new file mode 100644
index 0000000..1a3051e
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/DeployedJobSpecStore.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.cc;
+
+import java.util.Hashtable;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hyracks.api.constraints.Constraint;
+import org.apache.hyracks.api.exceptions.ErrorCode;
+import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.job.ActivityClusterGraph;
+import org.apache.hyracks.api.job.DeployedJobSpecId;
+import org.apache.hyracks.api.job.JobSpecification;
+
+public class DeployedJobSpecStore {
+
+    private final Map<DeployedJobSpecId, DeployedJobSpecDescriptor> deployedJobSpecDescriptorMap;
+
+    public DeployedJobSpecStore() {
+        deployedJobSpecDescriptorMap = new Hashtable<>();
+    }
+
+    public void addDeployedJobSpecDescriptor(DeployedJobSpecId deployedJobSpecId,
+            ActivityClusterGraph activityClusterGraph,
+            JobSpecification jobSpecification, Set<Constraint> activityClusterGraphConstraints)
+                    throws HyracksException {
+        if (deployedJobSpecDescriptorMap.get(deployedJobSpecId) != null) {
+            throw HyracksException.create(ErrorCode.DUPLICATE_DEPLOYED_JOB, deployedJobSpecId);
+        }
+        DeployedJobSpecDescriptor descriptor =
+                new DeployedJobSpecDescriptor(activityClusterGraph, jobSpecification, activityClusterGraphConstraints);
+        deployedJobSpecDescriptorMap.put(deployedJobSpecId, descriptor);
+    }
+
+    public void checkForExistingDeployedJobSpecDescriptor(DeployedJobSpecId deployedJobSpecId) throws HyracksException {
+        if (deployedJobSpecDescriptorMap.get(deployedJobSpecId) != null) {
+            throw HyracksException.create(ErrorCode.DUPLICATE_DEPLOYED_JOB, deployedJobSpecId);
+        }
+    }
+
+    public DeployedJobSpecDescriptor getDeployedJobSpecDescriptor(DeployedJobSpecId deployedJobSpecId)
+            throws HyracksException {
+        DeployedJobSpecDescriptor descriptor = deployedJobSpecDescriptorMap.get(deployedJobSpecId);
+        if (descriptor == null) {
+            throw HyracksException.create(ErrorCode.ERROR_FINDING_DEPLOYED_JOB, deployedJobSpecId);
+        }
+        return descriptor;
+    }
+
+    public void removeDeployedJobSpecDescriptor(DeployedJobSpecId deployedJobSpecId) throws HyracksException {
+        DeployedJobSpecDescriptor descriptor = deployedJobSpecDescriptorMap.get(deployedJobSpecId);
+        if (descriptor == null) {
+            throw HyracksException.create(ErrorCode.ERROR_FINDING_DEPLOYED_JOB, deployedJobSpecId);
+        }
+        deployedJobSpecDescriptorMap.remove(deployedJobSpecId);
+    }
+
+    public class DeployedJobSpecDescriptor {
+
+        private final ActivityClusterGraph activityClusterGraph;
+
+        private final JobSpecification jobSpecification;
+
+        private final Set<Constraint> activityClusterGraphConstraints;
+
+        private DeployedJobSpecDescriptor(ActivityClusterGraph activityClusterGraph,
+                JobSpecification jobSpecification, Set<Constraint> activityClusterGraphConstraints) {
+            this.activityClusterGraph = activityClusterGraph;
+            this.jobSpecification = jobSpecification;
+            this.activityClusterGraphConstraints = activityClusterGraphConstraints;
+        }
+
+        public ActivityClusterGraph getActivityClusterGraph() {
+            return activityClusterGraph;
+        }
+
+        public JobSpecification getJobSpecification() {
+            return jobSpecification;
+        }
+
+        public Set<Constraint> getActivityClusterGraphConstraints() {
+            return activityClusterGraphConstraints;
+        }
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/PreDistributedJobStore.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/PreDistributedJobStore.java
deleted file mode 100644
index 117621f..0000000
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/PreDistributedJobStore.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.control.cc;
-
-import java.util.Hashtable;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.hyracks.api.constraints.Constraint;
-import org.apache.hyracks.api.exceptions.ErrorCode;
-import org.apache.hyracks.api.exceptions.HyracksException;
-import org.apache.hyracks.api.job.ActivityClusterGraph;
-import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.api.job.JobSpecification;
-
-public class PreDistributedJobStore {
-
-    private final Map<JobId, PreDistributedJobDescriptor> preDistributedJobDescriptorMap;
-
-    public PreDistributedJobStore() {
-        preDistributedJobDescriptorMap = new Hashtable<>();
-    }
-
-    public void addDistributedJobDescriptor(JobId jobId, ActivityClusterGraph activityClusterGraph,
-            JobSpecification jobSpecification, Set<Constraint> activityClusterGraphConstraints)
-                    throws HyracksException {
-        if (preDistributedJobDescriptorMap.get(jobId) != null) {
-            throw HyracksException.create(ErrorCode.DUPLICATE_DISTRIBUTED_JOB, jobId);
-        }
-        PreDistributedJobDescriptor descriptor =
-                new PreDistributedJobDescriptor(activityClusterGraph, jobSpecification, activityClusterGraphConstraints);
-        preDistributedJobDescriptorMap.put(jobId, descriptor);
-    }
-
-    public void checkForExistingDistributedJobDescriptor(JobId jobId) throws HyracksException {
-        if (preDistributedJobDescriptorMap.get(jobId) != null) {
-            throw HyracksException.create(ErrorCode.DUPLICATE_DISTRIBUTED_JOB, jobId);
-        }
-    }
-
-    public PreDistributedJobDescriptor getDistributedJobDescriptor(JobId jobId) throws HyracksException {
-        PreDistributedJobDescriptor descriptor = preDistributedJobDescriptorMap.get(jobId);
-        if (descriptor == null) {
-            throw HyracksException.create(ErrorCode.ERROR_FINDING_DISTRIBUTED_JOB, jobId);
-        }
-        return descriptor;
-    }
-
-    public boolean jobIsPredistributed(JobId jobId) {
-        return preDistributedJobDescriptorMap.get(jobId) != null;
-    }
-
-    public void removeDistributedJobDescriptor(JobId jobId) throws HyracksException {
-        PreDistributedJobDescriptor descriptor = preDistributedJobDescriptorMap.get(jobId);
-        if (descriptor == null) {
-            throw HyracksException.create(ErrorCode.ERROR_FINDING_DISTRIBUTED_JOB, jobId);
-        }
-        preDistributedJobDescriptorMap.remove(jobId);
-    }
-
-    public class PreDistributedJobDescriptor {
-
-        private final ActivityClusterGraph activityClusterGraph;
-
-        private final JobSpecification jobSpecification;
-
-        private final Set<Constraint> activityClusterGraphConstraints;
-
-        private PreDistributedJobDescriptor(ActivityClusterGraph activityClusterGraph,
-                JobSpecification jobSpecification, Set<Constraint> activityClusterGraphConstraints) {
-            this.activityClusterGraph = activityClusterGraph;
-            this.jobSpecification = jobSpecification;
-            this.activityClusterGraphConstraints = activityClusterGraphConstraints;
-        }
-
-        public ActivityClusterGraph getActivityClusterGraph() {
-            return activityClusterGraph;
-        }
-
-        public JobSpecification getJobSpecification() {
-            return jobSpecification;
-        }
-
-        public Set<Constraint> getActivityClusterGraphConstraints() {
-            return activityClusterGraphConstraints;
-        }
-    }
-}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java
index ca1c91b..1cb07d0 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java
@@ -42,7 +42,6 @@
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.api.job.JobStatus;
-import org.apache.hyracks.control.cc.PreDistributedJobStore;
 import org.apache.hyracks.control.common.dataset.ResultStateSweeper;
 import org.apache.hyracks.control.common.work.IResultCallback;
 
@@ -63,14 +62,10 @@
 
     private final Map<JobId, JobResultInfo> jobResultLocations;
 
-    private final PreDistributedJobStore preDistributedJobStore;
-
-    public DatasetDirectoryService(long resultTTL, long resultSweepThreshold,
-            PreDistributedJobStore preDistributedJobStore) {
+    public DatasetDirectoryService(long resultTTL, long resultSweepThreshold) {
         this.resultTTL = resultTTL;
         this.resultSweepThreshold = resultSweepThreshold;
-        this.preDistributedJobStore = preDistributedJobStore;
-        jobResultLocations = new LinkedHashMap<>();
+        jobResultLocations = new LinkedHashMap<JobId, JobResultInfo>();
     }
 
     @Override
@@ -186,9 +181,6 @@
 
     @Override
     public synchronized long getResultTimestamp(JobId jobId) {
-        if (preDistributedJobStore.jobIsPredistributed(jobId)) {
-            return -1;
-        }
         return getState(jobId).getTimestamp();
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
index 8a69a6f..0b69024 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
@@ -47,6 +47,7 @@
 import org.apache.hyracks.api.exceptions.HyracksException;
 import org.apache.hyracks.api.job.ActivityCluster;
 import org.apache.hyracks.api.job.ActivityClusterGraph;
+import org.apache.hyracks.api.job.DeployedJobSpecId;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobStatus;
 import org.apache.hyracks.api.partitions.PartitionId;
@@ -77,7 +78,7 @@
 
     private final PartitionConstraintSolver solver;
 
-    private final boolean predistributed;
+    private final DeployedJobSpecId deployedJobSpecId;
 
     private final Map<PartitionId, TaskCluster> partitionProducingTaskClusterMap;
 
@@ -88,10 +89,10 @@
     private boolean cancelled = false;
 
     public JobExecutor(ClusterControllerService ccs, JobRun jobRun, Collection<Constraint> constraints,
-            boolean predistributed) {
+            DeployedJobSpecId deployedJobSpecId) {
         this.ccs = ccs;
         this.jobRun = jobRun;
-        this.predistributed = predistributed;
+        this.deployedJobSpecId = deployedJobSpecId;
         solver = new PartitionConstraintSolver();
         partitionProducingTaskClusterMap = new HashMap<>();
         inProgressTaskClusters = new HashSet<>();
@@ -99,8 +100,8 @@
         random = new Random();
     }
 
-    public boolean isPredistributed() {
-        return predistributed;
+    public boolean isDeployed() {
+        return deployedJobSpecId != null;
     }
 
     public JobRun getJobRun() {
@@ -502,7 +503,7 @@
                 new HashMap<>(jobRun.getConnectorPolicyMap());
         INodeManager nodeManager = ccs.getNodeManager();
         try {
-            byte[] acgBytes = predistributed ? null : JavaSerializationUtils.serialize(acg);
+            byte[] acgBytes = isDeployed() ? null : JavaSerializationUtils.serialize(acg);
             for (Map.Entry<String, List<TaskAttemptDescriptor>> entry : taskAttemptMap.entrySet()) {
                 String nodeId = entry.getKey();
                 final List<TaskAttemptDescriptor> taskDescriptors = entry.getValue();
@@ -515,7 +516,8 @@
                     }
                     byte[] jagBytes = changed ? acgBytes : null;
                     node.getNodeController().startTasks(deploymentId, jobId, jagBytes, taskDescriptors,
-                            connectorPolicies, jobRun.getFlags());
+                            connectorPolicies, jobRun.getFlags(),
+                            ccs.createOrGetJobParameterByteStore(jobId).getParameterMap(), deployedJobSpecId);
                 }
             }
         } catch (Exception e) {
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
index fa22dd3..26f8022 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
@@ -142,6 +142,7 @@
 
     @Override
     public void prepareComplete(JobRun run, JobStatus status, List<Exception> exceptions) throws HyracksException {
+        ccs.removeJobParameterByteStore(run.getJobId());
         checkJob(run);
         if (status == JobStatus.FAILURE_BEFORE_EXECUTION) {
             run.setPendingStatus(JobStatus.FAILURE, exceptions);
@@ -306,9 +307,7 @@
 
         CCServiceContext serviceCtx = ccs.getContext();
         JobSpecification spec = run.getJobSpecification();
-        if (!run.getExecutor().isPredistributed()) {
-            serviceCtx.notifyJobCreation(jobId, spec);
-        }
+        serviceCtx.notifyJobCreation(jobId, spec);
         run.setStatus(JobStatus.RUNNING, null);
         executeJobInternal(run);
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java
index ef0bca2..58f44ef 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java
@@ -37,6 +37,7 @@
 import org.apache.hyracks.api.job.ActivityCluster;
 import org.apache.hyracks.api.job.ActivityClusterGraph;
 import org.apache.hyracks.api.job.ActivityClusterId;
+import org.apache.hyracks.api.job.DeployedJobSpecId;
 import org.apache.hyracks.api.job.IActivityClusterGraphGenerator;
 import org.apache.hyracks.api.job.IActivityClusterGraphGeneratorFactory;
 import org.apache.hyracks.api.job.JobFlag;
@@ -45,7 +46,7 @@
 import org.apache.hyracks.api.job.JobStatus;
 import org.apache.hyracks.api.partitions.PartitionId;
 import org.apache.hyracks.control.cc.ClusterControllerService;
-import org.apache.hyracks.control.cc.PreDistributedJobStore.PreDistributedJobDescriptor;
+import org.apache.hyracks.control.cc.DeployedJobSpecStore.DeployedJobSpecDescriptor;
 import org.apache.hyracks.control.cc.executor.ActivityPartitionDetails;
 import org.apache.hyracks.control.cc.executor.JobExecutor;
 import org.apache.hyracks.control.cc.partitions.PartitionMatchMaker;
@@ -114,21 +115,23 @@
         createTime = System.currentTimeMillis();
     }
 
-    //Run a Pre-distributed job by passing the JobId
+    //Run a deployed job spec
     public JobRun(ClusterControllerService ccs, DeploymentId deploymentId, JobId jobId, Set<JobFlag> jobFlags,
-            PreDistributedJobDescriptor distributedJobDescriptor)
+            DeployedJobSpecDescriptor deployedJobSpecDescriptor, Map<byte[], byte[]> jobParameters,
+            DeployedJobSpecId deployedJobSpecId)
             throws HyracksException {
         this(deploymentId, jobId, jobFlags,
-                distributedJobDescriptor.getJobSpecification(), distributedJobDescriptor.getActivityClusterGraph());
-        Set<Constraint> constaints = distributedJobDescriptor.getActivityClusterGraphConstraints();
-        this.scheduler = new JobExecutor(ccs, this, constaints, true);
+                deployedJobSpecDescriptor.getJobSpecification(), deployedJobSpecDescriptor.getActivityClusterGraph());
+        ccs.createOrGetJobParameterByteStore(jobId).setParameters(jobParameters);
+        Set<Constraint> constaints = deployedJobSpecDescriptor.getActivityClusterGraphConstraints();
+        this.scheduler = new JobExecutor(ccs, this, constaints, deployedJobSpecId);
     }
 
     //Run a new job by creating an ActivityClusterGraph
     public JobRun(ClusterControllerService ccs, DeploymentId deploymentId, JobId jobId,
             IActivityClusterGraphGeneratorFactory acggf, IActivityClusterGraphGenerator acgg, Set<JobFlag> jobFlags) {
         this(deploymentId, jobId, jobFlags, acggf.getJobSpecification(), acgg.initialize());
-        this.scheduler = new JobExecutor(ccs, this, acgg.getConstraints(), false);
+        this.scheduler = new JobExecutor(ccs, this, acgg.getConstraints(), null);
     }
 
     public DeploymentId getDeploymentId() {
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DistributeJobWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DeployJobSpecWork.java
similarity index 72%
rename from hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DistributeJobWork.java
rename to hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DeployJobSpecWork.java
index 5a57b1b..f7335a8 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DistributeJobWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DeployJobSpecWork.java
@@ -21,11 +21,10 @@
 import java.util.EnumSet;
 
 import org.apache.hyracks.api.job.ActivityClusterGraph;
+import org.apache.hyracks.api.job.DeployedJobSpecId;
 import org.apache.hyracks.api.job.IActivityClusterGraphGenerator;
 import org.apache.hyracks.api.job.IActivityClusterGraphGeneratorFactory;
 import org.apache.hyracks.api.job.JobFlag;
-import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.api.job.JobIdFactory;
 import org.apache.hyracks.api.util.JavaSerializationUtils;
 import org.apache.hyracks.control.cc.ClusterControllerService;
 import org.apache.hyracks.control.cc.NodeControllerState;
@@ -35,15 +34,15 @@
 import org.apache.hyracks.control.common.work.IResultCallback;
 import org.apache.hyracks.control.common.work.SynchronizableWork;
 
-public class DistributeJobWork extends SynchronizableWork {
+public class DeployJobSpecWork extends SynchronizableWork {
     private final ClusterControllerService ccs;
     private final byte[] acggfBytes;
-    private final JobIdFactory jobIdFactory;
-    private final IResultCallback<JobId> callback;
+    private final DeployedJobSpecId deployedJobSpecId;
+    private final IResultCallback<DeployedJobSpecId> callback;
 
-    public DistributeJobWork(ClusterControllerService ccs, byte[] acggfBytes, JobIdFactory jobIdFactory,
-            IResultCallback<JobId> callback) {
-        this.jobIdFactory = jobIdFactory;
+    public DeployJobSpecWork(ClusterControllerService ccs, byte[] acggfBytes, DeployedJobSpecId deployedJobSpecId,
+            IResultCallback<DeployedJobSpecId> callback) {
+        this.deployedJobSpecId = deployedJobSpecId;
         this.ccs = ccs;
         this.acggfBytes = acggfBytes;
         this.callback = callback;
@@ -52,27 +51,24 @@
     @Override
     protected void doRun() throws Exception {
         try {
-            JobId jobId = jobIdFactory.create();
             final CCServiceContext ccServiceCtx = ccs.getContext();
-            ccs.getPreDistributedJobStore().checkForExistingDistributedJobDescriptor(jobId);
+            ccs.getDeployedJobSpecStore().checkForExistingDeployedJobSpecDescriptor(deployedJobSpecId);
             IActivityClusterGraphGeneratorFactory acggf =
                     (IActivityClusterGraphGeneratorFactory) DeploymentUtils.deserialize(acggfBytes, null, ccServiceCtx);
             IActivityClusterGraphGenerator acgg =
-                    acggf.createActivityClusterGraphGenerator(jobId, ccServiceCtx, EnumSet.noneOf(JobFlag.class));
+                    acggf.createActivityClusterGraphGenerator(ccServiceCtx, EnumSet.noneOf(JobFlag.class));
             ActivityClusterGraph acg = acgg.initialize();
-            ccs.getPreDistributedJobStore().addDistributedJobDescriptor(jobId, acg, acggf.getJobSpecification(),
+            ccs.getDeployedJobSpecStore().addDeployedJobSpecDescriptor(deployedJobSpecId, acg,
+                    acggf.getJobSpecification(),
                     acgg.getConstraints());
 
-            ccServiceCtx.notifyJobCreation(jobId, acggf.getJobSpecification());
-
             byte[] acgBytes = JavaSerializationUtils.serialize(acg);
 
             INodeManager nodeManager = ccs.getNodeManager();
             for (NodeControllerState node : nodeManager.getAllNodeControllerStates()) {
-                node.getNodeController().distributeJob(jobId, acgBytes);
+                node.getNodeController().deployJobSpec(deployedJobSpecId, acgBytes);
             }
-
-            callback.setValue(jobId);
+            callback.setValue(deployedJobSpecId);
         } catch (Exception e) {
             callback.setException(e);
         }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DistributedJobFailureWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DeployedJobFailureWork.java
similarity index 74%
rename from hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DistributedJobFailureWork.java
rename to hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DeployedJobFailureWork.java
index f7fa2a4..8afdf42 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DistributedJobFailureWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DeployedJobFailureWork.java
@@ -20,20 +20,20 @@
 
 import org.apache.hyracks.api.exceptions.ErrorCode;
 import org.apache.hyracks.api.exceptions.HyracksException;
-import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.DeployedJobSpecId;
 import org.apache.hyracks.control.common.work.SynchronizableWork;
 
-public class DistributedJobFailureWork extends SynchronizableWork {
-    protected final JobId jobId;
+public class DeployedJobFailureWork extends SynchronizableWork {
+    protected final DeployedJobSpecId deployedJobSpecId;
     protected final String nodeId;
 
-    public DistributedJobFailureWork(JobId jobId, String nodeId) {
-        this.jobId = jobId;
+    public DeployedJobFailureWork(DeployedJobSpecId deployedJobSpecId, String nodeId) {
+        this.deployedJobSpecId = deployedJobSpecId;
         this.nodeId = nodeId;
     }
 
     @Override
     public void doRun() throws HyracksException {
-        throw HyracksException.create(ErrorCode.DISTRIBUTED_JOB_FAILURE, jobId, nodeId);
+        throw HyracksException.create(ErrorCode.DEPLOYED_JOB_FAILURE, deployedJobSpecId, nodeId);
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java
index ed82705..cfedfc9 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java
@@ -18,9 +18,11 @@
  */
 package org.apache.hyracks.control.cc.work;
 
-import java.util.EnumSet;
+import java.util.Map;
+import java.util.Set;
 
 import org.apache.hyracks.api.deployment.DeploymentId;
+import org.apache.hyracks.api.job.DeployedJobSpecId;
 import org.apache.hyracks.api.job.IActivityClusterGraphGenerator;
 import org.apache.hyracks.api.job.IActivityClusterGraphGeneratorFactory;
 import org.apache.hyracks.api.job.JobFlag;
@@ -37,20 +39,23 @@
 public class JobStartWork extends SynchronizableWork {
     private final ClusterControllerService ccs;
     private final byte[] acggfBytes;
-    private final EnumSet<JobFlag> jobFlags;
+    private final Set<JobFlag> jobFlags;
     private final DeploymentId deploymentId;
-    private final JobId preDistributedJobId;
     private final IResultCallback<JobId> callback;
     private final JobIdFactory jobIdFactory;
+    private final DeployedJobSpecId deployedJobSpecId;
+    private final Map<byte[], byte[]> jobParameters;
 
     public JobStartWork(ClusterControllerService ccs, DeploymentId deploymentId, byte[] acggfBytes,
-            EnumSet<JobFlag> jobFlags, JobId jobId, IResultCallback<JobId> callback, JobIdFactory jobIdFactory) {
+            Set<JobFlag> jobFlags, JobIdFactory jobIdFactory, Map<byte[], byte[]> jobParameters,
+            IResultCallback<JobId> callback, DeployedJobSpecId deployedJobSpecId) {
         this.deploymentId = deploymentId;
-        this.preDistributedJobId = jobId;
         this.ccs = ccs;
         this.acggfBytes = acggfBytes;
         this.jobFlags = jobFlags;
         this.callback = callback;
+        this.deployedJobSpecId = deployedJobSpecId;
+        this.jobParameters = jobParameters;
         this.jobIdFactory = jobIdFactory;
     }
 
@@ -61,19 +66,18 @@
             final CCServiceContext ccServiceCtx = ccs.getContext();
             JobId jobId;
             JobRun run;
-            if (preDistributedJobId == null) {
-                jobId = jobIdFactory.create();
+            jobId = jobIdFactory.create();
+            if (deployedJobSpecId == null) {
                 //Need to create the ActivityClusterGraph
                 IActivityClusterGraphGeneratorFactory acggf = (IActivityClusterGraphGeneratorFactory) DeploymentUtils
                         .deserialize(acggfBytes, deploymentId, ccServiceCtx);
-                IActivityClusterGraphGenerator acgg =
-                        acggf.createActivityClusterGraphGenerator(jobId, ccServiceCtx, jobFlags);
+                IActivityClusterGraphGenerator acgg = acggf.createActivityClusterGraphGenerator(ccServiceCtx, jobFlags);
                 run = new JobRun(ccs, deploymentId, jobId, acggf, acgg, jobFlags);
             } else {
-                jobId = preDistributedJobId;
                 //ActivityClusterGraph has already been distributed
                 run = new JobRun(ccs, deploymentId, jobId, jobFlags,
-                        ccs.getPreDistributedJobStore().getDistributedJobDescriptor(jobId));
+                        ccs.getDeployedJobSpecStore().getDeployedJobSpecDescriptor(deployedJobSpecId), jobParameters,
+                        deployedJobSpecId);
             }
             jobManager.add(run);
             callback.setValue(jobId);
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DestroyJobWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/UndeployJobSpecWork.java
similarity index 70%
rename from hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DestroyJobWork.java
rename to hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/UndeployJobSpecWork.java
index df98252..143c8c1 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DestroyJobWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/UndeployJobSpecWork.java
@@ -18,20 +18,21 @@
  */
 package org.apache.hyracks.control.cc.work;
 
-import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.DeployedJobSpecId;
 import org.apache.hyracks.control.cc.ClusterControllerService;
 import org.apache.hyracks.control.cc.NodeControllerState;
 import org.apache.hyracks.control.cc.cluster.INodeManager;
 import org.apache.hyracks.control.common.work.IResultCallback;
 import org.apache.hyracks.control.common.work.SynchronizableWork;
 
-public class DestroyJobWork extends SynchronizableWork {
+public class UndeployJobSpecWork extends SynchronizableWork {
     private final ClusterControllerService ccs;
-    private final JobId jobId;
-    private final IResultCallback<JobId> callback;
+    private final DeployedJobSpecId deployedJobSpecId;
+    private final IResultCallback<DeployedJobSpecId> callback;
 
-    public DestroyJobWork(ClusterControllerService ccs, JobId jobId, IResultCallback<JobId> callback) {
-        this.jobId = jobId;
+    public UndeployJobSpecWork(ClusterControllerService ccs, DeployedJobSpecId deployedJobSpecId,
+            IResultCallback<DeployedJobSpecId> callback) {
+        this.deployedJobSpecId = deployedJobSpecId;
         this.ccs = ccs;
         this.callback = callback;
     }
@@ -39,12 +40,12 @@
     @Override
     protected void doRun() throws Exception {
         try {
-            ccs.getPreDistributedJobStore().removeDistributedJobDescriptor(jobId);
+            ccs.getDeployedJobSpecStore().removeDeployedJobSpecDescriptor(deployedJobSpecId);
             INodeManager nodeManager = ccs.getNodeManager();
             for (NodeControllerState node : nodeManager.getAllNodeControllerStates()) {
-                node.getNodeController().destroyJob(jobId);
+                node.getNodeController().undeployJobSpec(deployedJobSpecId);
             }
-            callback.setValue(jobId);
+            callback.setValue(deployedJobSpecId);
         } catch (Exception e) {
             callback.setException(e);
         }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
index ec8e045..6fd321e 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
@@ -24,6 +24,7 @@
 import org.apache.hyracks.api.dataflow.TaskAttemptId;
 import org.apache.hyracks.api.dataset.ResultSetId;
 import org.apache.hyracks.api.deployment.DeploymentId;
+import org.apache.hyracks.api.job.DeployedJobSpecId;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.control.common.controllers.NodeRegistration;
 import org.apache.hyracks.control.common.deployment.DeploymentStatus;
@@ -44,7 +45,7 @@
     public void notifyTaskFailure(JobId jobId, TaskAttemptId taskId, String nodeId, List<Exception> exceptions)
             throws Exception;
 
-    public void notifyDistributedJobFailure(JobId jobId, String nodeId) throws Exception;
+    public void notifyDeployedJobSpecFailure(DeployedJobSpecId deployedJobSpecId, String nodeId) throws Exception;
 
     public void notifyJobletCleanup(JobId jobId, String nodeId) throws Exception;
 
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
index a10f8f0..5d781cf 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
@@ -19,7 +19,6 @@
 package org.apache.hyracks.control.common.base;
 
 import java.net.URL;
-import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -29,6 +28,7 @@
 import org.apache.hyracks.api.dataflow.TaskAttemptId;
 import org.apache.hyracks.api.dataflow.connectors.IConnectorPolicy;
 import org.apache.hyracks.api.deployment.DeploymentId;
+import org.apache.hyracks.api.job.DeployedJobSpecId;
 import org.apache.hyracks.api.job.JobFlag;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobStatus;
@@ -38,7 +38,8 @@
 public interface INodeController {
     public void startTasks(DeploymentId deploymentId, JobId jobId, byte[] planBytes,
             List<TaskAttemptDescriptor> taskDescriptors, Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies,
-            Set<JobFlag> flags) throws Exception;
+            Set<JobFlag> flags, Map<byte[], byte[]> jobParameters, DeployedJobSpecId deployedJobSpecId)
+            throws Exception;
 
     public void abortTasks(JobId jobId, List<TaskAttemptId> tasks) throws Exception;
 
@@ -50,9 +51,9 @@
 
     public void undeployBinary(DeploymentId deploymentId) throws Exception;
 
-    public void distributeJob(JobId jobId, byte[] planBytes) throws Exception;
+    public void deployJobSpec(DeployedJobSpecId deployedJobSpecId, byte[] planBytes) throws Exception;
 
-    public void destroyJob(JobId jobId) throws Exception;
+    public void undeployJobSpec(DeployedJobSpecId deployedJobSpecId) throws Exception;
 
     public void dumpState(String stateDumpId) throws Exception;
 
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
index 4797ed7..77c352e 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
@@ -49,6 +49,7 @@
 import org.apache.hyracks.api.dataflow.connectors.IConnectorPolicy;
 import org.apache.hyracks.api.dataset.ResultSetId;
 import org.apache.hyracks.api.deployment.DeploymentId;
+import org.apache.hyracks.api.job.DeployedJobSpecId;
 import org.apache.hyracks.api.job.JobFlag;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobStatus;
@@ -102,7 +103,7 @@
 
         DISTRIBUTE_JOB,
         DESTROY_JOB,
-        DISTRIBUTED_JOB_FAILURE,
+        DEPLOYED_JOB_FAILURE,
 
         STATE_DUMP_REQUEST,
         STATE_DUMP_RESPONSE,
@@ -286,24 +287,24 @@
         }
     }
 
-    public static class ReportDistributedJobFailureFunction extends Function {
+    public static class ReportDeployedJobSpecFailureFunction extends Function {
         private static final long serialVersionUID = 1L;
 
-        private final JobId jobId;
+        private final DeployedJobSpecId deployedJobSpecId;
         private final String nodeId;
 
-        public ReportDistributedJobFailureFunction(JobId jobId, String nodeId) {
-            this.jobId = jobId;
+        public ReportDeployedJobSpecFailureFunction(DeployedJobSpecId deployedJobSpecId, String nodeId) {
+            this.deployedJobSpecId = deployedJobSpecId;
             this.nodeId = nodeId;
         }
 
         @Override
         public FunctionId getFunctionId() {
-            return FunctionId.DISTRIBUTED_JOB_FAILURE;
+            return FunctionId.DEPLOYED_JOB_FAILURE;
         }
 
-        public JobId getJobId() {
-            return jobId;
+        public DeployedJobSpecId getDeployedJobSpecId() {
+            return deployedJobSpecId;
         }
 
         public String getNodeId() {
@@ -676,15 +677,15 @@
         }
     }
 
-    public static class DistributeJobFunction extends Function {
+    public static class DeployJobSpecFunction extends Function {
         private static final long serialVersionUID = 1L;
 
-        private final JobId jobId;
+        private final DeployedJobSpecId deployedJobSpecId;
 
         private final byte[] acgBytes;
 
-        public DistributeJobFunction(JobId jobId, byte[] acgBytes) {
-            this.jobId = jobId;
+        public DeployJobSpecFunction(DeployedJobSpecId deployedJobSpecId, byte[] acgBytes) {
+            this.deployedJobSpecId = deployedJobSpecId;
             this.acgBytes = acgBytes;
         }
 
@@ -693,8 +694,8 @@
             return FunctionId.DISTRIBUTE_JOB;
         }
 
-        public JobId getJobId() {
-            return jobId;
+        public DeployedJobSpecId getDeployedJobSpecId() {
+            return deployedJobSpecId;
         }
 
         public byte[] getacgBytes() {
@@ -702,13 +703,13 @@
         }
     }
 
-    public static class DestroyJobFunction extends Function {
+    public static class UndeployJobSpecFunction extends Function {
         private static final long serialVersionUID = 1L;
 
-        private final JobId jobId;
+        private final DeployedJobSpecId deployedJobSpecId;
 
-        public DestroyJobFunction(JobId jobId) {
-            this.jobId = jobId;
+        public UndeployJobSpecFunction(DeployedJobSpecId deployedJobSpecId) {
+            this.deployedJobSpecId = deployedJobSpecId;
         }
 
         @Override
@@ -716,8 +717,8 @@
             return FunctionId.DESTROY_JOB;
         }
 
-        public JobId getJobId() {
-            return jobId;
+        public DeployedJobSpecId getDeployedJobSpecId() {
+            return deployedJobSpecId;
         }
     }
 
@@ -730,16 +731,21 @@
         private final List<TaskAttemptDescriptor> taskDescriptors;
         private final Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies;
         private final Set<JobFlag> flags;
+        private final Map<byte[], byte[]> jobParameters;
+        private final DeployedJobSpecId deployedJobSpecId;
 
         public StartTasksFunction(DeploymentId deploymentId, JobId jobId, byte[] planBytes,
                 List<TaskAttemptDescriptor> taskDescriptors,
-                Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies, Set<JobFlag> flags) {
+                Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies, Set<JobFlag> flags,
+                Map<byte[], byte[]> jobParameters, DeployedJobSpecId deployedJobSpecId) {
             this.deploymentId = deploymentId;
             this.jobId = jobId;
             this.planBytes = planBytes;
             this.taskDescriptors = taskDescriptors;
             this.connectorPolicies = connectorPolicies;
             this.flags = flags;
+            this.jobParameters = jobParameters;
+            this.deployedJobSpecId = deployedJobSpecId;
         }
 
         @Override
@@ -751,10 +757,18 @@
             return deploymentId;
         }
 
+        public DeployedJobSpecId getDeployedJobSpecId() {
+            return deployedJobSpecId;
+        }
+
         public JobId getJobId() {
             return jobId;
         }
 
+        public Map<byte[], byte[]> getJobParameters() {
+            return jobParameters;
+        }
+
         public byte[] getPlanBytes() {
             return planBytes;
         }
@@ -815,7 +829,33 @@
                 flags.add(JobFlag.values()[(dis.readInt())]);
             }
 
-            return new StartTasksFunction(deploymentId, jobId, planBytes, taskDescriptors, connectorPolicies, flags);
+            // read job parameters
+            int paramListSize = dis.readInt();
+            Map<byte[], byte[]> jobParameters = new HashMap<>();
+            for (int i = 0; i < paramListSize; i++) {
+                int nameLength = dis.readInt();
+                byte[] nameBytes = null;
+                if (nameLength >= 0) {
+                    nameBytes = new byte[nameLength];
+                    dis.read(nameBytes, 0, nameLength);
+                }
+                int valueLength = dis.readInt();
+                byte[] valueBytes = null;
+                if (valueLength >= 0) {
+                    valueBytes = new byte[valueLength];
+                    dis.read(valueBytes, 0, valueLength);
+                }
+                jobParameters.put(nameBytes, valueBytes);
+            }
+
+            //read DeployedJobSpecId
+            DeployedJobSpecId deployedJobSpecId = null;
+            if (dis.readBoolean()) {
+                deployedJobSpecId = DeployedJobSpecId.create(dis);
+            }
+
+            return new StartTasksFunction(deploymentId, jobId, planBytes, taskDescriptors, connectorPolicies, flags,
+                    jobParameters, deployedJobSpecId);
         }
 
         public static void serialize(OutputStream out, Object object) throws Exception {
@@ -853,6 +893,22 @@
             for (JobFlag flag : fn.flags) {
                 dos.writeInt(flag.ordinal());
             }
+
+            //write job parameters
+            dos.writeInt(fn.jobParameters.size());
+            for (Entry<byte[], byte[]> entry : fn.jobParameters.entrySet()) {
+                dos.writeInt(entry.getKey().length);
+                dos.write(entry.getKey(), 0, entry.getKey().length);
+                dos.writeInt(entry.getValue().length);
+                dos.write(entry.getValue(), 0, entry.getValue().length);
+            }
+
+            //write deployed job spec id
+            dos.writeBoolean(fn.getDeployedJobSpecId() == null ? false : true);
+            if (fn.getDeployedJobSpecId() != null) {
+                fn.getDeployedJobSpecId().writeFields(dos);
+            }
+
         }
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
index c09f317..f9af4c63 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
@@ -18,8 +18,6 @@
  */
 package org.apache.hyracks.control.common.ipc;
 
-import static org.apache.hyracks.control.common.ipc.CCNCFunctions.*;
-
 import java.net.InetSocketAddress;
 import java.util.List;
 import java.util.logging.Logger;
@@ -28,11 +26,30 @@
 import org.apache.hyracks.api.dataflow.TaskAttemptId;
 import org.apache.hyracks.api.dataset.ResultSetId;
 import org.apache.hyracks.api.deployment.DeploymentId;
+import org.apache.hyracks.api.job.DeployedJobSpecId;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.control.common.base.IClusterController;
 import org.apache.hyracks.control.common.controllers.NodeRegistration;
 import org.apache.hyracks.control.common.deployment.DeploymentStatus;
 import org.apache.hyracks.control.common.heartbeat.HeartbeatData;
+import org.apache.hyracks.control.common.ipc.CCNCFunctions.GetNodeControllersInfoFunction;
+import org.apache.hyracks.control.common.ipc.CCNCFunctions.NodeHeartbeatFunction;
+import org.apache.hyracks.control.common.ipc.CCNCFunctions.NotifyDeployBinaryFunction;
+import org.apache.hyracks.control.common.ipc.CCNCFunctions.NotifyJobletCleanupFunction;
+import org.apache.hyracks.control.common.ipc.CCNCFunctions.NotifyTaskCompleteFunction;
+import org.apache.hyracks.control.common.ipc.CCNCFunctions.NotifyTaskFailureFunction;
+import org.apache.hyracks.control.common.ipc.CCNCFunctions.RegisterNodeFunction;
+import org.apache.hyracks.control.common.ipc.CCNCFunctions.RegisterPartitionProviderFunction;
+import org.apache.hyracks.control.common.ipc.CCNCFunctions.RegisterPartitionRequestFunction;
+import org.apache.hyracks.control.common.ipc.CCNCFunctions.RegisterResultPartitionLocationFunction;
+import org.apache.hyracks.control.common.ipc.CCNCFunctions.ReportDeployedJobSpecFailureFunction;
+import org.apache.hyracks.control.common.ipc.CCNCFunctions.ReportProfileFunction;
+import org.apache.hyracks.control.common.ipc.CCNCFunctions.ReportResultPartitionWriteCompletionFunction;
+import org.apache.hyracks.control.common.ipc.CCNCFunctions.SendApplicationMessageFunction;
+import org.apache.hyracks.control.common.ipc.CCNCFunctions.ShutdownResponseFunction;
+import org.apache.hyracks.control.common.ipc.CCNCFunctions.StateDumpResponseFunction;
+import org.apache.hyracks.control.common.ipc.CCNCFunctions.ThreadDumpResponseFunction;
+import org.apache.hyracks.control.common.ipc.CCNCFunctions.UnregisterNodeFunction;
 import org.apache.hyracks.control.common.job.PartitionDescriptor;
 import org.apache.hyracks.control.common.job.PartitionRequest;
 import org.apache.hyracks.control.common.job.profiling.om.JobProfile;
@@ -151,9 +168,8 @@
     }
 
     @Override
-    public void notifyDistributedJobFailure(JobId jobId, String nodeId) throws Exception {
-        ReportDistributedJobFailureFunction fn = new ReportDistributedJobFailureFunction(
-                jobId, nodeId);
+    public void notifyDeployedJobSpecFailure(DeployedJobSpecId deployedJobSpecId, String nodeId) throws Exception {
+        ReportDeployedJobSpecFailureFunction fn = new ReportDeployedJobSpecFailureFunction(deployedJobSpecId, nodeId);
         ensureIpcHandle().send(-1, fn, null);
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
index 3819ef8..8431eca 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
@@ -18,8 +18,6 @@
  */
 package org.apache.hyracks.control.common.ipc;
 
-import static org.apache.hyracks.control.common.ipc.CCNCFunctions.*;
-
 import java.net.InetSocketAddress;
 import java.net.URL;
 import java.util.List;
@@ -32,11 +30,24 @@
 import org.apache.hyracks.api.dataflow.TaskAttemptId;
 import org.apache.hyracks.api.dataflow.connectors.IConnectorPolicy;
 import org.apache.hyracks.api.deployment.DeploymentId;
+import org.apache.hyracks.api.job.DeployedJobSpecId;
 import org.apache.hyracks.api.job.JobFlag;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobStatus;
 import org.apache.hyracks.api.partitions.PartitionId;
 import org.apache.hyracks.control.common.base.INodeController;
+import org.apache.hyracks.control.common.ipc.CCNCFunctions.AbortTasksFunction;
+import org.apache.hyracks.control.common.ipc.CCNCFunctions.CleanupJobletFunction;
+import org.apache.hyracks.control.common.ipc.CCNCFunctions.DeployBinaryFunction;
+import org.apache.hyracks.control.common.ipc.CCNCFunctions.DeployJobSpecFunction;
+import org.apache.hyracks.control.common.ipc.CCNCFunctions.ReportPartitionAvailabilityFunction;
+import org.apache.hyracks.control.common.ipc.CCNCFunctions.SendApplicationMessageFunction;
+import org.apache.hyracks.control.common.ipc.CCNCFunctions.ShutdownRequestFunction;
+import org.apache.hyracks.control.common.ipc.CCNCFunctions.StartTasksFunction;
+import org.apache.hyracks.control.common.ipc.CCNCFunctions.StateDumpRequestFunction;
+import org.apache.hyracks.control.common.ipc.CCNCFunctions.ThreadDumpRequestFunction;
+import org.apache.hyracks.control.common.ipc.CCNCFunctions.UnDeployBinaryFunction;
+import org.apache.hyracks.control.common.ipc.CCNCFunctions.UndeployJobSpecFunction;
 import org.apache.hyracks.control.common.job.TaskAttemptDescriptor;
 import org.apache.hyracks.ipc.impl.IPCSystem;
 
@@ -61,9 +72,10 @@
     @Override
     public void startTasks(DeploymentId deploymentId, JobId jobId, byte[] planBytes,
             List<TaskAttemptDescriptor> taskDescriptors, Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies,
-            Set<JobFlag> flags) throws Exception {
+            Set<JobFlag> flags, Map<byte[], byte[]> jobParameters, DeployedJobSpecId deployedJobSpecId)
+            throws Exception {
         StartTasksFunction stf = new StartTasksFunction(deploymentId, jobId, planBytes,
-                taskDescriptors, connectorPolicies, flags);
+                taskDescriptors, connectorPolicies, flags, jobParameters, deployedJobSpecId);
         ensureIpcHandle().send(-1, stf, null);
     }
 
@@ -99,14 +111,14 @@
     }
 
     @Override
-    public void distributeJob(JobId jobId, byte[] planBytes) throws Exception {
-        DistributeJobFunction fn = new DistributeJobFunction(jobId, planBytes);
+    public void deployJobSpec(DeployedJobSpecId deployedJobSpecId, byte[] planBytes) throws Exception {
+        DeployJobSpecFunction fn = new DeployJobSpecFunction(deployedJobSpecId, planBytes);
         ensureIpcHandle().send(-1, fn, null);
     }
 
     @Override
-    public void destroyJob(JobId jobId) throws Exception {
-        DestroyJobFunction fn = new DestroyJobFunction(jobId);
+    public void undeployJobSpec(DeployedJobSpecId deployedJobSpecId) throws Exception {
+        UndeployJobSpecFunction fn = new UndeployJobSpecFunction(deployedJobSpecId);
         ensureIpcHandle().send(-1, fn, null);
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java
index 66a5e0f..ce666b0 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java
@@ -100,8 +100,11 @@
 
     private boolean cleanupPending;
 
+    private final IJobletEventListenerFactory jobletEventListenerFactory;
+
     public Joblet(NodeControllerService nodeController, DeploymentId deploymentId, JobId jobId,
-            INCServiceContext serviceCtx, ActivityClusterGraph acg) {
+            INCServiceContext serviceCtx, ActivityClusterGraph acg,
+            IJobletEventListenerFactory jobletEventListenerFactory) {
         this.nodeController = nodeController;
         this.serviceCtx = serviceCtx;
         this.deploymentId = deploymentId;
@@ -117,9 +120,9 @@
         deallocatableRegistry = new DefaultDeallocatableRegistry();
         fileFactory = new WorkspaceFileFactory(this, serviceCtx.getIoManager());
         cleanupPending = false;
-        IJobletEventListenerFactory jelf = acg.getJobletEventListenerFactory();
-        if (jelf != null) {
-            IJobletEventListener listener = jelf.createListener(this);
+        this.jobletEventListenerFactory = jobletEventListenerFactory;
+        if (jobletEventListenerFactory != null) {
+            IJobletEventListener listener = jobletEventListenerFactory.createListener(this);
             this.jobletEventListener = listener;
             listener.jobletStart();
         } else {
@@ -134,6 +137,11 @@
         return jobId;
     }
 
+    @Override
+    public IJobletEventListenerFactory getJobletEventListenerFactory() {
+        return jobletEventListenerFactory;
+    }
+
     public ActivityClusterGraph getActivityClusterGraph() {
         return acg;
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
index 1eb1393..c54f153 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
@@ -27,12 +27,12 @@
 import org.apache.hyracks.control.nc.work.ApplicationMessageWork;
 import org.apache.hyracks.control.nc.work.CleanupJobletWork;
 import org.apache.hyracks.control.nc.work.DeployBinaryWork;
-import org.apache.hyracks.control.nc.work.DestroyJobWork;
-import org.apache.hyracks.control.nc.work.DistributeJobWork;
+import org.apache.hyracks.control.nc.work.DeployJobSpecWork;
 import org.apache.hyracks.control.nc.work.ReportPartitionAvailabilityWork;
 import org.apache.hyracks.control.nc.work.StartTasksWork;
 import org.apache.hyracks.control.nc.work.StateDumpWork;
 import org.apache.hyracks.control.nc.work.UnDeployBinaryWork;
+import org.apache.hyracks.control.nc.work.UndeployJobSpecWork;
 import org.apache.hyracks.ipc.api.IIPCHandle;
 import org.apache.hyracks.ipc.api.IIPCI;
 
@@ -62,8 +62,10 @@
                 return;
             case START_TASKS:
                 CCNCFunctions.StartTasksFunction stf = (CCNCFunctions.StartTasksFunction) fn;
-                ncs.getWorkQueue().schedule(new StartTasksWork(ncs, stf.getDeploymentId(), stf.getJobId(),
-                        stf.getPlanBytes(), stf.getTaskDescriptors(), stf.getConnectorPolicies(), stf.getFlags()));
+                ncs.getWorkQueue()
+                        .schedule(new StartTasksWork(ncs, stf.getDeploymentId(), stf.getJobId(), stf.getPlanBytes(),
+                                stf.getTaskDescriptors(), stf.getConnectorPolicies(), stf.getFlags(),
+                                stf.getJobParameters(), stf.getDeployedJobSpecId()));
                 return;
             case ABORT_TASKS:
                 CCNCFunctions.AbortTasksFunction atf = (CCNCFunctions.AbortTasksFunction) fn;
@@ -104,13 +106,13 @@
                 return;
 
             case DISTRIBUTE_JOB:
-                CCNCFunctions.DistributeJobFunction djf = (CCNCFunctions.DistributeJobFunction) fn;
-                ncs.getWorkQueue().schedule(new DistributeJobWork(ncs, djf.getJobId(), djf.getacgBytes()));
+                CCNCFunctions.DeployJobSpecFunction djf = (CCNCFunctions.DeployJobSpecFunction) fn;
+                ncs.getWorkQueue().schedule(new DeployJobSpecWork(ncs, djf.getDeployedJobSpecId(), djf.getacgBytes()));
                 return;
 
             case DESTROY_JOB:
-                CCNCFunctions.DestroyJobFunction dsjf = (CCNCFunctions.DestroyJobFunction) fn;
-                ncs.getWorkQueue().schedule(new DestroyJobWork(ncs, dsjf.getJobId()));
+                CCNCFunctions.UndeployJobSpecFunction dsjf = (CCNCFunctions.UndeployJobSpecFunction) fn;
+                ncs.getWorkQueue().schedule(new UndeployJobSpecWork(ncs, dsjf.getDeployedJobSpecId()));
                 return;
 
             case STATE_DUMP_REQUEST:
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index 7a01b87..9dd9536 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -29,6 +29,7 @@
 import java.lang.management.ThreadMXBean;
 import java.net.InetSocketAddress;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.Hashtable;
 import java.util.List;
 import java.util.Map;
@@ -53,7 +54,9 @@
 import org.apache.hyracks.api.exceptions.HyracksException;
 import org.apache.hyracks.api.io.IODeviceHandle;
 import org.apache.hyracks.api.job.ActivityClusterGraph;
+import org.apache.hyracks.api.job.DeployedJobSpecId;
 import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobParameterByteStore;
 import org.apache.hyracks.api.lifecycle.ILifeCycleComponentManager;
 import org.apache.hyracks.api.lifecycle.LifeCycleComponentManager;
 import org.apache.hyracks.api.service.IControllerService;
@@ -128,7 +131,9 @@
 
     private final Map<JobId, Joblet> jobletMap;
 
-    private final Map<JobId, ActivityClusterGraph> preDistributedJobs;
+    private final Map<Long, ActivityClusterGraph> deployedJobSpecActivityClusterGraphMap;
+
+    private final Map<JobId, JobParameterByteStore> jobParameterByteStoreMap = new HashMap<>();
 
     private ExecutorService executor;
 
@@ -202,7 +207,7 @@
 
         workQueue = new WorkQueue(id, Thread.NORM_PRIORITY); // Reserves MAX_PRIORITY of the heartbeat thread.
         jobletMap = new Hashtable<>();
-        preDistributedJobs = new Hashtable<>();
+        deployedJobSpecActivityClusterGraphMap = new Hashtable<>();
         timer = new Timer(true);
         serverCtx = new ServerContext(ServerContext.ServerType.NODE_CONTROLLER,
                 new File(new File(NodeControllerService.class.getName()), id));
@@ -423,28 +428,43 @@
         return jobletMap;
     }
 
-    public void storeActivityClusterGraph(JobId jobId, ActivityClusterGraph acg) throws HyracksException {
-        if (preDistributedJobs.get(jobId) != null) {
-            throw HyracksException.create(ErrorCode.DUPLICATE_DISTRIBUTED_JOB, jobId);
-        }
-        preDistributedJobs.put(jobId, acg);
+    public void removeJobParameterByteStore(JobId jobId) {
+        jobParameterByteStoreMap.remove(jobId);
     }
 
-    public void removeActivityClusterGraph(JobId jobId) throws HyracksException {
-        if (preDistributedJobs.get(jobId) == null) {
-            throw HyracksException.create(ErrorCode.ERROR_FINDING_DISTRIBUTED_JOB, jobId);
+    public JobParameterByteStore createOrGetJobParameterByteStore(JobId jobId) throws HyracksException {
+        JobParameterByteStore jpbs = jobParameterByteStoreMap.get(jobId);
+        if (jpbs == null) {
+            jpbs = new JobParameterByteStore();
+            jobParameterByteStoreMap.put(jobId, jpbs);
         }
-        preDistributedJobs.remove(jobId);
+        return jpbs;
     }
 
-    public void checkForDuplicateDistributedJob(JobId jobId) throws HyracksException {
-        if (preDistributedJobs.get(jobId) != null) {
-            throw HyracksException.create(ErrorCode.DUPLICATE_DISTRIBUTED_JOB, jobId);
+
+    public void storeActivityClusterGraph(DeployedJobSpecId deployedJobSpecId, ActivityClusterGraph acg)
+            throws HyracksException {
+        if (deployedJobSpecActivityClusterGraphMap.get(deployedJobSpecId.getId()) != null) {
+            throw HyracksException.create(ErrorCode.DUPLICATE_DEPLOYED_JOB, deployedJobSpecId);
+        }
+        deployedJobSpecActivityClusterGraphMap.put(deployedJobSpecId.getId(), acg);
+    }
+
+    public void removeActivityClusterGraph(DeployedJobSpecId deployedJobSpecId) throws HyracksException {
+        if (deployedJobSpecActivityClusterGraphMap.get(deployedJobSpecId.getId()) == null) {
+            throw HyracksException.create(ErrorCode.ERROR_FINDING_DEPLOYED_JOB, deployedJobSpecId);
+        }
+        deployedJobSpecActivityClusterGraphMap.remove(deployedJobSpecId.getId());
+    }
+
+    public void checkForDuplicateDeployedJobSpec(DeployedJobSpecId deployedJobSpecId) throws HyracksException {
+        if (deployedJobSpecActivityClusterGraphMap.get(deployedJobSpecId.getId()) != null) {
+            throw HyracksException.create(ErrorCode.DUPLICATE_DEPLOYED_JOB, deployedJobSpecId);
         }
     }
 
-    public ActivityClusterGraph getActivityClusterGraph(JobId jobId) throws HyracksException {
-        return preDistributedJobs.get(jobId);
+    public ActivityClusterGraph getActivityClusterGraph(DeployedJobSpecId deployedJobSpecId) throws HyracksException {
+        return deployedJobSpecActivityClusterGraphMap.get(deployedJobSpecId.getId());
     }
 
     public NetworkManager getNetworkManager() {
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
index fcd4bde..94ee92b 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
@@ -455,6 +455,9 @@
     }
 
     @Override
+    public byte[] getJobParameter(byte[] name, int start, int length) throws HyracksException {
+        return ncs.createOrGetJobParameterByteStore(joblet.getJobId()).getParameterValue(name, start, length);
+    }
     public Set<JobFlag> getJobFlags() {
         return jobFlags;
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/CleanupJobletWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/CleanupJobletWork.java
index 670ce06..03ae90c 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/CleanupJobletWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/CleanupJobletWork.java
@@ -51,6 +51,7 @@
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("Cleaning up after job: " + jobId);
         }
+        ncs.removeJobParameterByteStore(jobId);
         final List<IPartition> unregisteredPartitions = new ArrayList<IPartition>();
         ncs.getPartitionManager().unregisterPartitions(jobId, unregisteredPartitions);
         ncs.getExecutor().execute(new Runnable() {
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DistributeJobWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DeployJobSpecWork.java
similarity index 75%
rename from hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DistributeJobWork.java
rename to hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DeployJobSpecWork.java
index 486a420..4276b67 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DistributeJobWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DeployJobSpecWork.java
@@ -21,7 +21,7 @@
 
 import org.apache.hyracks.api.exceptions.HyracksException;
 import org.apache.hyracks.api.job.ActivityClusterGraph;
-import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.DeployedJobSpecId;
 import org.apache.hyracks.control.common.deployment.DeploymentUtils;
 import org.apache.hyracks.control.common.work.AbstractWork;
 import org.apache.hyracks.control.nc.NodeControllerService;
@@ -30,29 +30,28 @@
  * pre-distribute a job that can be executed later
  *
  */
-public class DistributeJobWork extends AbstractWork {
+public class DeployJobSpecWork extends AbstractWork {
 
     private final NodeControllerService ncs;
     private final byte[] acgBytes;
-    private final JobId jobId;
+    private final DeployedJobSpecId deployedJobSpecId;
 
-    public DistributeJobWork(NodeControllerService ncs, JobId jobId, byte[] acgBytes) {
+    public DeployJobSpecWork(NodeControllerService ncs, DeployedJobSpecId deployedJobSpecId, byte[] acgBytes) {
         this.ncs = ncs;
-        this.jobId = jobId;
+        this.deployedJobSpecId = deployedJobSpecId;
         this.acgBytes = acgBytes;
     }
 
     @Override
     public void run() {
         try {
-            ncs.checkForDuplicateDistributedJob(jobId);
-            ncs.updateMaxJobId(jobId);
+            ncs.checkForDuplicateDeployedJobSpec(deployedJobSpecId);
             ActivityClusterGraph acg =
                     (ActivityClusterGraph) DeploymentUtils.deserialize(acgBytes, null, ncs.getContext());
-            ncs.storeActivityClusterGraph(jobId, acg);
+            ncs.storeActivityClusterGraph(deployedJobSpecId, acg);
         } catch (HyracksException e) {
             try {
-                ncs.getClusterController().notifyDistributedJobFailure(jobId, ncs.getId());
+                ncs.getClusterController().notifyDeployedJobSpecFailure(deployedJobSpecId, ncs.getId());
             } catch (Exception e1) {
                 e1.printStackTrace();
             }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
index c369781..a2fcc25 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
@@ -47,11 +47,12 @@
 import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.deployment.DeploymentId;
-import org.apache.hyracks.api.exceptions.ErrorCode;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.HyracksException;
 import org.apache.hyracks.api.job.ActivityCluster;
 import org.apache.hyracks.api.job.ActivityClusterGraph;
+import org.apache.hyracks.api.job.DeployedJobSpecId;
+import org.apache.hyracks.api.job.IJobletEventListenerFactory;
 import org.apache.hyracks.api.job.JobFlag;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.partitions.PartitionId;
@@ -79,6 +80,8 @@
 
     private final JobId jobId;
 
+    private final DeployedJobSpecId deployedJobSpecId;
+
     private final byte[] acgBytes;
 
     private final List<TaskAttemptDescriptor> taskDescriptors;
@@ -87,16 +90,21 @@
 
     private final Set<JobFlag> flags;
 
+    private final Map<byte[], byte[]> jobParameters;
+
     public StartTasksWork(NodeControllerService ncs, DeploymentId deploymentId, JobId jobId, byte[] acgBytes,
             List<TaskAttemptDescriptor> taskDescriptors,
-            Map<ConnectorDescriptorId, IConnectorPolicy> connectorPoliciesMap, Set<JobFlag> flags) {
+            Map<ConnectorDescriptorId, IConnectorPolicy> connectorPoliciesMap, Set<JobFlag> flags,
+            Map<byte[], byte[]> jobParameters, DeployedJobSpecId deployedJobSpecId) {
         this.ncs = ncs;
         this.deploymentId = deploymentId;
         this.jobId = jobId;
+        this.deployedJobSpecId = deployedJobSpecId;
         this.acgBytes = acgBytes;
         this.taskDescriptors = taskDescriptors;
         this.connectorPoliciesMap = connectorPoliciesMap;
         this.flags = flags;
+        this.jobParameters = jobParameters;
     }
 
     @Override
@@ -106,7 +114,7 @@
         try {
             ncs.updateMaxJobId(jobId);
             NCServiceContext serviceCtx = ncs.getContext();
-            Joblet joblet = getOrCreateLocalJoblet(deploymentId, jobId, serviceCtx, acgBytes);
+            Joblet joblet = getOrCreateLocalJoblet(deploymentId, serviceCtx, acgBytes);
             final ActivityClusterGraph acg = joblet.getActivityClusterGraph();
             IRecordDescriptorProvider rdp = new IRecordDescriptorProvider() {
                 @Override
@@ -190,19 +198,22 @@
         }
     }
 
-    private Joblet getOrCreateLocalJoblet(DeploymentId deploymentId, JobId jobId, INCServiceContext appCtx,
-            byte[] acgBytes) throws HyracksException {
+    private Joblet getOrCreateLocalJoblet(DeploymentId deploymentId, INCServiceContext appCtx, byte[] acgBytes)
+            throws HyracksException {
         Map<JobId, Joblet> jobletMap = ncs.getJobletMap();
         Joblet ji = jobletMap.get(jobId);
         if (ji == null) {
-            ActivityClusterGraph acg = ncs.getActivityClusterGraph(jobId);
-            if (acg == null) {
-                if (acgBytes == null) {
-                    throw HyracksException.create(ErrorCode.ERROR_FINDING_DISTRIBUTED_JOB, jobId);
+            ActivityClusterGraph acg = (deployedJobSpecId != null) ? ncs.getActivityClusterGraph(deployedJobSpecId)
+                    : (ActivityClusterGraph) DeploymentUtils.deserialize(acgBytes, deploymentId, appCtx);
+            ncs.createOrGetJobParameterByteStore(jobId).setParameters(jobParameters);
+            IJobletEventListenerFactory listenerFactory = acg.getJobletEventListenerFactory();
+            if (listenerFactory != null) {
+                if (deployedJobSpecId != null) {
+                    listenerFactory = acg.getJobletEventListenerFactory().copyFactory();
                 }
-                acg = (ActivityClusterGraph) DeploymentUtils.deserialize(acgBytes, deploymentId, appCtx);
+                listenerFactory.updateListenerJobParameters(ncs.createOrGetJobParameterByteStore(jobId));
             }
-            ji = new Joblet(ncs, deploymentId, jobId, appCtx, acg);
+            ji = new Joblet(ncs, deploymentId, jobId, appCtx, acg, listenerFactory);
             jobletMap.put(jobId, ji);
         }
         return ji;
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DestroyJobWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/UndeployJobSpecWork.java
similarity index 72%
rename from hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DestroyJobWork.java
rename to hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/UndeployJobSpecWork.java
index 55dd01e..4383ff6 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DestroyJobWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/UndeployJobSpecWork.java
@@ -20,31 +20,31 @@
 package org.apache.hyracks.control.nc.work;
 
 import org.apache.hyracks.api.exceptions.HyracksException;
-import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.DeployedJobSpecId;
 import org.apache.hyracks.control.common.work.AbstractWork;
 import org.apache.hyracks.control.nc.NodeControllerService;
 
 /**
- * destroy a pre-distributed job
+ * remove the deployed job
  *
  */
-public class DestroyJobWork extends AbstractWork {
+public class UndeployJobSpecWork extends AbstractWork {
 
     private final NodeControllerService ncs;
-    private final JobId jobId;
+    private final DeployedJobSpecId deployedJobSpecId;
 
-    public DestroyJobWork(NodeControllerService ncs, JobId jobId) {
+    public UndeployJobSpecWork(NodeControllerService ncs, DeployedJobSpecId deployedJobSpecId) {
         this.ncs = ncs;
-        this.jobId = jobId;
+        this.deployedJobSpecId = deployedJobSpecId;
     }
 
     @Override
     public void run() {
         try {
-            ncs.removeActivityClusterGraph(jobId);
+            ncs.removeActivityClusterGraph(deployedJobSpecId);
         } catch (HyracksException e) {
             try {
-                ncs.getClusterController().notifyDistributedJobFailure(jobId, ncs.getId());
+                ncs.getClusterController().notifyDeployedJobSpecFailure(deployedJobSpecId, ncs.getId());
             } catch (Exception e1) {
                 e1.printStackTrace();
             }
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/DeployedJobSpecsTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/DeployedJobSpecsTest.java
new file mode 100644
index 0000000..dd4fdd1
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/DeployedJobSpecsTest.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.tests.integration;
+
+import static org.apache.hyracks.util.file.FileUtil.joinPath;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.verify;
+
+import java.io.File;
+import java.lang.reflect.Field;
+import java.util.HashMap;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hyracks.api.client.HyracksConnection;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.job.DeployedJobSpecId;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.control.cc.ClusterControllerService;
+import org.apache.hyracks.control.cc.cluster.INodeManager;
+import org.apache.hyracks.control.cc.cluster.NodeManager;
+import org.apache.hyracks.control.common.controllers.CCConfig;
+import org.apache.hyracks.control.common.controllers.NCConfig;
+import org.apache.hyracks.control.nc.NodeControllerService;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class DeployedJobSpecsTest {
+    private static final Logger LOGGER = Logger.getLogger(DeployedJobSpecsTest.class.getName());
+
+    private static final String NC1_ID = "nc1";
+    private static final String NC2_ID = "nc2";
+    private static final int TIME_THRESHOLD = 5000;
+
+    private static ClusterControllerService cc;
+    private static NodeControllerService nc1;
+    private static NodeControllerService nc2;
+    private static IHyracksClientConnection hcc;
+
+    @BeforeClass
+    public static void init() throws Exception {
+        CCConfig ccConfig = new CCConfig();
+        ccConfig.setClientListenAddress("127.0.0.1");
+        ccConfig.setClientListenPort(39000);
+        ccConfig.setClusterListenAddress("127.0.0.1");
+        ccConfig.setClusterListenPort(39001);
+        ccConfig.setProfileDumpPeriod(10000);
+        FileUtils.deleteQuietly(new File(joinPath("target", "data")));
+        FileUtils.copyDirectory(new File("data"), new File(joinPath("target", "data")));
+        File outDir = new File("target" + File.separator + "ClusterController");
+        outDir.mkdirs();
+        File ccRoot = File.createTempFile(AbstractIntegrationTest.class.getName(), ".data", outDir);
+        ccRoot.delete();
+        ccRoot.mkdir();
+        ccConfig.setRootDir(ccRoot.getAbsolutePath());
+        ClusterControllerService ccBase = new ClusterControllerService(ccConfig);
+        // The spying below is dangerous since it replaces the ClusterControllerService already referenced by many
+        // objects created in the constructor above
+        cc = Mockito.spy(ccBase);
+        cc.start();
+
+        // The following code partially fixes the problem created by the spying
+        INodeManager nodeManager = cc.getNodeManager();
+        Field ccsInNodeManager = NodeManager.class.getDeclaredField("ccs");
+        ccsInNodeManager.setAccessible(true);
+        ccsInNodeManager.set(nodeManager, cc);
+
+        NCConfig ncConfig1 = new NCConfig(NC1_ID);
+        ncConfig1.setClusterAddress("localhost");
+        ncConfig1.setClusterPort(39001);
+        ncConfig1.setClusterListenAddress("127.0.0.1");
+        ncConfig1.setDataListenAddress("127.0.0.1");
+        ncConfig1.setResultListenAddress("127.0.0.1");
+        ncConfig1.setResultSweepThreshold(TIME_THRESHOLD);
+        ncConfig1.setIODevices(new String[] { joinPath(System.getProperty("user.dir"), "target", "data", "device0") });
+        NodeControllerService nc1Base = new NodeControllerService(ncConfig1);
+        nc1 = Mockito.spy(nc1Base);
+        nc1.start();
+
+        NCConfig ncConfig2 = new NCConfig(NC2_ID);
+        ncConfig2.setClusterAddress("localhost");
+        ncConfig2.setClusterPort(39001);
+        ncConfig2.setClusterListenAddress("127.0.0.1");
+        ncConfig2.setDataListenAddress("127.0.0.1");
+        ncConfig2.setResultListenAddress("127.0.0.1");
+        ncConfig2.setResultSweepThreshold(TIME_THRESHOLD);
+        ncConfig2.setIODevices(new String[] { joinPath(System.getProperty("user.dir"), "target", "data", "device1") });
+        NodeControllerService nc2Base = new NodeControllerService(ncConfig2);
+        nc2 = Mockito.spy(nc2Base);
+        nc2.start();
+
+        hcc = new HyracksConnection(ccConfig.getClientListenAddress(), ccConfig.getClientListenPort());
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Starting CC in " + ccRoot.getAbsolutePath());
+        }
+    }
+
+    @Test
+    public void DistributedTest() throws Exception {
+        JobSpecification spec1 = UnionTest.createUnionJobSpec();
+        JobSpecification spec2 = HeapSortMergeTest.createSortMergeJobSpec();
+
+        //distribute both jobs
+        DeployedJobSpecId distributedId1 = hcc.deployJobSpec(spec1);
+        DeployedJobSpecId distributedId2 = hcc.deployJobSpec(spec2);
+
+        //make sure it finished
+        //cc will get the store once to check for duplicate insertion and once to insert per job
+        verify(cc, Mockito.timeout(TIME_THRESHOLD).times(4)).getDeployedJobSpecStore();
+        verify(nc1, Mockito.timeout(TIME_THRESHOLD).times(2)).storeActivityClusterGraph(any(), any());
+        verify(nc2, Mockito.timeout(TIME_THRESHOLD).times(2)).storeActivityClusterGraph(any(), any());
+        verify(nc1, Mockito.timeout(TIME_THRESHOLD).times(2)).checkForDuplicateDeployedJobSpec(any());
+        verify(nc2, Mockito.timeout(TIME_THRESHOLD).times(2)).checkForDuplicateDeployedJobSpec(any());
+
+        //confirm that both jobs are distributed
+        Assert.assertTrue(nc1.getActivityClusterGraph(distributedId1) != null && nc2.getActivityClusterGraph(distributedId1) != null);
+        Assert.assertTrue(nc1.getActivityClusterGraph(distributedId2) != null && nc2.getActivityClusterGraph(distributedId2) != null);
+        Assert.assertTrue(cc.getDeployedJobSpecStore().getDeployedJobSpecDescriptor(distributedId1) != null);
+        Assert.assertTrue(cc.getDeployedJobSpecStore().getDeployedJobSpecDescriptor(distributedId2) != null);
+
+        //run the first job
+        JobId jobRunId1 = hcc.startJob(distributedId1, new HashMap<>());
+        hcc.waitForCompletion(jobRunId1);
+
+        //Make sure the job parameter map was removed
+        verify(cc, Mockito.timeout(TIME_THRESHOLD).times(1)).removeJobParameterByteStore(any());
+        verify(nc1, Mockito.timeout(TIME_THRESHOLD).times(1)).removeJobParameterByteStore(any());
+        verify(nc2, Mockito.timeout(TIME_THRESHOLD).times(1)).removeJobParameterByteStore(any());
+
+        //destroy the first job
+        hcc.undeployJobSpec(distributedId1);
+
+        //make sure it finished
+        verify(cc, Mockito.timeout(TIME_THRESHOLD).times(8)).getDeployedJobSpecStore();
+        verify(nc1, Mockito.timeout(TIME_THRESHOLD).times(1)).removeActivityClusterGraph(any());
+        verify(nc2, Mockito.timeout(TIME_THRESHOLD).times(1)).removeActivityClusterGraph(any());
+
+        //confirm the first job is destroyed
+        Assert.assertTrue(nc1.getActivityClusterGraph(distributedId1) == null && nc2.getActivityClusterGraph(distributedId1) == null);
+        cc.getDeployedJobSpecStore().checkForExistingDeployedJobSpecDescriptor(distributedId1);
+
+        //run the second job
+        JobId jobRunId2 = hcc.startJob(distributedId2, new HashMap<>());
+        hcc.waitForCompletion(jobRunId2);
+
+        //Make sure the job parameter map was removed
+        verify(cc, Mockito.timeout(TIME_THRESHOLD).times(2)).removeJobParameterByteStore(any());
+        verify(nc1, Mockito.timeout(TIME_THRESHOLD).times(2)).removeJobParameterByteStore(any());
+        verify(nc2, Mockito.timeout(TIME_THRESHOLD).times(2)).removeJobParameterByteStore(any());
+
+        //run the second job again
+        JobId jobRunId3 = hcc.startJob(distributedId2, new HashMap<>());
+        hcc.waitForCompletion(jobRunId3);
+
+        //Make sure the job parameter map was removed
+        verify(cc, Mockito.timeout(TIME_THRESHOLD).times(3)).removeJobParameterByteStore(any());
+        verify(nc1, Mockito.timeout(TIME_THRESHOLD).times(3)).removeJobParameterByteStore(any());
+        verify(nc2, Mockito.timeout(TIME_THRESHOLD).times(3)).removeJobParameterByteStore(any());
+
+        //destroy the second job
+        hcc.undeployJobSpec(distributedId2);
+
+        //make sure it finished
+        verify(cc, Mockito.timeout(TIME_THRESHOLD).times(12)).getDeployedJobSpecStore();
+        verify(nc1, Mockito.timeout(TIME_THRESHOLD).times(2)).removeActivityClusterGraph(any());
+        verify(nc2, Mockito.timeout(TIME_THRESHOLD).times(2)).removeActivityClusterGraph(any());
+
+        //confirm the second job is destroyed
+        Assert.assertTrue(nc1.getActivityClusterGraph(distributedId2) == null && nc2.getActivityClusterGraph(distributedId2) == null);
+        cc.getDeployedJobSpecStore().checkForExistingDeployedJobSpecDescriptor(distributedId2);
+
+        //run the second job 100 times in parallel
+        distributedId2 = hcc.deployJobSpec(spec2);
+        for (int i = 0; i < 100; i++) {
+            hcc.startJob(distributedId2, new HashMap<>());
+        }
+    }
+
+    @AfterClass
+    public static void deinit() throws Exception {
+        nc2.stop();
+        nc1.stop();
+        cc.stop();
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/PredistributedJobsTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/PredistributedJobsTest.java
deleted file mode 100644
index caba5f6..0000000
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/PredistributedJobsTest.java
+++ /dev/null
@@ -1,186 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.tests.integration;
-
-import static org.apache.hyracks.util.file.FileUtil.joinPath;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.verify;
-
-import java.io.File;
-import java.lang.reflect.Field;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.hyracks.api.client.HyracksConnection;
-import org.apache.hyracks.api.client.IHyracksClientConnection;
-import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.api.job.JobSpecification;
-import org.apache.hyracks.control.cc.ClusterControllerService;
-import org.apache.hyracks.control.cc.cluster.INodeManager;
-import org.apache.hyracks.control.cc.cluster.NodeManager;
-import org.apache.hyracks.control.common.controllers.CCConfig;
-import org.apache.hyracks.control.common.controllers.NCConfig;
-import org.apache.hyracks.control.nc.NodeControllerService;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.mockito.Mockito;
-
-public class PredistributedJobsTest {
-    private static final Logger LOGGER = Logger.getLogger(PredistributedJobsTest.class.getName());
-
-    private static final String NC1_ID = "nc1";
-    private static final String NC2_ID = "nc2";
-
-    private static ClusterControllerService cc;
-    private static NodeControllerService nc1;
-    private static NodeControllerService nc2;
-    private static IHyracksClientConnection hcc;
-
-    @BeforeClass
-    public static void init() throws Exception {
-        CCConfig ccConfig = new CCConfig();
-        ccConfig.setClientListenAddress("127.0.0.1");
-        ccConfig.setClientListenPort(39000);
-        ccConfig.setClusterListenAddress("127.0.0.1");
-        ccConfig.setClusterListenPort(39001);
-        ccConfig.setProfileDumpPeriod(10000);
-        FileUtils.deleteQuietly(new File(joinPath("target", "data")));
-        FileUtils.copyDirectory(new File("data"), new File(joinPath("target", "data")));
-        File outDir = new File("target" + File.separator + "ClusterController");
-        outDir.mkdirs();
-        File ccRoot = File.createTempFile(AbstractIntegrationTest.class.getName(), ".data", outDir);
-        ccRoot.delete();
-        ccRoot.mkdir();
-        ccConfig.setRootDir(ccRoot.getAbsolutePath());
-        ClusterControllerService ccBase = new ClusterControllerService(ccConfig);
-        // The spying below is dangerous since it replaces the ClusterControllerService already referenced by many
-        // objects created in the constructor above
-        cc = Mockito.spy(ccBase);
-        cc.start();
-
-        // The following code partially fixes the problem created by the spying
-        INodeManager nodeManager = cc.getNodeManager();
-        Field ccsInNodeManager = NodeManager.class.getDeclaredField("ccs");
-        ccsInNodeManager.setAccessible(true);
-        ccsInNodeManager.set(nodeManager, cc);
-
-        NCConfig ncConfig1 = new NCConfig(NC1_ID);
-        ncConfig1.setClusterAddress("localhost");
-        ncConfig1.setClusterPort(39001);
-        ncConfig1.setClusterListenAddress("127.0.0.1");
-        ncConfig1.setDataListenAddress("127.0.0.1");
-        ncConfig1.setResultListenAddress("127.0.0.1");
-        ncConfig1.setResultSweepThreshold(5000);
-        ncConfig1.setIODevices(new String[] { joinPath(System.getProperty("user.dir"), "target", "data", "device0") });
-        NodeControllerService nc1Base = new NodeControllerService(ncConfig1);
-        nc1 = Mockito.spy(nc1Base);
-        nc1.start();
-
-        NCConfig ncConfig2 = new NCConfig(NC2_ID);
-        ncConfig2.setClusterAddress("localhost");
-        ncConfig2.setClusterPort(39001);
-        ncConfig2.setClusterListenAddress("127.0.0.1");
-        ncConfig2.setDataListenAddress("127.0.0.1");
-        ncConfig2.setResultListenAddress("127.0.0.1");
-        ncConfig2.setResultSweepThreshold(5000);
-        ncConfig2.setIODevices(new String[] { joinPath(System.getProperty("user.dir"), "target", "data", "device1") });
-        NodeControllerService nc2Base = new NodeControllerService(ncConfig2);
-        nc2 = Mockito.spy(nc2Base);
-        nc2.start();
-
-        hcc = new HyracksConnection(ccConfig.getClientListenAddress(), ccConfig.getClientListenPort());
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("Starting CC in " + ccRoot.getAbsolutePath());
-        }
-    }
-
-    @Test
-    public void DistributedTest() throws Exception {
-        JobSpecification spec1 = UnionTest.createUnionJobSpec();
-        JobSpecification spec2 = HeapSortMergeTest.createSortMergeJobSpec();
-
-        //distribute both jobs
-        JobId jobId1 = hcc.distributeJob(spec1);
-        JobId jobId2 = hcc.distributeJob(spec2);
-
-        //make sure it finished
-        //cc will get the store once to check for duplicate insertion and once to insert per job
-        verify(cc, Mockito.timeout(5000).times(4)).getPreDistributedJobStore();
-        verify(nc1, Mockito.timeout(5000).times(2)).storeActivityClusterGraph(any(), any());
-        verify(nc2, Mockito.timeout(5000).times(2)).storeActivityClusterGraph(any(), any());
-        verify(nc1, Mockito.timeout(5000).times(2)).checkForDuplicateDistributedJob(any());
-        verify(nc2, Mockito.timeout(5000).times(2)).checkForDuplicateDistributedJob(any());
-
-        //confirm that both jobs are distributed
-        Assert.assertTrue(nc1.getActivityClusterGraph(jobId1) != null && nc2.getActivityClusterGraph(jobId1) != null);
-        Assert.assertTrue(nc1.getActivityClusterGraph(jobId2) != null && nc2.getActivityClusterGraph(jobId2) != null);
-        Assert.assertTrue(cc.getPreDistributedJobStore().getDistributedJobDescriptor(jobId1) != null);
-        Assert.assertTrue(cc.getPreDistributedJobStore().getDistributedJobDescriptor(jobId2) != null);
-
-        //run the first job
-        hcc.startJob(jobId1);
-        hcc.waitForCompletion(jobId1);
-
-        //destroy the first job
-        hcc.destroyJob(jobId1);
-
-        //make sure it finished
-        verify(cc, Mockito.timeout(5000).times(8)).getPreDistributedJobStore();
-        verify(nc1, Mockito.timeout(5000).times(1)).removeActivityClusterGraph(any());
-        verify(nc2, Mockito.timeout(5000).times(1)).removeActivityClusterGraph(any());
-
-        //confirm the first job is destroyed
-        Assert.assertTrue(nc1.getActivityClusterGraph(jobId1) == null && nc2.getActivityClusterGraph(jobId1) == null);
-        cc.getPreDistributedJobStore().checkForExistingDistributedJobDescriptor(jobId1);
-
-        //run the second job
-        hcc.startJob(jobId2);
-        hcc.waitForCompletion(jobId2);
-
-        //wait ten seconds to ensure the result sweeper does not break the job
-        //The result sweeper runs every 5 seconds during the tests
-        Thread.sleep(10000);
-
-        //run the second job again
-        hcc.startJob(jobId2);
-        hcc.waitForCompletion(jobId2);
-
-        //destroy the second job
-        hcc.destroyJob(jobId2);
-
-        //make sure it finished
-        verify(cc, Mockito.timeout(5000).times(12)).getPreDistributedJobStore();
-        verify(nc1, Mockito.timeout(5000).times(2)).removeActivityClusterGraph(any());
-        verify(nc2, Mockito.timeout(5000).times(2)).removeActivityClusterGraph(any());
-
-        //confirm the second job is destroyed
-        Assert.assertTrue(nc1.getActivityClusterGraph(jobId2) == null && nc2.getActivityClusterGraph(jobId2) == null);
-        cc.getPreDistributedJobStore().checkForExistingDistributedJobDescriptor(jobId2);
-    }
-
-    @AfterClass
-    public static void deinit() throws Exception {
-        nc2.stop();
-        nc1.stop();
-        cc.stop();
-    }
-}
diff --git a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestJobletContext.java b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestJobletContext.java
index d3c34dd..e310385 100644
--- a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestJobletContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestJobletContext.java
@@ -26,6 +26,7 @@
 import org.apache.hyracks.api.exceptions.HyracksException;
 import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.api.io.IIOManager;
+import org.apache.hyracks.api.job.IJobletEventListenerFactory;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.profiling.counters.ICounterContext;
 import org.apache.hyracks.api.resources.IDeallocatable;
@@ -56,10 +57,15 @@
         return frameManger.allocateFrame(bytes);
     }
 
-    ByteBuffer reallocateFrame(ByteBuffer tobeDeallocate, int newFrameSizeInBytes, boolean copyOldData) throws HyracksDataException {
+    ByteBuffer reallocateFrame(ByteBuffer tobeDeallocate, int newFrameSizeInBytes, boolean copyOldData)
+            throws HyracksDataException {
         return frameManger.reallocateFrame(tobeDeallocate, newFrameSizeInBytes, copyOldData);
     }
 
+    public IJobletEventListenerFactory getJobletEventListenerFactory() {
+        return null;
+    }
+
     void deallocateFrames(int bytes) {
         frameManger.deallocateFrames(bytes);
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
index 3d13cf9..ac52573 100644
--- a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
@@ -163,6 +163,10 @@
     }
 
     @Override
+    public byte[] getJobParameter(byte[] name, int start, int length) {
+        return new byte[0];
+    }
+
     public Set<JobFlag> getJobFlags() {
         return EnumSet.noneOf(JobFlag.class);
     }