[ASTERIXDB-2373][HYR,CLUS] Allow upsert of JobSpecs for Deployed Jobs

- user model changes: none
- storage format changes: none
- interface changes: new methods added

This change adds the upsertDeployedJobSpec method, enabling
a Deployed Job to update its Job Specification.

Added call in test.
Removed DeployedJobService (moved methods to BAD codebase).

Change-Id: I01fd5d43896d520fe75e1007d7bd39324f6f6e4b
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2619
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Xikui Wang <xkkwww@gmail.com>
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/DeployedJobService.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/DeployedJobService.java
deleted file mode 100644
index bc6f1b1..0000000
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/DeployedJobService.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.active;
-
-import java.time.Instant;
-import java.util.Date;
-import java.util.Map;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.asterix.common.transactions.ITxnIdFactory;
-import org.apache.hyracks.api.client.IHyracksClientConnection;
-import org.apache.hyracks.api.job.DeployedJobSpecId;
-import org.apache.hyracks.api.job.JobId;
-import org.apache.logging.log4j.Level;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-/**
- * Provides functionality for running DeployedJobSpecs
- */
-public class DeployedJobService {
-
-    private static final Logger LOGGER = LogManager.getLogger();
-
-    //To enable new Asterix TxnId for separate deployed job spec invocations
-    private static final byte[] TRANSACTION_ID_PARAMETER_NAME = "TxnIdParameter".getBytes();
-
-    //pool size one (only running one thread at a time)
-    private static final int POOL_SIZE = 1;
-
-    //Starts running a deployed job specification periodically with an interval of "duration" seconds
-    public static ScheduledExecutorService startRepetitiveDeployedJobSpec(DeployedJobSpecId distributedId,
-            IHyracksClientConnection hcc, long duration, Map<byte[], byte[]> jobParameters, EntityId entityId,
-            ITxnIdFactory txnIdFactory) {
-        ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(POOL_SIZE);
-        scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
-            @Override
-            public void run() {
-                try {
-                    if (!runRepetitiveDeployedJobSpec(distributedId, hcc, jobParameters, duration, entityId,
-                            txnIdFactory)) {
-                        scheduledExecutorService.shutdown();
-                    }
-                } catch (Exception e) {
-                    LOGGER.log(Level.ERROR, "Job Failed to run for " + entityId.getExtensionName() + " "
-                            + entityId.getDataverse() + "." + entityId.getEntityName() + ".", e);
-                }
-            }
-        }, duration, duration, TimeUnit.MILLISECONDS);
-        return scheduledExecutorService;
-    }
-
-    public static boolean runRepetitiveDeployedJobSpec(DeployedJobSpecId distributedId, IHyracksClientConnection hcc,
-            Map<byte[], byte[]> jobParameters, long duration, EntityId entityId, ITxnIdFactory txnIdFactory)
-            throws Exception {
-        long executionMilliseconds = runDeployedJobSpec(distributedId, hcc, jobParameters, entityId, txnIdFactory);
-        if (executionMilliseconds > duration && LOGGER.isErrorEnabled()) {
-            LOGGER.log(Level.ERROR,
-                    "Periodic job for " + entityId.getExtensionName() + " " + entityId.getDataverse() + "."
-                            + entityId.getEntityName() + " was unable to meet the required period of " + duration
-                            + " milliseconds. Actually took " + executionMilliseconds + " execution will shutdown"
-                            + new Date());
-            return false;
-        }
-        return true;
-    }
-
-    public synchronized static long runDeployedJobSpec(DeployedJobSpecId distributedId, IHyracksClientConnection hcc,
-            Map<byte[], byte[]> jobParameters, EntityId entityId, ITxnIdFactory txnIdFactory) throws Exception {
-        JobId jobId;
-        long startTime = Instant.now().toEpochMilli();
-
-        //Add the Asterix Transaction Id to the map
-        jobParameters.put(TRANSACTION_ID_PARAMETER_NAME, String.valueOf(txnIdFactory.create().getId()).getBytes());
-        jobId = hcc.startJob(distributedId, jobParameters);
-
-        hcc.waitForCompletion(jobId);
-        long executionMilliseconds = Instant.now().toEpochMilli() - startTime;
-
-        LOGGER.log(Level.INFO,
-                "Deployed Job execution completed for " + entityId.getExtensionName() + " " + entityId.getDataverse()
-                        + "." + entityId.getEntityName() + ". Took " + executionMilliseconds + " milliseconds ");
-
-        return executionMilliseconds;
-
-    }
-
-    @Override
-    public String toString() {
-        return "DeployedJobSpecService";
-    }
-
-}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java
index 23c41fe..7182f42 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java
@@ -39,8 +39,9 @@
         GET_JOB_STATUS,
         GET_JOB_INFO,
         START_JOB,
-        DISTRIBUTE_JOB,
-        DESTROY_JOB,
+        DEPLOY_JOB,
+        UNDEPLOY_JOB,
+        UPSERT_DEPLOYED_JOB,
         CANCEL_JOB,
         GET_DATASET_DIRECTORY_SERIVICE_INFO,
         GET_DATASET_RESULT_STATUS,
@@ -107,6 +108,32 @@
         }
     }
 
+    public static class UpsertDeployedJobSpecFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        private final byte[] acggfBytes;
+
+        private final DeployedJobSpecId deployedJobSpecId;
+
+        public UpsertDeployedJobSpecFunction(DeployedJobSpecId deployedJobSpecId, byte[] acggfBytes) {
+            this.deployedJobSpecId = deployedJobSpecId;
+            this.acggfBytes = acggfBytes;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.UPSERT_DEPLOYED_JOB;
+        }
+
+        public byte[] getACGGFBytes() {
+            return acggfBytes;
+        }
+
+        public DeployedJobSpecId getDeployedJobSpecId() {
+            return deployedJobSpecId;
+        }
+    }
+
     public static class DeployJobSpecFunction extends Function {
         private static final long serialVersionUID = 1L;
 
@@ -118,7 +145,7 @@
 
         @Override
         public FunctionId getFunctionId() {
-            return FunctionId.DISTRIBUTE_JOB;
+            return FunctionId.DEPLOY_JOB;
         }
 
         public byte[] getACGGFBytes() {
@@ -159,7 +186,7 @@
 
         @Override
         public FunctionId getFunctionId() {
-            return FunctionId.DESTROY_JOB;
+            return FunctionId.UNDEPLOY_JOB;
         }
 
         public DeployedJobSpecId getDeployedJobSpecId() {
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
index eddcaa5..07ca6b0 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
@@ -98,6 +98,14 @@
     }
 
     @Override
+    public DeployedJobSpecId upsertDeployedJobSpec(DeployedJobSpecId deployedJobSpecId, byte[] acggfBytes)
+            throws Exception {
+        HyracksClientInterfaceFunctions.UpsertDeployedJobSpecFunction udjsf =
+                new HyracksClientInterfaceFunctions.UpsertDeployedJobSpecFunction(deployedJobSpecId, acggfBytes);
+        return (DeployedJobSpecId) rpci.call(ipcHandle, udjsf);
+    }
+
+    @Override
     public DeployedJobSpecId undeployJobSpec(DeployedJobSpecId deployedJobSpecId) throws Exception {
         HyracksClientInterfaceFunctions.UndeployJobSpecFunction sjf =
                 new HyracksClientInterfaceFunctions.UndeployJobSpecFunction(deployedJobSpecId);
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
index f676d27..5b98778 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
@@ -111,6 +111,14 @@
     }
 
     @Override
+    public DeployedJobSpecId upsertDeployedJobSpec(DeployedJobSpecId deployedJobSpecId, JobSpecification jobSpec)
+            throws Exception {
+        JobSpecificationActivityClusterGraphGeneratorFactory jsacggf =
+                new JobSpecificationActivityClusterGraphGeneratorFactory(jobSpec);
+        return hci.upsertDeployedJobSpec(deployedJobSpecId, JavaSerializationUtils.serialize(jsacggf));
+    }
+
+    @Override
     public DeployedJobSpecId deployJobSpec(JobSpecification jobSpec) throws Exception {
         JobSpecificationActivityClusterGraphGeneratorFactory jsacggf =
                 new JobSpecificationActivityClusterGraphGeneratorFactory(jobSpec);
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java
index 510a6b6..61d1418 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java
@@ -99,6 +99,18 @@
     DeployedJobSpecId deployJobSpec(JobSpecification jobSpec) throws Exception;
 
     /**
+     * Update the JobSpec for a deployed job.
+     *
+     * @param deployedJobSpecId
+     *            The id of the deployed job spec
+     * @param jobSpec
+     *            Job Specification
+     * @throws Exception
+     */
+    DeployedJobSpecId upsertDeployedJobSpec(DeployedJobSpecId deployedJobSpecId, JobSpecification jobSpec)
+            throws Exception;
+
+    /**
      * Remove the deployed Job Spec
      *
      * @param deployedJobSpecId
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java
index f0c7872..2b92bcd 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java
@@ -45,6 +45,9 @@
 
     public DeployedJobSpecId deployJobSpec(byte[] acggfBytes) throws Exception;
 
+    public DeployedJobSpecId upsertDeployedJobSpec(DeployedJobSpecId deployedJobSpecId, byte[] acggfBytes)
+            throws Exception;
+
     public DeployedJobSpecId undeployJobSpec(DeployedJobSpecId deployedJobSpecId) throws Exception;
 
     public NetworkAddress getDatasetDirectoryServiceInfo() throws Exception;
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java
index e46aa7f..f123c8a 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java
@@ -85,13 +85,19 @@
                 ccs.getWorkQueue().schedule(new GetJobInfoWork(ccs.getJobManager(), gjif.getJobId(),
                         new IPCResponder<JobInfo>(handle, mid)));
                 break;
-            case DISTRIBUTE_JOB:
+            case DEPLOY_JOB:
                 HyracksClientInterfaceFunctions.DeployJobSpecFunction djf =
                         (HyracksClientInterfaceFunctions.DeployJobSpecFunction) fn;
                 ccs.getWorkQueue().schedule(new DeployJobSpecWork(ccs, djf.getACGGFBytes(),
-                        deployedJobSpecIdFactory.create(), new IPCResponder<>(handle, mid)));
+                        deployedJobSpecIdFactory.create(), false, new IPCResponder<>(handle, mid)));
                 break;
-            case DESTROY_JOB:
+            case UPSERT_DEPLOYED_JOB:
+                HyracksClientInterfaceFunctions.UpsertDeployedJobSpecFunction udjsf =
+                        (HyracksClientInterfaceFunctions.UpsertDeployedJobSpecFunction) fn;
+                ccs.getWorkQueue().schedule(new DeployJobSpecWork(ccs, udjsf.getACGGFBytes(),
+                        udjsf.getDeployedJobSpecId(), true, new IPCResponder<>(handle, mid)));
+                break;
+            case UNDEPLOY_JOB:
                 HyracksClientInterfaceFunctions.UndeployJobSpecFunction dsjf =
                         (HyracksClientInterfaceFunctions.UndeployJobSpecFunction) fn;
                 ccs.getWorkQueue().schedule(
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/DeployedJobSpecStore.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/DeployedJobSpecStore.java
index 0e22c25..041e224 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/DeployedJobSpecStore.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/DeployedJobSpecStore.java
@@ -40,9 +40,6 @@
     public void addDeployedJobSpecDescriptor(DeployedJobSpecId deployedJobSpecId,
             ActivityClusterGraph activityClusterGraph, JobSpecification jobSpecification,
             Set<Constraint> activityClusterGraphConstraints) throws HyracksException {
-        if (deployedJobSpecDescriptorMap.get(deployedJobSpecId) != null) {
-            throw HyracksException.create(ErrorCode.DUPLICATE_DEPLOYED_JOB, deployedJobSpecId);
-        }
         DeployedJobSpecDescriptor descriptor =
                 new DeployedJobSpecDescriptor(activityClusterGraph, jobSpecification, activityClusterGraphConstraints);
         deployedJobSpecDescriptorMap.put(deployedJobSpecId, descriptor);
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DeployJobSpecWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DeployJobSpecWork.java
index c51f3c5..60c88c7 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DeployJobSpecWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DeployJobSpecWork.java
@@ -39,20 +39,24 @@
     private final byte[] acggfBytes;
     private final DeployedJobSpecId deployedJobSpecId;
     private final IResultCallback<DeployedJobSpecId> callback;
+    private final boolean upsert;
 
     public DeployJobSpecWork(ClusterControllerService ccs, byte[] acggfBytes, DeployedJobSpecId deployedJobSpecId,
-            IResultCallback<DeployedJobSpecId> callback) {
+            boolean upsert, IResultCallback<DeployedJobSpecId> callback) {
         this.deployedJobSpecId = deployedJobSpecId;
         this.ccs = ccs;
         this.acggfBytes = acggfBytes;
         this.callback = callback;
+        this.upsert = upsert;
     }
 
     @Override
     protected void doRun() throws Exception {
         try {
             final CCServiceContext ccServiceCtx = ccs.getContext();
-            ccs.getDeployedJobSpecStore().checkForExistingDeployedJobSpecDescriptor(deployedJobSpecId);
+            if (!upsert) {
+                ccs.getDeployedJobSpecStore().checkForExistingDeployedJobSpecDescriptor(deployedJobSpecId);
+            }
             IActivityClusterGraphGeneratorFactory acggf =
                     (IActivityClusterGraphGeneratorFactory) DeploymentUtils.deserialize(acggfBytes, null, ccServiceCtx);
             IActivityClusterGraphGenerator acgg =
@@ -65,7 +69,7 @@
 
             INodeManager nodeManager = ccs.getNodeManager();
             for (NodeControllerState node : nodeManager.getAllNodeControllerStates()) {
-                node.getNodeController().deployJobSpec(deployedJobSpecId, acgBytes);
+                node.getNodeController().deployJobSpec(deployedJobSpecId, acgBytes, upsert);
             }
             callback.setValue(deployedJobSpecId);
         } catch (Exception e) {
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
index 78cd44d..fa835f4 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
@@ -54,7 +54,8 @@
 
     void undeployBinary(DeploymentId deploymentId) throws Exception;
 
-    void deployJobSpec(DeployedJobSpecId deployedJobSpecId, byte[] planBytes) throws Exception;
+    void deployJobSpec(DeployedJobSpecId deployedJobSpecId, byte[] planBytes, boolean checkForDuplicate)
+            throws Exception;
 
     void undeployJobSpec(DeployedJobSpecId deployedJobSpecId) throws Exception;
 
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
index 8e02936..dea5198 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
@@ -103,8 +103,8 @@
         SHUTDOWN_REQUEST,
         SHUTDOWN_RESPONSE,
 
-        DISTRIBUTE_JOB,
-        DESTROY_JOB,
+        DEPLOY_JOB,
+        UNDEPLOY_JOB,
         DEPLOYED_JOB_FAILURE,
 
         STATE_DUMP_REQUEST,
@@ -713,15 +713,18 @@
 
         private final byte[] acgBytes;
 
-        public DeployJobSpecFunction(DeployedJobSpecId deployedJobSpecId, byte[] acgBytes, CcId ccId) {
+        private final boolean upsert;
+
+        public DeployJobSpecFunction(DeployedJobSpecId deployedJobSpecId, byte[] acgBytes, boolean upsert, CcId ccId) {
             super(ccId);
             this.deployedJobSpecId = deployedJobSpecId;
             this.acgBytes = acgBytes;
+            this.upsert = upsert;
         }
 
         @Override
         public FunctionId getFunctionId() {
-            return FunctionId.DISTRIBUTE_JOB;
+            return FunctionId.DEPLOY_JOB;
         }
 
         public DeployedJobSpecId getDeployedJobSpecId() {
@@ -731,6 +734,10 @@
         public byte[] getacgBytes() {
             return acgBytes;
         }
+
+        public boolean getUpsert() {
+            return upsert;
+        }
     }
 
     public static class UndeployJobSpecFunction extends CCIdentifiedFunction {
@@ -745,7 +752,7 @@
 
         @Override
         public FunctionId getFunctionId() {
-            return FunctionId.DESTROY_JOB;
+            return FunctionId.UNDEPLOY_JOB;
         }
 
         public DeployedJobSpecId getDeployedJobSpecId() {
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
index d6867eb..8242bdc 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
@@ -103,8 +103,8 @@
     }
 
     @Override
-    public void deployJobSpec(DeployedJobSpecId deployedJobSpecId, byte[] planBytes) throws Exception {
-        DeployJobSpecFunction fn = new DeployJobSpecFunction(deployedJobSpecId, planBytes, ccId);
+    public void deployJobSpec(DeployedJobSpecId deployedJobSpecId, byte[] planBytes, boolean upsert) throws Exception {
+        DeployJobSpecFunction fn = new DeployJobSpecFunction(deployedJobSpecId, planBytes, upsert, ccId);
         ipcHandle.send(-1, fn, null);
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
index 735f7cf..08cd5d8 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
@@ -107,13 +107,13 @@
                 ncs.getWorkQueue().schedule(new UnDeployBinaryWork(ncs, ndbf.getDeploymentId(), ndbf.getCcId()));
                 return;
 
-            case DISTRIBUTE_JOB:
+            case DEPLOY_JOB:
                 CCNCFunctions.DeployJobSpecFunction djf = (CCNCFunctions.DeployJobSpecFunction) fn;
-                ncs.getWorkQueue().schedule(
-                        new DeployJobSpecWork(ncs, djf.getDeployedJobSpecId(), djf.getacgBytes(), djf.getCcId()));
+                ncs.getWorkQueue().schedule(new DeployJobSpecWork(ncs, djf.getDeployedJobSpecId(), djf.getacgBytes(),
+                        djf.getUpsert(), djf.getCcId()));
                 return;
 
-            case DESTROY_JOB:
+            case UNDEPLOY_JOB:
                 CCNCFunctions.UndeployJobSpecFunction dsjf = (CCNCFunctions.UndeployJobSpecFunction) fn;
                 ncs.getWorkQueue().schedule(new UndeployJobSpecWork(ncs, dsjf.getDeployedJobSpecId(), dsjf.getCcId()));
                 return;
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index 6a7d645..aa2320a 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -470,7 +470,6 @@
     }
 
     private ConcurrentHashMap<CcId, Serializable> getDistributedState() {
-        //noinspection unchecked
         return (ConcurrentHashMap<CcId, Serializable>) serviceCtx.getDistributedState();
     }
 
@@ -566,9 +565,6 @@
 
     public void storeActivityClusterGraph(DeployedJobSpecId deployedJobSpecId, ActivityClusterGraph acg)
             throws HyracksException {
-        if (deployedJobSpecActivityClusterGraphMap.get(deployedJobSpecId.getId()) != null) {
-            throw HyracksException.create(ErrorCode.DUPLICATE_DEPLOYED_JOB, deployedJobSpecId);
-        }
         deployedJobSpecActivityClusterGraphMap.put(deployedJobSpecId.getId(), acg);
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DeployJobSpecWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DeployJobSpecWork.java
index 92612dd..bcdb97f 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DeployJobSpecWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DeployJobSpecWork.java
@@ -28,7 +28,7 @@
 import org.apache.hyracks.control.nc.NodeControllerService;
 
 /**
- * pre-distribute a job that can be executed later
+ * Deploy a job that can be executed later
  *
  */
 public class DeployJobSpecWork extends AbstractWork {
@@ -37,19 +37,23 @@
     private final byte[] acgBytes;
     private final CcId ccId;
     private final DeployedJobSpecId deployedJobSpecId;
+    private final boolean upsert;
 
     public DeployJobSpecWork(NodeControllerService ncs, DeployedJobSpecId deployedJobSpecId, byte[] acgBytes,
-            CcId ccId) {
+            boolean upsert, CcId ccId) {
         this.ncs = ncs;
         this.deployedJobSpecId = deployedJobSpecId;
         this.acgBytes = acgBytes;
         this.ccId = ccId;
+        this.upsert = upsert;
     }
 
     @Override
     public void run() {
         try {
-            ncs.checkForDuplicateDeployedJobSpec(deployedJobSpecId);
+            if (!upsert) {
+                ncs.checkForDuplicateDeployedJobSpec(deployedJobSpecId);
+            }
             ActivityClusterGraph acg =
                     (ActivityClusterGraph) DeploymentUtils.deserialize(acgBytes, null, ncs.getContext());
             ncs.storeActivityClusterGraph(deployedJobSpecId, acg);
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/DeployedJobSpecsTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/DeployedJobSpecsTest.java
index 40b6b27..834fab5 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/DeployedJobSpecsTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/DeployedJobSpecsTest.java
@@ -199,6 +199,16 @@
         for (int i = 0; i < 100; i++) {
             hcc.startJob(distributedId2, new HashMap<>());
         }
+
+        //Change the second job into the first job and see whether it runs
+        hcc.upsertDeployedJobSpec(distributedId2, spec1);
+        JobId jobRunId4 = hcc.startJob(distributedId2, new HashMap<>());
+        hcc.waitForCompletion(jobRunId4);
+
+        //Run it one more time
+        JobId jobRunId5 = hcc.startJob(distributedId2, new HashMap<>());
+        hcc.waitForCompletion(jobRunId5);
+
     }
 
     @AfterClass