[NO ISSUE][HYR][*DB][CLUS] Startup lifecycle fixes
- Ensure thread factory is configured before using it
- Don't mark cluster state ACTIVE until after global recovery has
completed
- Failure of global recovery causes CC to shutdown
- Don't mark cluster state ACTIVE until max resource id has been
reported by all nodes
Change-Id: Id30415325047008c013e305ca11ccbb76bc7d8d8
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2004
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
index 37e0c58..ef3800c 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
@@ -108,12 +108,19 @@
private IHyracksClientConnection hcc;
@Override
- public void start(IServiceContext serviceCtx, String[] args) throws Exception {
+ public void init(IServiceContext serviceCtx) throws Exception {
+ ccServiceCtx = (ICCServiceContext) serviceCtx;
+ ccServiceCtx.setThreadFactory(
+ new AsterixThreadFactory(ccServiceCtx.getThreadFactory(), new LifeCycleComponentManager()));
+ }
+
+ @Override
+ public void start(String[] args) throws Exception {
if (args.length > 0) {
throw new IllegalArgumentException("Unrecognized argument(s): " + Arrays.toString(args));
}
- final ClusterControllerService controllerService = (ClusterControllerService) serviceCtx.getControllerService();
- this.ccServiceCtx = (ICCServiceContext) serviceCtx;
+ final ClusterControllerService controllerService = (ClusterControllerService) ccServiceCtx
+ .getControllerService();
ccServiceCtx.setMessageBroker(new CCMessageBroker(controllerService));
configureLoggingLevel(ccServiceCtx.getAppConfig().getLoggingLevel(ExternalProperties.Option.LOG_LEVEL));
@@ -122,8 +129,6 @@
LOGGER.info("Starting Asterix cluster controller");
}
- ccServiceCtx.setThreadFactory(
- new AsterixThreadFactory(ccServiceCtx.getThreadFactory(), new LifeCycleComponentManager()));
String strIP = ccServiceCtx.getCCContext().getClusterControllerInfo().getClientNetAddress();
int port = ccServiceCtx.getCCContext().getClusterControllerInfo().getClientNetPort();
hcc = new HyracksConnection(strIP, port);
@@ -207,8 +212,8 @@
}
protected HttpServer setupJSONAPIServer(ExternalProperties externalProperties) throws Exception {
- HttpServer jsonAPIServer =
- new HttpServer(webManager.getBosses(), webManager.getWorkers(), externalProperties.getAPIServerPort());
+ HttpServer jsonAPIServer = new HttpServer(webManager.getBosses(), webManager.getWorkers(),
+ externalProperties.getAPIServerPort());
jsonAPIServer.setAttribute(HYRACKS_CONNECTION_ATTR, hcc);
jsonAPIServer.setAttribute(ASTERIX_APP_CONTEXT_INFO_ATTR, appCtx);
jsonAPIServer.setAttribute(ServletConstants.EXECUTOR_SERVICE_ATTR,
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
index 3209557..13f3afa 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
@@ -92,11 +92,8 @@
MetadataTransactionContext mdTxnCtx = null;
try {
MetadataManager.INSTANCE.init();
- // Loop over datasets
mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
- for (Dataverse dataverse : MetadataManager.INSTANCE.getDataverses(mdTxnCtx)) {
- mdTxnCtx = recoverDataset(appCtx, mdTxnCtx, dataverse);
- }
+ mdTxnCtx = doRecovery(appCtx, mdTxnCtx);
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
} catch (Exception e) {
// This needs to be fixed <-- Needs to shutdown the system -->
@@ -109,8 +106,8 @@
try {
MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
} catch (Exception e1) {
- LOGGER.log(Level.SEVERE, "Exception in aborting", e1);
- e1.addSuppressed(e);
+ LOGGER.log(Level.SEVERE, "Exception aborting metadata transaction", e1);
+ e.addSuppressed(e1);
throw new IllegalStateException(e);
}
}
@@ -118,11 +115,21 @@
}
recoveryCompleted = true;
LOGGER.info("Global Recovery Completed");
+ appCtx.getClusterStateManager().refreshState();
+ }
+
+ protected MetadataTransactionContext doRecovery(ICcApplicationContext appCtx, MetadataTransactionContext mdTxnCtx)
+ throws Exception {
+ // Loop over datasets
+ for (Dataverse dataverse : MetadataManager.INSTANCE.getDataverses(mdTxnCtx)) {
+ mdTxnCtx = recoverDataset(appCtx, mdTxnCtx, dataverse);
+ }
+ return mdTxnCtx;
}
@Override
public void notifyStateChange(ClusterState newState) {
- if (newState != ClusterState.ACTIVE) {
+ if (newState != ClusterState.ACTIVE && newState != ClusterState.RECOVERING) {
recoveryCompleted = false;
}
}
@@ -132,8 +139,8 @@
if (!dataverse.getDataverseName().equals(MetadataConstants.METADATA_DATAVERSE_NAME)) {
MetadataProvider metadataProvider = new MetadataProvider(appCtx, dataverse);
try {
- List<Dataset> datasets =
- MetadataManager.INSTANCE.getDataverseDatasets(mdTxnCtx, dataverse.getDataverseName());
+ List<Dataset> datasets = MetadataManager.INSTANCE.getDataverseDatasets(mdTxnCtx,
+ dataverse.getDataverseName());
for (Dataset dataset : datasets) {
if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
// External dataset
@@ -145,8 +152,8 @@
TransactionState datasetState = dsd.getState();
if (!indexes.isEmpty()) {
if (datasetState == TransactionState.BEGIN) {
- List<ExternalFile> files =
- MetadataManager.INSTANCE.getDatasetExternalFiles(mdTxnCtx, dataset);
+ List<ExternalFile> files = MetadataManager.INSTANCE.getDatasetExternalFiles(mdTxnCtx,
+ dataset);
// if persumed abort, roll backward
// 1. delete all pending files
for (ExternalFile file : files) {
@@ -157,8 +164,8 @@
}
// 2. clean artifacts in NCs
metadataProvider.setMetadataTxnContext(mdTxnCtx);
- JobSpecification jobSpec =
- ExternalIndexingOperations.buildAbortOp(dataset, indexes, metadataProvider);
+ JobSpecification jobSpec = ExternalIndexingOperations.buildAbortOp(dataset, indexes,
+ metadataProvider);
executeHyracksJob(jobSpec);
// 3. correct the dataset state
((ExternalDatasetDetails) dataset.getDatasetDetails()).setState(TransactionState.COMMIT);
@@ -166,13 +173,13 @@
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
} else if (datasetState == TransactionState.READY_TO_COMMIT) {
- List<ExternalFile> files =
- MetadataManager.INSTANCE.getDatasetExternalFiles(mdTxnCtx, dataset);
+ List<ExternalFile> files = MetadataManager.INSTANCE.getDatasetExternalFiles(mdTxnCtx,
+ dataset);
// if ready to commit, roll forward
// 1. commit indexes in NCs
metadataProvider.setMetadataTxnContext(mdTxnCtx);
- JobSpecification jobSpec =
- ExternalIndexingOperations.buildRecoverOp(dataset, indexes, metadataProvider);
+ JobSpecification jobSpec = ExternalIndexingOperations.buildRecoverOp(dataset, indexes,
+ metadataProvider);
executeHyracksJob(jobSpec);
// 2. add pending files in metadata
for (ExternalFile file : files) {
@@ -221,4 +228,5 @@
public boolean isRecoveryCompleted() {
return recoveryCompleted;
}
+
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
index 0f6b396..e8f63b4 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
@@ -80,13 +80,17 @@
}
@Override
- public void start(IServiceContext serviceCtx, String[] args) throws Exception {
- if (args.length > 0) {
- throw new IllegalArgumentException("Unrecognized argument(s): " + Arrays.toString(args));
- }
+ public void init(IServiceContext serviceCtx) throws Exception {
this.ncServiceCtx = (INCServiceContext) serviceCtx;
ncServiceCtx.setThreadFactory(
new AsterixThreadFactory(ncServiceCtx.getThreadFactory(), ncServiceCtx.getLifeCycleComponentManager()));
+ }
+
+ @Override
+ public void start(String[] args) throws Exception {
+ if (args.length > 0) {
+ throw new IllegalArgumentException("Unrecognized argument(s): " + Arrays.toString(args));
+ }
nodeId = this.ncServiceCtx.getNodeId();
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Starting Asterix node controller: " + nodeId);
@@ -111,8 +115,8 @@
MessagingProperties messagingProperties = runtimeContext.getMessagingProperties();
IMessageBroker messageBroker = new NCMessageBroker(controllerService, messagingProperties);
this.ncServiceCtx.setMessageBroker(messageBroker);
- MessagingChannelInterfaceFactory interfaceFactory =
- new MessagingChannelInterfaceFactory((NCMessageBroker) messageBroker, messagingProperties);
+ MessagingChannelInterfaceFactory interfaceFactory = new MessagingChannelInterfaceFactory(
+ (NCMessageBroker) messageBroker, messagingProperties);
this.ncServiceCtx.setMessagingChannelInterfaceFactory(interfaceFactory);
IRecoveryManager recoveryMgr = runtimeContext.getTransactionSubsystem().getRecoveryManager();
@@ -224,8 +228,8 @@
String[] ioDevices = ((PersistentLocalResourceRepository) runtimeContext.getLocalResourceRepository())
.getStorageMountingPoints();
for (String ioDevice : ioDevices) {
- String tempDatasetsDir =
- ioDevice + storageDirName + File.separator + StoragePathUtil.TEMP_DATASETS_STORAGE_FOLDER;
+ String tempDatasetsDir = ioDevice + storageDirName + File.separator
+ + StoragePathUtil.TEMP_DATASETS_STORAGE_FOLDER;
File tmpDsDir = new File(tempDatasetsDir);
if (tmpDsDir.exists()) {
IoUtil.delete(tmpDsDir);
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClusterManagementWork.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClusterManagementWork.java
index bdbf4a5..e3424ec 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClusterManagementWork.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClusterManagementWork.java
@@ -20,23 +20,24 @@
public interface IClusterManagementWork {
- public enum WorkType {
+ enum WorkType {
ADD_NODE,
REMOVE_NODE
}
- public enum ClusterState {
- STARTING,
- PENDING,
- ACTIVE,
- UNUSABLE,
- REBALANCING,
- SHUTTING_DOWN
+ enum ClusterState {
+ STARTING, // the initial state
+ UNUSABLE, // one or more cluster partitions are inactive or max id resources have not been reported
+ PENDING, // the metadata node has not yet joined & initialized
+ RECOVERING, // global recovery has not yet completed
+ ACTIVE, // cluster is ACTIVE and ready for requests
+ REBALANCING, // replication is processing failbacks
+ SHUTTING_DOWN // a shutdown request has been received, and is underway
}
- public WorkType getClusterManagementWorkType();
+ WorkType getClusterManagementWorkType();
- public int getWorkId();
+ int getWorkId();
- public IClusterEventsSubscriber getSourceSubscriber();
+ IClusterEventsSubscriber getSourceSubscriber();
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IResourceIdManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IResourceIdManager.java
index d36d383..ce49ccf6 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IResourceIdManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IResourceIdManager.java
@@ -18,12 +18,14 @@
*/
package org.apache.asterix.common.transactions;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
public interface IResourceIdManager {
long createResourceId();
boolean reported(String nodeId);
- void report(String nodeId, long maxResourceId);
+ void report(String nodeId, long maxResourceId) throws HyracksDataException;
}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/GarbageCollector.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/GarbageCollector.java
index 8a3392a..a97e22a 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/GarbageCollector.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/GarbageCollector.java
@@ -18,6 +18,7 @@
*/
package org.apache.asterix.metadata;
+import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -29,7 +30,9 @@
public class GarbageCollector implements Runnable {
private static final Logger LOGGER = Logger.getLogger(GarbageCollector.class.getName());
- private static final long CLEANUP_PERIOD = 3600L * 24;
+ // TODO(mblow): make this configurable
+ private static final long CLEANUP_PERIOD = 1;
+ private static final TimeUnit CLEANUP_PERIOD_UNIT = TimeUnit.DAYS;
static {
// Starts the garbage collector thread which
@@ -40,13 +43,13 @@
}
@Override
- @SuppressWarnings("squid:S2142") // rethrow or interrupt thread on InterruptedException
+ @SuppressWarnings({"squid:S2142", "squid:S2189"}) // rethrow/interrupt thread on InterruptedException, endless loop
public void run() {
LOGGER.info("Starting Metadata GC");
while (true) {
try {
synchronized (this) {
- this.wait(CLEANUP_PERIOD);
+ CLEANUP_PERIOD_UNIT.timedWait(this, CLEANUP_PERIOD);
}
MetadataManager.INSTANCE.cleanupTempDatasets();
} catch (InterruptedException e) {
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
index 82a1177..decc1a9 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
@@ -71,6 +71,6 @@
@Override
public String toString() {
- return ReportMaxResourceIdRequestMessage.class.getSimpleName();
+ return ResourceIdRequestMessage.class.getSimpleName();
}
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/ResourceIdManager.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/ResourceIdManager.java
index 6a5ed08..afa626d 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/ResourceIdManager.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/ResourceIdManager.java
@@ -24,6 +24,7 @@
import org.apache.asterix.common.cluster.IClusterStateManager;
import org.apache.asterix.common.transactions.IResourceIdManager;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
public class ResourceIdManager implements IResourceIdManager {
@@ -59,13 +60,14 @@
}
@Override
- public synchronized void report(String nodeId, long maxResourceId) {
+ public synchronized void report(String nodeId, long maxResourceId) throws HyracksDataException {
if (!allReported) {
globalResourceId.set(Math.max(maxResourceId, globalResourceId.get()));
reportedNodes.add(nodeId);
if (reportedNodes.size() == csm.getNumberOfNodes()) {
reportedNodes = null;
allReported = true;
+ csm.refreshState();
}
}
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
index 36cb10d..334b683 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
@@ -40,6 +40,7 @@
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.replication.IFaultToleranceStrategy;
+import org.apache.asterix.common.transactions.IResourceIdManager;
import org.apache.asterix.event.schema.cluster.Cluster;
import org.apache.asterix.event.schema.cluster.Node;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
@@ -118,6 +119,10 @@
@Override
public synchronized void setState(ClusterState state) {
+ if (this.state == state) {
+ LOGGER.info("ignoring update to same cluster state of " + this.state);
+ return;
+ }
LOGGER.info("updating cluster state from " + this.state + " to " + state.name());
this.state = state;
appCtx.getGlobalRecoveryManager().notifyStateChange(state);
@@ -166,24 +171,35 @@
setState(ClusterState.UNUSABLE);
return;
}
-
for (ClusterPartition p : clusterPartitions.values()) {
if (!p.isActive()) {
setState(ClusterState.UNUSABLE);
return;
}
}
-
- // if all storage partitions are active as well as the metadata node, then the cluster is active
+ IResourceIdManager resourceIdManager = appCtx.getResourceIdManager();
+ for (String node : activeNcConfiguration.keySet()) {
+ if (!resourceIdManager.reported(node)) {
+ LOGGER.log(Level.INFO, "Partitions are ready but %s has not yet registered its max resource id...",
+ node);
+ setState(ClusterState.UNUSABLE);
+ return;
+ }
+ }
+ // the metadata bootstrap & global recovery must be complete before the cluster can be active
if (metadataNodeActive) {
- if (state != ClusterState.ACTIVE) {
+ if (state != ClusterState.ACTIVE && state != ClusterState.RECOVERING) {
setState(ClusterState.PENDING);
}
appCtx.getMetadataBootstrap().init();
- setState(ClusterState.ACTIVE);
- notifyAll();
- // start global recovery
- appCtx.getGlobalRecoveryManager().startGlobalRecovery(appCtx);
+
+ if (appCtx.getGlobalRecoveryManager().isRecoveryCompleted()) {
+ setState(ClusterState.ACTIVE);
+ } else {
+ // start global recovery
+ setState(ClusterState.RECOVERING);
+ appCtx.getGlobalRecoveryManager().startGlobalRecovery(appCtx);
+ }
} else {
setState(ClusterState.PENDING);
}
@@ -269,8 +285,8 @@
clusterActiveLocations.add(p.getActiveNodeId());
}
}
- clusterPartitionConstraint =
- new AlgebricksAbsolutePartitionConstraint(clusterActiveLocations.toArray(new String[] {}));
+ clusterPartitionConstraint = new AlgebricksAbsolutePartitionConstraint(
+ clusterActiveLocations.toArray(new String[] {}));
}
@Override
@@ -432,8 +448,8 @@
}
private void updateNodeConfig(String nodeId, Map<IOption, Object> configuration) {
- ConfigManager configManager =
- ((ConfigManagerApplicationConfig) appCtx.getServiceContext().getAppConfig()).getConfigManager();
+ ConfigManager configManager = ((ConfigManagerApplicationConfig) appCtx.getServiceContext().getAppConfig())
+ .getConfigManager();
for (Map.Entry<IOption, Object> entry : configuration.entrySet()) {
if (entry.getKey().section() == Section.NC) {
configManager.set(nodeId, entry.getKey(), entry.getValue());
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IApplication.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IApplication.java
index 3ce314f..1d22f85 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IApplication.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IApplication.java
@@ -21,12 +21,15 @@
import org.apache.hyracks.api.config.IConfigManager;
import org.kohsuke.args4j.OptionHandlerFilter;
+@SuppressWarnings("squid:S00112") // define and throw specific class of Exception
public interface IApplication {
- void start(IServiceContext ctx, String[] args) throws Exception; //NOSONAR
+ void init(IServiceContext serviceCtx) throws Exception;
- void startupCompleted() throws Exception; //NOSONAR
+ void start(String[] args) throws Exception;
- void stop() throws Exception; //NOSONAR
+ void startupCompleted() throws Exception;
+
+ void stop() throws Exception;
void registerConfig(IConfigManager configManager);
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/BaseCCApplication.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/BaseCCApplication.java
index b94cf01..5ea51d1 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/BaseCCApplication.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/BaseCCApplication.java
@@ -40,7 +40,12 @@
}
@Override
- public void start(IServiceContext serviceCtx, String[] args) throws Exception {
+ public void init(IServiceContext serviceCtx) throws Exception {
+ // no-op
+ }
+
+ @Override
+ public void start(String[] args) throws Exception {
if (args.length > 0) {
throw new IllegalArgumentException("Unrecognized argument(s): " + Arrays.toString(args));
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
index a243bf8..dfc79ed 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
@@ -198,9 +198,9 @@
clusterIPC = new IPCSystem(new InetSocketAddress(ccConfig.getClusterListenPort()), ccIPCI,
new CCNCFunctions.SerializerDeserializer());
IIPCI ciIPCI = new ClientInterfaceIPCI(this, jobIdFactory);
- clientIPC =
- new IPCSystem(new InetSocketAddress(ccConfig.getClientListenAddress(), ccConfig.getClientListenPort()),
- ciIPCI, new JavaSerializationBasedPayloadSerializerDeserializer());
+ clientIPC = new IPCSystem(
+ new InetSocketAddress(ccConfig.getClientListenAddress(), ccConfig.getClientListenPort()), ciIPCI,
+ new JavaSerializationBasedPayloadSerializerDeserializer());
webServer = new WebServer(this, ccConfig.getConsoleListenPort());
clusterIPC.start();
clientIPC.start();
@@ -221,15 +221,16 @@
private void startApplication() throws Exception {
serviceCtx = new CCServiceContext(this, serverCtx, ccContext, ccConfig.getAppConfig());
serviceCtx.addJobLifecycleListener(datasetDirectoryService);
+ application.init(serviceCtx);
executor = Executors.newCachedThreadPool(serviceCtx.getThreadFactory());
- application.start(serviceCtx, ccConfig.getAppArgsArray());
+ application.start(ccConfig.getAppArgsArray());
IJobCapacityController jobCapacityController = application.getJobCapacityController();
// Job manager is in charge of job lifecycle management.
try {
- Constructor<?> jobManagerConstructor =
- this.getClass().getClassLoader().loadClass(ccConfig.getJobManagerClass()).getConstructor(
- CCConfig.class, ClusterControllerService.class, IJobCapacityController.class);
+ Constructor<?> jobManagerConstructor = this.getClass().getClassLoader()
+ .loadClass(ccConfig.getJobManagerClass())
+ .getConstructor(CCConfig.class, ClusterControllerService.class, IJobCapacityController.class);
jobManager = (IJobManager) jobManagerConstructor.newInstance(ccConfig, this, jobCapacityController);
} catch (ClassNotFoundException | InstantiationException | IllegalAccessException | NoSuchMethodException
| InvocationTargetException e) {
@@ -406,8 +407,8 @@
@Override
public void getIPAddressNodeMap(Map<InetAddress, Set<String>> map) throws HyracksDataException {
- GetIpAddressNodeNameMapWork ginmw =
- new GetIpAddressNodeNameMapWork(ClusterControllerService.this.getNodeManager(), map);
+ GetIpAddressNodeNameMapWork ginmw = new GetIpAddressNodeNameMapWork(
+ ClusterControllerService.this.getNodeManager(), map);
try {
workQueue.scheduleAndSync(ginmw);
} catch (Exception e) {
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/BaseNCApplication.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/BaseNCApplication.java
index 5afc98d..4d8cbbd 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/BaseNCApplication.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/BaseNCApplication.java
@@ -41,7 +41,12 @@
}
@Override
- public void start(IServiceContext ncAppCtx, String[] args) throws Exception {
+ public void init(IServiceContext serviceCtx) throws Exception {
+ // no-op
+ }
+
+ @Override
+ public void start(String[] args) throws Exception {
if (args.length > 0) {
throw new IllegalArgumentException("Unrecognized argument(s): " + Arrays.toString(args));
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index b52675c..ed5598b 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -194,8 +194,8 @@
// Set shutdown hook before so it doesn't have the same uncaught exception handler
Runtime.getRuntime().addShutdownHook(new NCShutdownHook(this));
Thread.currentThread().setUncaughtExceptionHandler(getLifeCycleComponentManager());
- ioManager =
- new IOManager(IODeviceHandle.getDevices(ncConfig.getIODevices()), application.getFileDeviceResolver());
+ ioManager = new IOManager(IODeviceHandle.getDevices(ncConfig.getIODevices()),
+ application.getFileDeviceResolver());
workQueue = new WorkQueue(id, Thread.NORM_PRIORITY); // Reserves MAX_PRIORITY of the heartbeat thread.
jobletMap = new Hashtable<>();
@@ -336,8 +336,8 @@
// Use "public" versions of network addresses and ports
NetworkAddress datasetAddress = datasetNetworkManager.getPublicNetworkAddress();
NetworkAddress netAddress = netManager.getPublicNetworkAddress();
- NetworkAddress meesagingPort =
- messagingNetManager != null ? messagingNetManager.getPublicNetworkAddress() : null;
+ NetworkAddress meesagingPort = messagingNetManager != null ? messagingNetManager.getPublicNetworkAddress()
+ : null;
int allCores = osMXBean.getAvailableProcessors();
nodeRegistration = new NodeRegistration(ipc.getSocketAddress(), id, ncConfig, netAddress, datasetAddress,
osMXBean.getName(), osMXBean.getArch(), osMXBean.getVersion(), allCores, runtimeMXBean.getVmName(),
@@ -365,8 +365,9 @@
private void startApplication() throws Exception {
serviceCtx = new NCServiceContext(this, serverCtx, ioManager, id, memoryManager, lccm, ncConfig.getAppConfig());
- application.start(serviceCtx, ncConfig.getAppArgsArray());
+ application.init(serviceCtx);
executor = Executors.newCachedThreadPool(serviceCtx.getThreadFactory());
+ application.start(ncConfig.getAppArgsArray());
}
public void updateMaxJobId(JobId jobId) {
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/application/NCServiceContext.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/application/NCServiceContext.java
index dc0bf0c..d659fe6 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/application/NCServiceContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/application/NCServiceContext.java
@@ -19,7 +19,6 @@
package org.apache.hyracks.control.nc.application;
import java.io.IOException;
-import java.io.OutputStream;
import java.io.Serializable;
import org.apache.hyracks.api.application.INCServiceContext;
@@ -54,13 +53,7 @@
this.ioManager = ioManager;
this.memoryManager = memoryManager;
this.ncs = ncs;
- sdh = new IStateDumpHandler() {
-
- @Override
- public void dumpState(OutputStream os) throws IOException {
- lccm.dumpState(os);
- }
- };
+ sdh = lccm::dumpState;
}
@Override
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/TestNCApplication.java b/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/TestNCApplication.java
index 2967039..780a65c 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/TestNCApplication.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/TestNCApplication.java
@@ -30,11 +30,16 @@
private RuntimeContext rCtx;
@Override
- public void start(IServiceContext serviceCtx, String[] args) throws Exception {
+ public void init(IServiceContext serviceCtx) throws Exception {
rCtx = new RuntimeContext((INCServiceContext) serviceCtx);
}
@Override
+ public void start(String[] args) throws Exception {
+ // No-op
+ }
+
+ @Override
public void startupCompleted() throws Exception {
// No-op
}