[ASTERIXDB-2373][HYR,CLUS] Allow upsert of JobSpecs for Deployed Jobs
- user model changes: none
- storage format changes: none
- interface changes: new methods added
This change adds the upsertDeployedJobSpec method, enabling
a Deployed Job to update its Job Specification.
Added call in test.
Removed DeployedJobService (moved methods to BAD codebase).
Change-Id: I01fd5d43896d520fe75e1007d7bd39324f6f6e4b
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2619
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Xikui Wang <xkkwww@gmail.com>
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/DeployedJobService.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/DeployedJobService.java
deleted file mode 100644
index bc6f1b1..0000000
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/DeployedJobService.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.active;
-
-import java.time.Instant;
-import java.util.Date;
-import java.util.Map;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.asterix.common.transactions.ITxnIdFactory;
-import org.apache.hyracks.api.client.IHyracksClientConnection;
-import org.apache.hyracks.api.job.DeployedJobSpecId;
-import org.apache.hyracks.api.job.JobId;
-import org.apache.logging.log4j.Level;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-/**
- * Provides functionality for running DeployedJobSpecs
- */
-public class DeployedJobService {
-
- private static final Logger LOGGER = LogManager.getLogger();
-
- //To enable new Asterix TxnId for separate deployed job spec invocations
- private static final byte[] TRANSACTION_ID_PARAMETER_NAME = "TxnIdParameter".getBytes();
-
- //pool size one (only running one thread at a time)
- private static final int POOL_SIZE = 1;
-
- //Starts running a deployed job specification periodically with an interval of "duration" seconds
- public static ScheduledExecutorService startRepetitiveDeployedJobSpec(DeployedJobSpecId distributedId,
- IHyracksClientConnection hcc, long duration, Map<byte[], byte[]> jobParameters, EntityId entityId,
- ITxnIdFactory txnIdFactory) {
- ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(POOL_SIZE);
- scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
- @Override
- public void run() {
- try {
- if (!runRepetitiveDeployedJobSpec(distributedId, hcc, jobParameters, duration, entityId,
- txnIdFactory)) {
- scheduledExecutorService.shutdown();
- }
- } catch (Exception e) {
- LOGGER.log(Level.ERROR, "Job Failed to run for " + entityId.getExtensionName() + " "
- + entityId.getDataverse() + "." + entityId.getEntityName() + ".", e);
- }
- }
- }, duration, duration, TimeUnit.MILLISECONDS);
- return scheduledExecutorService;
- }
-
- public static boolean runRepetitiveDeployedJobSpec(DeployedJobSpecId distributedId, IHyracksClientConnection hcc,
- Map<byte[], byte[]> jobParameters, long duration, EntityId entityId, ITxnIdFactory txnIdFactory)
- throws Exception {
- long executionMilliseconds = runDeployedJobSpec(distributedId, hcc, jobParameters, entityId, txnIdFactory);
- if (executionMilliseconds > duration && LOGGER.isErrorEnabled()) {
- LOGGER.log(Level.ERROR,
- "Periodic job for " + entityId.getExtensionName() + " " + entityId.getDataverse() + "."
- + entityId.getEntityName() + " was unable to meet the required period of " + duration
- + " milliseconds. Actually took " + executionMilliseconds + " execution will shutdown"
- + new Date());
- return false;
- }
- return true;
- }
-
- public synchronized static long runDeployedJobSpec(DeployedJobSpecId distributedId, IHyracksClientConnection hcc,
- Map<byte[], byte[]> jobParameters, EntityId entityId, ITxnIdFactory txnIdFactory) throws Exception {
- JobId jobId;
- long startTime = Instant.now().toEpochMilli();
-
- //Add the Asterix Transaction Id to the map
- jobParameters.put(TRANSACTION_ID_PARAMETER_NAME, String.valueOf(txnIdFactory.create().getId()).getBytes());
- jobId = hcc.startJob(distributedId, jobParameters);
-
- hcc.waitForCompletion(jobId);
- long executionMilliseconds = Instant.now().toEpochMilli() - startTime;
-
- LOGGER.log(Level.INFO,
- "Deployed Job execution completed for " + entityId.getExtensionName() + " " + entityId.getDataverse()
- + "." + entityId.getEntityName() + ". Took " + executionMilliseconds + " milliseconds ");
-
- return executionMilliseconds;
-
- }
-
- @Override
- public String toString() {
- return "DeployedJobSpecService";
- }
-
-}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java
index 23c41fe..7182f42 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java
@@ -39,8 +39,9 @@
GET_JOB_STATUS,
GET_JOB_INFO,
START_JOB,
- DISTRIBUTE_JOB,
- DESTROY_JOB,
+ DEPLOY_JOB,
+ UNDEPLOY_JOB,
+ UPSERT_DEPLOYED_JOB,
CANCEL_JOB,
GET_DATASET_DIRECTORY_SERIVICE_INFO,
GET_DATASET_RESULT_STATUS,
@@ -107,6 +108,32 @@
}
}
+ public static class UpsertDeployedJobSpecFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final byte[] acggfBytes;
+
+ private final DeployedJobSpecId deployedJobSpecId;
+
+ public UpsertDeployedJobSpecFunction(DeployedJobSpecId deployedJobSpecId, byte[] acggfBytes) {
+ this.deployedJobSpecId = deployedJobSpecId;
+ this.acggfBytes = acggfBytes;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.UPSERT_DEPLOYED_JOB;
+ }
+
+ public byte[] getACGGFBytes() {
+ return acggfBytes;
+ }
+
+ public DeployedJobSpecId getDeployedJobSpecId() {
+ return deployedJobSpecId;
+ }
+ }
+
public static class DeployJobSpecFunction extends Function {
private static final long serialVersionUID = 1L;
@@ -118,7 +145,7 @@
@Override
public FunctionId getFunctionId() {
- return FunctionId.DISTRIBUTE_JOB;
+ return FunctionId.DEPLOY_JOB;
}
public byte[] getACGGFBytes() {
@@ -159,7 +186,7 @@
@Override
public FunctionId getFunctionId() {
- return FunctionId.DESTROY_JOB;
+ return FunctionId.UNDEPLOY_JOB;
}
public DeployedJobSpecId getDeployedJobSpecId() {
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
index eddcaa5..07ca6b0 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
@@ -98,6 +98,14 @@
}
@Override
+ public DeployedJobSpecId upsertDeployedJobSpec(DeployedJobSpecId deployedJobSpecId, byte[] acggfBytes)
+ throws Exception {
+ HyracksClientInterfaceFunctions.UpsertDeployedJobSpecFunction udjsf =
+ new HyracksClientInterfaceFunctions.UpsertDeployedJobSpecFunction(deployedJobSpecId, acggfBytes);
+ return (DeployedJobSpecId) rpci.call(ipcHandle, udjsf);
+ }
+
+ @Override
public DeployedJobSpecId undeployJobSpec(DeployedJobSpecId deployedJobSpecId) throws Exception {
HyracksClientInterfaceFunctions.UndeployJobSpecFunction sjf =
new HyracksClientInterfaceFunctions.UndeployJobSpecFunction(deployedJobSpecId);
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
index f676d27..5b98778 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
@@ -111,6 +111,14 @@
}
@Override
+ public DeployedJobSpecId upsertDeployedJobSpec(DeployedJobSpecId deployedJobSpecId, JobSpecification jobSpec)
+ throws Exception {
+ JobSpecificationActivityClusterGraphGeneratorFactory jsacggf =
+ new JobSpecificationActivityClusterGraphGeneratorFactory(jobSpec);
+ return hci.upsertDeployedJobSpec(deployedJobSpecId, JavaSerializationUtils.serialize(jsacggf));
+ }
+
+ @Override
public DeployedJobSpecId deployJobSpec(JobSpecification jobSpec) throws Exception {
JobSpecificationActivityClusterGraphGeneratorFactory jsacggf =
new JobSpecificationActivityClusterGraphGeneratorFactory(jobSpec);
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java
index 510a6b6..61d1418 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java
@@ -99,6 +99,18 @@
DeployedJobSpecId deployJobSpec(JobSpecification jobSpec) throws Exception;
/**
+ * Update the JobSpec for a deployed job.
+ *
+ * @param deployedJobSpecId
+ * The id of the deployed job spec
+ * @param jobSpec
+ * Job Specification
+ * @throws Exception
+ */
+ DeployedJobSpecId upsertDeployedJobSpec(DeployedJobSpecId deployedJobSpecId, JobSpecification jobSpec)
+ throws Exception;
+
+ /**
* Remove the deployed Job Spec
*
* @param deployedJobSpecId
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java
index f0c7872..2b92bcd 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java
@@ -45,6 +45,9 @@
public DeployedJobSpecId deployJobSpec(byte[] acggfBytes) throws Exception;
+ public DeployedJobSpecId upsertDeployedJobSpec(DeployedJobSpecId deployedJobSpecId, byte[] acggfBytes)
+ throws Exception;
+
public DeployedJobSpecId undeployJobSpec(DeployedJobSpecId deployedJobSpecId) throws Exception;
public NetworkAddress getDatasetDirectoryServiceInfo() throws Exception;
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java
index e46aa7f..f123c8a 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java
@@ -85,13 +85,19 @@
ccs.getWorkQueue().schedule(new GetJobInfoWork(ccs.getJobManager(), gjif.getJobId(),
new IPCResponder<JobInfo>(handle, mid)));
break;
- case DISTRIBUTE_JOB:
+ case DEPLOY_JOB:
HyracksClientInterfaceFunctions.DeployJobSpecFunction djf =
(HyracksClientInterfaceFunctions.DeployJobSpecFunction) fn;
ccs.getWorkQueue().schedule(new DeployJobSpecWork(ccs, djf.getACGGFBytes(),
- deployedJobSpecIdFactory.create(), new IPCResponder<>(handle, mid)));
+ deployedJobSpecIdFactory.create(), false, new IPCResponder<>(handle, mid)));
break;
- case DESTROY_JOB:
+ case UPSERT_DEPLOYED_JOB:
+ HyracksClientInterfaceFunctions.UpsertDeployedJobSpecFunction udjsf =
+ (HyracksClientInterfaceFunctions.UpsertDeployedJobSpecFunction) fn;
+ ccs.getWorkQueue().schedule(new DeployJobSpecWork(ccs, udjsf.getACGGFBytes(),
+ udjsf.getDeployedJobSpecId(), true, new IPCResponder<>(handle, mid)));
+ break;
+ case UNDEPLOY_JOB:
HyracksClientInterfaceFunctions.UndeployJobSpecFunction dsjf =
(HyracksClientInterfaceFunctions.UndeployJobSpecFunction) fn;
ccs.getWorkQueue().schedule(
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/DeployedJobSpecStore.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/DeployedJobSpecStore.java
index 0e22c25..041e224 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/DeployedJobSpecStore.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/DeployedJobSpecStore.java
@@ -40,9 +40,6 @@
public void addDeployedJobSpecDescriptor(DeployedJobSpecId deployedJobSpecId,
ActivityClusterGraph activityClusterGraph, JobSpecification jobSpecification,
Set<Constraint> activityClusterGraphConstraints) throws HyracksException {
- if (deployedJobSpecDescriptorMap.get(deployedJobSpecId) != null) {
- throw HyracksException.create(ErrorCode.DUPLICATE_DEPLOYED_JOB, deployedJobSpecId);
- }
DeployedJobSpecDescriptor descriptor =
new DeployedJobSpecDescriptor(activityClusterGraph, jobSpecification, activityClusterGraphConstraints);
deployedJobSpecDescriptorMap.put(deployedJobSpecId, descriptor);
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DeployJobSpecWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DeployJobSpecWork.java
index c51f3c5..60c88c7 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DeployJobSpecWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DeployJobSpecWork.java
@@ -39,20 +39,24 @@
private final byte[] acggfBytes;
private final DeployedJobSpecId deployedJobSpecId;
private final IResultCallback<DeployedJobSpecId> callback;
+ private final boolean upsert;
public DeployJobSpecWork(ClusterControllerService ccs, byte[] acggfBytes, DeployedJobSpecId deployedJobSpecId,
- IResultCallback<DeployedJobSpecId> callback) {
+ boolean upsert, IResultCallback<DeployedJobSpecId> callback) {
this.deployedJobSpecId = deployedJobSpecId;
this.ccs = ccs;
this.acggfBytes = acggfBytes;
this.callback = callback;
+ this.upsert = upsert;
}
@Override
protected void doRun() throws Exception {
try {
final CCServiceContext ccServiceCtx = ccs.getContext();
- ccs.getDeployedJobSpecStore().checkForExistingDeployedJobSpecDescriptor(deployedJobSpecId);
+ if (!upsert) {
+ ccs.getDeployedJobSpecStore().checkForExistingDeployedJobSpecDescriptor(deployedJobSpecId);
+ }
IActivityClusterGraphGeneratorFactory acggf =
(IActivityClusterGraphGeneratorFactory) DeploymentUtils.deserialize(acggfBytes, null, ccServiceCtx);
IActivityClusterGraphGenerator acgg =
@@ -65,7 +69,7 @@
INodeManager nodeManager = ccs.getNodeManager();
for (NodeControllerState node : nodeManager.getAllNodeControllerStates()) {
- node.getNodeController().deployJobSpec(deployedJobSpecId, acgBytes);
+ node.getNodeController().deployJobSpec(deployedJobSpecId, acgBytes, upsert);
}
callback.setValue(deployedJobSpecId);
} catch (Exception e) {
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
index 78cd44d..fa835f4 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
@@ -54,7 +54,8 @@
void undeployBinary(DeploymentId deploymentId) throws Exception;
- void deployJobSpec(DeployedJobSpecId deployedJobSpecId, byte[] planBytes) throws Exception;
+ void deployJobSpec(DeployedJobSpecId deployedJobSpecId, byte[] planBytes, boolean checkForDuplicate)
+ throws Exception;
void undeployJobSpec(DeployedJobSpecId deployedJobSpecId) throws Exception;
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
index 8e02936..dea5198 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
@@ -103,8 +103,8 @@
SHUTDOWN_REQUEST,
SHUTDOWN_RESPONSE,
- DISTRIBUTE_JOB,
- DESTROY_JOB,
+ DEPLOY_JOB,
+ UNDEPLOY_JOB,
DEPLOYED_JOB_FAILURE,
STATE_DUMP_REQUEST,
@@ -713,15 +713,18 @@
private final byte[] acgBytes;
- public DeployJobSpecFunction(DeployedJobSpecId deployedJobSpecId, byte[] acgBytes, CcId ccId) {
+ private final boolean upsert;
+
+ public DeployJobSpecFunction(DeployedJobSpecId deployedJobSpecId, byte[] acgBytes, boolean upsert, CcId ccId) {
super(ccId);
this.deployedJobSpecId = deployedJobSpecId;
this.acgBytes = acgBytes;
+ this.upsert = upsert;
}
@Override
public FunctionId getFunctionId() {
- return FunctionId.DISTRIBUTE_JOB;
+ return FunctionId.DEPLOY_JOB;
}
public DeployedJobSpecId getDeployedJobSpecId() {
@@ -731,6 +734,10 @@
public byte[] getacgBytes() {
return acgBytes;
}
+
+ public boolean getUpsert() {
+ return upsert;
+ }
}
public static class UndeployJobSpecFunction extends CCIdentifiedFunction {
@@ -745,7 +752,7 @@
@Override
public FunctionId getFunctionId() {
- return FunctionId.DESTROY_JOB;
+ return FunctionId.UNDEPLOY_JOB;
}
public DeployedJobSpecId getDeployedJobSpecId() {
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
index d6867eb..8242bdc 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
@@ -103,8 +103,8 @@
}
@Override
- public void deployJobSpec(DeployedJobSpecId deployedJobSpecId, byte[] planBytes) throws Exception {
- DeployJobSpecFunction fn = new DeployJobSpecFunction(deployedJobSpecId, planBytes, ccId);
+ public void deployJobSpec(DeployedJobSpecId deployedJobSpecId, byte[] planBytes, boolean upsert) throws Exception {
+ DeployJobSpecFunction fn = new DeployJobSpecFunction(deployedJobSpecId, planBytes, upsert, ccId);
ipcHandle.send(-1, fn, null);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
index 735f7cf..08cd5d8 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
@@ -107,13 +107,13 @@
ncs.getWorkQueue().schedule(new UnDeployBinaryWork(ncs, ndbf.getDeploymentId(), ndbf.getCcId()));
return;
- case DISTRIBUTE_JOB:
+ case DEPLOY_JOB:
CCNCFunctions.DeployJobSpecFunction djf = (CCNCFunctions.DeployJobSpecFunction) fn;
- ncs.getWorkQueue().schedule(
- new DeployJobSpecWork(ncs, djf.getDeployedJobSpecId(), djf.getacgBytes(), djf.getCcId()));
+ ncs.getWorkQueue().schedule(new DeployJobSpecWork(ncs, djf.getDeployedJobSpecId(), djf.getacgBytes(),
+ djf.getUpsert(), djf.getCcId()));
return;
- case DESTROY_JOB:
+ case UNDEPLOY_JOB:
CCNCFunctions.UndeployJobSpecFunction dsjf = (CCNCFunctions.UndeployJobSpecFunction) fn;
ncs.getWorkQueue().schedule(new UndeployJobSpecWork(ncs, dsjf.getDeployedJobSpecId(), dsjf.getCcId()));
return;
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index 6a7d645..aa2320a 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -470,7 +470,6 @@
}
private ConcurrentHashMap<CcId, Serializable> getDistributedState() {
- //noinspection unchecked
return (ConcurrentHashMap<CcId, Serializable>) serviceCtx.getDistributedState();
}
@@ -566,9 +565,6 @@
public void storeActivityClusterGraph(DeployedJobSpecId deployedJobSpecId, ActivityClusterGraph acg)
throws HyracksException {
- if (deployedJobSpecActivityClusterGraphMap.get(deployedJobSpecId.getId()) != null) {
- throw HyracksException.create(ErrorCode.DUPLICATE_DEPLOYED_JOB, deployedJobSpecId);
- }
deployedJobSpecActivityClusterGraphMap.put(deployedJobSpecId.getId(), acg);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DeployJobSpecWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DeployJobSpecWork.java
index 92612dd..bcdb97f 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DeployJobSpecWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DeployJobSpecWork.java
@@ -28,7 +28,7 @@
import org.apache.hyracks.control.nc.NodeControllerService;
/**
- * pre-distribute a job that can be executed later
+ * Deploy a job that can be executed later
*
*/
public class DeployJobSpecWork extends AbstractWork {
@@ -37,19 +37,23 @@
private final byte[] acgBytes;
private final CcId ccId;
private final DeployedJobSpecId deployedJobSpecId;
+ private final boolean upsert;
public DeployJobSpecWork(NodeControllerService ncs, DeployedJobSpecId deployedJobSpecId, byte[] acgBytes,
- CcId ccId) {
+ boolean upsert, CcId ccId) {
this.ncs = ncs;
this.deployedJobSpecId = deployedJobSpecId;
this.acgBytes = acgBytes;
this.ccId = ccId;
+ this.upsert = upsert;
}
@Override
public void run() {
try {
- ncs.checkForDuplicateDeployedJobSpec(deployedJobSpecId);
+ if (!upsert) {
+ ncs.checkForDuplicateDeployedJobSpec(deployedJobSpecId);
+ }
ActivityClusterGraph acg =
(ActivityClusterGraph) DeploymentUtils.deserialize(acgBytes, null, ncs.getContext());
ncs.storeActivityClusterGraph(deployedJobSpecId, acg);
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/DeployedJobSpecsTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/DeployedJobSpecsTest.java
index 40b6b27..834fab5 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/DeployedJobSpecsTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/DeployedJobSpecsTest.java
@@ -199,6 +199,16 @@
for (int i = 0; i < 100; i++) {
hcc.startJob(distributedId2, new HashMap<>());
}
+
+ //Change the second job into the first job and see whether it runs
+ hcc.upsertDeployedJobSpec(distributedId2, spec1);
+ JobId jobRunId4 = hcc.startJob(distributedId2, new HashMap<>());
+ hcc.waitForCompletion(jobRunId4);
+
+ //Run it one more time
+ JobId jobRunId5 = hcc.startJob(distributedId2, new HashMap<>());
+ hcc.waitForCompletion(jobRunId5);
+
}
@AfterClass