[ASTERIXDB-2386][CLUS] Allow extension of the global recovery manager
Allow the Asterix GlobalRecoveryManager to be extended
Cleanup some deployed job spec methods
Change-Id: I1213e702a77ededde18ee0b50bc105212f43480d
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2640
Reviewed-by: Till Westmann <tillw@apache.org>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Steven Jacobs <sjaco002@ucr.edu>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CCExtensionManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CCExtensionManager.java
index 1ba418a..7f632f0 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CCExtensionManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CCExtensionManager.java
@@ -28,12 +28,15 @@
import org.apache.asterix.app.translator.DefaultStatementExecutorFactory;
import org.apache.asterix.common.api.ExtensionId;
import org.apache.asterix.common.api.IExtension;
+import org.apache.asterix.common.cluster.IGlobalRecoveryManager;
import org.apache.asterix.common.config.AsterixExtension;
+import org.apache.asterix.common.context.IStorageComponentProvider;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.exceptions.RuntimeDataException;
import org.apache.asterix.compiler.provider.AqlCompilationProvider;
import org.apache.asterix.compiler.provider.ILangCompilationProvider;
import org.apache.asterix.compiler.provider.SqlppCompilationProvider;
+import org.apache.asterix.hyracks.bootstrap.GlobalRecoveryManager;
import org.apache.asterix.om.functions.IFunctionExtensionManager;
import org.apache.asterix.om.functions.IFunctionManager;
import org.apache.asterix.runtime.functions.FunctionCollection;
@@ -41,6 +44,8 @@
import org.apache.asterix.translator.IStatementExecutorFactory;
import org.apache.asterix.utils.ExtensionUtil;
import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.api.application.ICCServiceContext;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.exceptions.HyracksDataException;
/**
@@ -53,6 +58,7 @@
private final ILangCompilationProvider aqlCompilationProvider;
private final ILangCompilationProvider sqlppCompilationProvider;
private final IFunctionManager functionManager;
+ private final IGlobalRecoveryExtension globalRecoveryExtension;
private transient IStatementExecutorFactory statementExecutorFactory;
/**
@@ -71,6 +77,7 @@
Pair<ExtensionId, ILangCompilationProvider> sqlppcp = null;
Pair<ExtensionId, IFunctionManager> fm = null;
IStatementExecutorExtension see = null;
+ IGlobalRecoveryExtension gre = null;
if (list != null) {
Set<ExtensionId> extensionIds = new HashSet<>();
for (AsterixExtension extensionConf : list) {
@@ -89,6 +96,9 @@
sqlppcp = ExtensionUtil.extendLangCompilationProvider(Language.SQLPP, sqlppcp, le);
fm = ExtensionUtil.extendFunctionManager(fm, le);
break;
+ case RECOVERY:
+ gre = (IGlobalRecoveryExtension) extension;
+ break;
default:
break;
}
@@ -99,6 +109,7 @@
this.sqlppCompilationProvider = sqlppcp == null ? new SqlppCompilationProvider() : sqlppcp.second;
this.functionManager =
fm == null ? new FunctionManager(FunctionCollection.createDefaultFunctionCollection()) : fm.second;
+ this.globalRecoveryExtension = gre;
}
/** @deprecated use getStatementExecutorFactory instead */
@@ -127,6 +138,14 @@
}
}
+ public IGlobalRecoveryManager getGlobalRecoveryManager(ICCServiceContext serviceCtx, IHyracksClientConnection hcc,
+ IStorageComponentProvider componentProvider) {
+ if (globalRecoveryExtension == null) {
+ return new GlobalRecoveryManager(serviceCtx, hcc, componentProvider);
+ }
+ return globalRecoveryExtension.getGlobalRecoveryManager(serviceCtx, hcc, componentProvider);
+ }
+
@Override
public IFunctionManager getFunctionManager() {
return functionManager;
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/IGlobalRecoveryExtension.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/IGlobalRecoveryExtension.java
new file mode 100644
index 0000000..e9652a5
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/IGlobalRecoveryExtension.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.cc;
+
+import org.apache.asterix.common.api.IExtension;
+import org.apache.asterix.common.cluster.IGlobalRecoveryManager;
+import org.apache.asterix.common.context.IStorageComponentProvider;
+import org.apache.hyracks.api.application.ICCServiceContext;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
+
+/**
+ * An interface for extensions of {@code IGlobalRecoveryManager}
+ */
+public interface IGlobalRecoveryExtension extends IExtension {
+
+ @Override
+ default ExtensionKind getExtensionKind() {
+ return ExtensionKind.RECOVERY;
+ }
+
+ IGlobalRecoveryManager getGlobalRecoveryManager(ICCServiceContext serviceCtx, IHyracksClientConnection hcc,
+ IStorageComponentProvider componentProvider);
+
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
index a3ca8b2..8f78ce1 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
@@ -54,7 +54,9 @@
import org.apache.asterix.app.replication.NcLifecycleCoordinator;
import org.apache.asterix.common.api.AsterixThreadFactory;
import org.apache.asterix.common.api.INodeJobTracker;
+import org.apache.asterix.common.cluster.IGlobalRecoveryManager;
import org.apache.asterix.common.config.AsterixExtension;
+import org.apache.asterix.common.config.ExtensionProperties;
import org.apache.asterix.common.config.ExternalProperties;
import org.apache.asterix.common.config.GlobalConfig;
import org.apache.asterix.common.config.MetadataProperties;
@@ -62,6 +64,7 @@
import org.apache.asterix.common.config.ReplicationProperties;
import org.apache.asterix.common.context.IStorageComponentProvider;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.library.ILibraryManager;
import org.apache.asterix.common.replication.INcLifecycleCoordinator;
import org.apache.asterix.common.utils.Servlets;
@@ -139,12 +142,13 @@
INcLifecycleCoordinator lifecycleCoordinator = createNcLifeCycleCoordinator(repProp.isReplicationEnabled());
ExternalLibraryUtils.setUpExternaLibraries(libraryManager, false);
componentProvider = new StorageComponentProvider();
- GlobalRecoveryManager globalRecoveryManager = createGlobalRecoveryManager();
+
+ List<AsterixExtension> extensions = new ArrayList<>();
+ extensions.addAll(getExtensions());
+ ccExtensionManager = new CCExtensionManager(extensions);
+ IGlobalRecoveryManager globalRecoveryManager = createGlobalRecoveryManager();
statementExecutorCtx = new StatementExecutorContext();
appCtx = createApplicationContext(libraryManager, globalRecoveryManager, lifecycleCoordinator);
- List<AsterixExtension> extensions = new ArrayList<>();
- extensions.addAll(this.getExtensions());
- ccExtensionManager = new CCExtensionManager(extensions);
appCtx.setExtensionManager(ccExtensionManager);
final CCConfig ccConfig = controllerService.getCCConfig();
if (System.getProperty("java.rmi.server.hostname") == null) {
@@ -170,15 +174,15 @@
}
protected ICcApplicationContext createApplicationContext(ILibraryManager libraryManager,
- GlobalRecoveryManager globalRecoveryManager, INcLifecycleCoordinator lifecycleCoordinator)
+ IGlobalRecoveryManager globalRecoveryManager, INcLifecycleCoordinator lifecycleCoordinator)
throws AlgebricksException, IOException {
return new CcApplicationContext(ccServiceCtx, getHcc(), libraryManager, () -> MetadataManager.INSTANCE,
globalRecoveryManager, lifecycleCoordinator, new ActiveNotificationHandler(), componentProvider,
new MetadataLockManager());
}
- protected GlobalRecoveryManager createGlobalRecoveryManager() throws Exception {
- return new GlobalRecoveryManager(ccServiceCtx, getHcc(), componentProvider);
+ protected IGlobalRecoveryManager createGlobalRecoveryManager() throws Exception {
+ return ccExtensionManager.getGlobalRecoveryManager(ccServiceCtx, getHcc(), componentProvider);
}
protected INcLifecycleCoordinator createNcLifeCycleCoordinator(boolean replicationEnabled) {
@@ -191,8 +195,8 @@
LoggingConfigUtil.defaultIfMissing(GlobalConfig.ASTERIX_LOGGER_NAME, level);
}
- protected List<AsterixExtension> getExtensions() {
- return appCtx.getExtensionProperties().getExtensions();
+ protected List<AsterixExtension> getExtensions() throws Exception {
+ return new ExtensionProperties(PropertiesAccessor.getInstance(ccServiceCtx.getAppConfig())).getExtensions();
}
protected void configureServers() throws Exception {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
index 3d9b822..5fc5c57 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
@@ -56,7 +56,7 @@
private static final Logger LOGGER = LogManager.getLogger();
protected final IStorageComponentProvider componentProvider;
protected final ICCServiceContext serviceCtx;
- protected IHyracksClientConnection hcc;
+ protected final IHyracksClientConnection hcc;
protected volatile boolean recoveryCompleted;
protected volatile boolean recovering;
@@ -126,7 +126,9 @@
throws Exception {
// Loop over datasets
for (Dataverse dataverse : MetadataManager.INSTANCE.getDataverses(mdTxnCtx)) {
- mdTxnCtx = recoverDataset(appCtx, mdTxnCtx, dataverse);
+ mdTxnCtx = recoverDatasets(appCtx, mdTxnCtx, dataverse);
+ // Fixes ASTERIXDB-2386 by caching the dataverse during recovery
+ MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverse.getDataverseName());
}
return mdTxnCtx;
}
@@ -138,8 +140,8 @@
}
}
- private MetadataTransactionContext recoverDataset(ICcApplicationContext appCtx, MetadataTransactionContext mdTxnCtx,
- Dataverse dataverse) throws Exception {
+ private MetadataTransactionContext recoverDatasets(ICcApplicationContext appCtx,
+ MetadataTransactionContext mdTxnCtx, Dataverse dataverse) throws Exception {
if (!dataverse.getDataverseName().equals(MetadataConstants.METADATA_DATAVERSE_NAME)) {
MetadataProvider metadataProvider = new MetadataProvider(appCtx, dataverse);
try {
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IExtension.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IExtension.java
index 9551935..7e9879a 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IExtension.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IExtension.java
@@ -40,7 +40,11 @@
/**
* Extends Language Syntax and Algebric Operations
*/
- LANG
+ LANG,
+ /**
+ * Extends Recovery Capabilities
+ */
+ RECOVERY
}
/**
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java
index 7182f42..ad2e77a 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java
@@ -41,7 +41,7 @@
START_JOB,
DEPLOY_JOB,
UNDEPLOY_JOB,
- UPSERT_DEPLOYED_JOB,
+ REDEPLOY_JOB,
CANCEL_JOB,
GET_DATASET_DIRECTORY_SERIVICE_INFO,
GET_DATASET_RESULT_STATUS,
@@ -108,21 +108,21 @@
}
}
- public static class UpsertDeployedJobSpecFunction extends Function {
+ public static class redeployJobSpecFunction extends Function {
private static final long serialVersionUID = 1L;
private final byte[] acggfBytes;
private final DeployedJobSpecId deployedJobSpecId;
- public UpsertDeployedJobSpecFunction(DeployedJobSpecId deployedJobSpecId, byte[] acggfBytes) {
+ public redeployJobSpecFunction(DeployedJobSpecId deployedJobSpecId, byte[] acggfBytes) {
this.deployedJobSpecId = deployedJobSpecId;
this.acggfBytes = acggfBytes;
}
@Override
public FunctionId getFunctionId() {
- return FunctionId.UPSERT_DEPLOYED_JOB;
+ return FunctionId.REDEPLOY_JOB;
}
public byte[] getACGGFBytes() {
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
index 07ca6b0..1ee9bd8 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
@@ -98,18 +98,17 @@
}
@Override
- public DeployedJobSpecId upsertDeployedJobSpec(DeployedJobSpecId deployedJobSpecId, byte[] acggfBytes)
- throws Exception {
- HyracksClientInterfaceFunctions.UpsertDeployedJobSpecFunction udjsf =
- new HyracksClientInterfaceFunctions.UpsertDeployedJobSpecFunction(deployedJobSpecId, acggfBytes);
- return (DeployedJobSpecId) rpci.call(ipcHandle, udjsf);
+ public void redeployJobSpec(DeployedJobSpecId deployedJobSpecId, byte[] acggfBytes) throws Exception {
+ HyracksClientInterfaceFunctions.redeployJobSpecFunction udjsf =
+ new HyracksClientInterfaceFunctions.redeployJobSpecFunction(deployedJobSpecId, acggfBytes);
+ rpci.call(ipcHandle, udjsf);
}
@Override
- public DeployedJobSpecId undeployJobSpec(DeployedJobSpecId deployedJobSpecId) throws Exception {
+ public void undeployJobSpec(DeployedJobSpecId deployedJobSpecId) throws Exception {
HyracksClientInterfaceFunctions.UndeployJobSpecFunction sjf =
new HyracksClientInterfaceFunctions.UndeployJobSpecFunction(deployedJobSpecId);
- return (DeployedJobSpecId) rpci.call(ipcHandle, sjf);
+ rpci.call(ipcHandle, sjf);
}
@Override
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
index 5b98778..f635d94 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
@@ -111,11 +111,10 @@
}
@Override
- public DeployedJobSpecId upsertDeployedJobSpec(DeployedJobSpecId deployedJobSpecId, JobSpecification jobSpec)
- throws Exception {
+ public void redeployJobSpec(DeployedJobSpecId deployedJobSpecId, JobSpecification jobSpec) throws Exception {
JobSpecificationActivityClusterGraphGeneratorFactory jsacggf =
new JobSpecificationActivityClusterGraphGeneratorFactory(jobSpec);
- return hci.upsertDeployedJobSpec(deployedJobSpecId, JavaSerializationUtils.serialize(jsacggf));
+ hci.redeployJobSpec(deployedJobSpecId, JavaSerializationUtils.serialize(jsacggf));
}
@Override
@@ -126,8 +125,8 @@
}
@Override
- public DeployedJobSpecId undeployJobSpec(DeployedJobSpecId deployedJobSpecId) throws Exception {
- return hci.undeployJobSpec(deployedJobSpecId);
+ public void undeployJobSpec(DeployedJobSpecId deployedJobSpecId) throws Exception {
+ hci.undeployJobSpec(deployedJobSpecId);
}
@Override
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java
index 61d1418..b3b7677 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java
@@ -107,8 +107,7 @@
* Job Specification
* @throws Exception
*/
- DeployedJobSpecId upsertDeployedJobSpec(DeployedJobSpecId deployedJobSpecId, JobSpecification jobSpec)
- throws Exception;
+ void redeployJobSpec(DeployedJobSpecId deployedJobSpecId, JobSpecification jobSpec) throws Exception;
/**
* Remove the deployed Job Spec
@@ -117,7 +116,7 @@
* The id of the deployed job spec
* @throws Exception
*/
- DeployedJobSpecId undeployJobSpec(DeployedJobSpecId deployedJobSpecId) throws Exception;
+ void undeployJobSpec(DeployedJobSpecId deployedJobSpecId) throws Exception;
/**
* Used to run a deployed Job Spec by id
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java
index 2b92bcd..6a75806 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java
@@ -45,10 +45,9 @@
public DeployedJobSpecId deployJobSpec(byte[] acggfBytes) throws Exception;
- public DeployedJobSpecId upsertDeployedJobSpec(DeployedJobSpecId deployedJobSpecId, byte[] acggfBytes)
- throws Exception;
+ public void redeployJobSpec(DeployedJobSpecId deployedJobSpecId, byte[] acggfBytes) throws Exception;
- public DeployedJobSpecId undeployJobSpec(DeployedJobSpecId deployedJobSpecId) throws Exception;
+ public void undeployJobSpec(DeployedJobSpecId deployedJobSpecId) throws Exception;
public NetworkAddress getDatasetDirectoryServiceInfo() throws Exception;
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java
index f123c8a..a669402 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java
@@ -91,9 +91,9 @@
ccs.getWorkQueue().schedule(new DeployJobSpecWork(ccs, djf.getACGGFBytes(),
deployedJobSpecIdFactory.create(), false, new IPCResponder<>(handle, mid)));
break;
- case UPSERT_DEPLOYED_JOB:
- HyracksClientInterfaceFunctions.UpsertDeployedJobSpecFunction udjsf =
- (HyracksClientInterfaceFunctions.UpsertDeployedJobSpecFunction) fn;
+ case REDEPLOY_JOB:
+ HyracksClientInterfaceFunctions.redeployJobSpecFunction udjsf =
+ (HyracksClientInterfaceFunctions.redeployJobSpecFunction) fn;
ccs.getWorkQueue().schedule(new DeployJobSpecWork(ccs, udjsf.getACGGFBytes(),
udjsf.getDeployedJobSpecId(), true, new IPCResponder<>(handle, mid)));
break;
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/UndeployJobSpecWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/UndeployJobSpecWork.java
index 143c8c1..69b55ed 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/UndeployJobSpecWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/UndeployJobSpecWork.java
@@ -28,10 +28,10 @@
public class UndeployJobSpecWork extends SynchronizableWork {
private final ClusterControllerService ccs;
private final DeployedJobSpecId deployedJobSpecId;
- private final IResultCallback<DeployedJobSpecId> callback;
+ private final IResultCallback<Void> callback;
public UndeployJobSpecWork(ClusterControllerService ccs, DeployedJobSpecId deployedJobSpecId,
- IResultCallback<DeployedJobSpecId> callback) {
+ IResultCallback<Void> callback) {
this.deployedJobSpecId = deployedJobSpecId;
this.ccs = ccs;
this.callback = callback;
@@ -45,7 +45,7 @@
for (NodeControllerState node : nodeManager.getAllNodeControllerStates()) {
node.getNodeController().undeployJobSpec(deployedJobSpecId);
}
- callback.setValue(deployedJobSpecId);
+ callback.setValue(null);
} catch (Exception e) {
callback.setException(e);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/DeployedJobSpecsTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/DeployedJobSpecsTest.java
index 834fab5..d8f4064 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/DeployedJobSpecsTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/DeployedJobSpecsTest.java
@@ -201,7 +201,7 @@
}
//Change the second job into the first job and see whether it runs
- hcc.upsertDeployedJobSpec(distributedId2, spec1);
+ hcc.redeployJobSpec(distributedId2, spec1);
JobId jobRunId4 = hcc.startJob(distributedId2, new HashMap<>());
hcc.waitForCompletion(jobRunId4);