Unify structure of Cluster Controller and Node Controller
Change-Id: Ife3a002371a07ee9cdd32e1ffd50cc775bf1d453
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1514
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <mblow@apache.org>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-active/pom.xml b/asterixdb/asterix-active/pom.xml
index a816d19..6c4cb15 100644
--- a/asterixdb/asterix-active/pom.xml
+++ b/asterixdb/asterix-active/pom.xml
@@ -36,10 +36,6 @@
</dependency>
<dependency>
<groupId>org.apache.hyracks</groupId>
- <artifactId>hyracks-control-nc</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.hyracks</groupId>
<artifactId>hyracks-api</artifactId>
</dependency>
<dependency>
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java
index b49ff72..12f28c5 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java
@@ -36,8 +36,8 @@
public ActiveSourceOperatorNodePushable(IHyracksTaskContext ctx, ActiveRuntimeId runtimeId) {
this.ctx = ctx;
- activeManager = (ActiveManager) ((IAppRuntimeContext) ctx.getJobletContext().getApplicationContext()
- .getApplicationObject()).getActiveManager();
+ activeManager = (ActiveManager) ((IAppRuntimeContext) ctx.getJobletContext().getServiceContext()
+ .getApplicationContext()).getActiveManager();
this.runtimeId = runtimeId;
}
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java
index 241c932..da3ac1c 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java
@@ -25,7 +25,6 @@
import org.apache.asterix.common.messaging.api.IApplicationMessage;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.service.IControllerService;
-import org.apache.hyracks.control.nc.NodeControllerService;
public class ActiveManagerMessage implements IApplicationMessage {
public static final byte STOP_ACTIVITY = 0x00;
@@ -55,9 +54,7 @@
@Override
public void handle(IControllerService cs) throws HyracksDataException, InterruptedException {
- NodeControllerService ncs = (NodeControllerService) cs;
- IAppRuntimeContext appContext =
- (IAppRuntimeContext) ncs.getApplicationContext().getApplicationObject();
+ IAppRuntimeContext appContext = (IAppRuntimeContext) cs.getApplicationContext();
((ActiveManager) appContext.getActiveManager()).submit(this);
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
index fbb2208..f3c9744 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
@@ -33,12 +33,12 @@
import org.apache.asterix.common.config.GlobalConfig;
import org.apache.asterix.common.config.PropertiesAccessor;
import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.hyracks.bootstrap.CCApplicationEntryPoint;
-import org.apache.asterix.hyracks.bootstrap.NCApplicationEntryPoint;
+import org.apache.asterix.hyracks.bootstrap.CCApplication;
+import org.apache.asterix.hyracks.bootstrap.NCApplication;
import org.apache.asterix.runtime.utils.ClusterStateManager;
import org.apache.commons.io.FileUtils;
-import org.apache.hyracks.api.application.ICCApplicationEntryPoint;
-import org.apache.hyracks.api.application.INCApplicationEntryPoint;
+import org.apache.hyracks.api.application.ICCApplication;
+import org.apache.hyracks.api.application.INCApplication;
import org.apache.hyracks.api.client.HyracksConnection;
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.job.JobFlag;
@@ -70,11 +70,11 @@
public void init(boolean deleteOldInstanceData) throws Exception {
ncs = new NodeControllerService[0]; // ensure that ncs is not null
- final ICCApplicationEntryPoint ccAppEntryPoint = createCCAppEntryPoint();
+ final ICCApplication ccApplication = createCCApplication();
configManager = new ConfigManager();
- ccAppEntryPoint.registerConfig(configManager);
+ ccApplication.registerConfig(configManager);
final CCConfig ccConfig = createCCConfig(configManager);
- cc = new ClusterControllerService(ccConfig, ccAppEntryPoint);
+ cc = new ClusterControllerService(ccConfig, ccApplication);
nodeNames = ccConfig.getConfigManager().getNodeNames();
if (deleteOldInstanceData) {
@@ -93,8 +93,8 @@
List<NodeControllerService> nodeControllers = new ArrayList<>();
List<Thread> startupThreads = new ArrayList<>();
for (NCConfig ncConfig : ncConfigs) {
- final INCApplicationEntryPoint ncAppEntryPoint = createNCAppEntryPoint();
- NodeControllerService nodeControllerService = new NodeControllerService(ncConfig, ncAppEntryPoint);
+ final INCApplication ncApplication = createNCApplication();
+ NodeControllerService nodeControllerService = new NodeControllerService(ncConfig, ncApplication);
nodeControllers.add(nodeControllerService);
Thread ncStartThread = new Thread("IntegrationUtil-" + ncConfig.getNodeId()) {
@Override
@@ -141,8 +141,8 @@
return ccConfig;
}
- protected ICCApplicationEntryPoint createCCAppEntryPoint() {
- return new CCApplicationEntryPoint();
+ protected ICCApplication createCCApplication() {
+ return new CCApplication();
}
protected NCConfig createNCConfig(String ncName, ConfigManager configManager) {
@@ -159,8 +159,8 @@
return ncConfig;
}
- protected INCApplicationEntryPoint createNCAppEntryPoint() {
- return new NCApplicationEntryPoint();
+ protected INCApplication createNCApplication() {
+ return new NCApplication();
}
private NCConfig fixupIODevices(NCConfig ncConfig, PropertiesAccessor accessor) {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterApiServlet.java
index c2b44c9..6b16be1 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterApiServlet.java
@@ -98,7 +98,7 @@
protected ObjectNode getClusterStateJSON(IServletRequest request, String pathToNode) {
ObjectNode json = ClusterStateManager.INSTANCE.getClusterStateDescription();
AppContextInfo appConfig = (AppContextInfo) ctx.get(ASTERIX_APP_CONTEXT_INFO_ATTR);
- json.putPOJO("config", ConfigUtils.getSectionOptionsForJSON(appConfig.getCCApplicationContext().getAppConfig(),
+ json.putPOJO("config", ConfigUtils.getSectionOptionsForJSON(appConfig.getCCServiceContext().getAppConfig(),
Section.COMMON, getConfigSelector()));
ArrayNode ncs = (ArrayNode) json.get("ncs");
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index aacad8e..55ea971 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -75,7 +75,7 @@
import org.apache.asterix.runtime.transaction.GlobalResourceIdFactoryProvider;
import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepositoryFactory;
-import org.apache.hyracks.api.application.INCApplicationContext;
+import org.apache.hyracks.api.application.INCServiceContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.IIOManager;
import org.apache.hyracks.api.lifecycle.ILifeCycleComponent;
@@ -103,7 +103,7 @@
private static final Logger LOGGER = Logger.getLogger(NCAppRuntimeContext.class.getName());
private ILSMMergePolicyFactory metadataMergePolicyFactory;
- private final INCApplicationContext ncApplicationContext;
+ private final INCServiceContext ncServiceContext;
private final IResourceIdFactory resourceIdFactory;
private CompilerProperties compilerProperties;
private ExternalProperties externalProperties;
@@ -137,12 +137,12 @@
private final NCExtensionManager ncExtensionManager;
private final IStorageComponentProvider componentProvider;
- public NCAppRuntimeContext(INCApplicationContext ncApplicationContext, List<AsterixExtension> extensions)
+ public NCAppRuntimeContext(INCServiceContext ncServiceContext, List<AsterixExtension> extensions)
throws AsterixException, InstantiationException, IllegalAccessException, ClassNotFoundException,
IOException {
List<AsterixExtension> allExtensions = new ArrayList<>();
- this.ncApplicationContext = ncApplicationContext;
- PropertiesAccessor propertiesAccessor = PropertiesAccessor.getInstance(ncApplicationContext.getAppConfig());
+ this.ncServiceContext = ncServiceContext;
+ PropertiesAccessor propertiesAccessor = PropertiesAccessor.getInstance(ncServiceContext.getAppConfig());
compilerProperties = new CompilerProperties(propertiesAccessor);
externalProperties = new ExternalProperties(propertiesAccessor);
metadataProperties = new MetadataProperties(propertiesAccessor);
@@ -160,7 +160,7 @@
allExtensions.addAll(new ExtensionProperties(propertiesAccessor).getExtensions());
ncExtensionManager = new NCExtensionManager(allExtensions);
componentProvider = new StorageComponentProvider();
- resourceIdFactory = new GlobalResourceIdFactoryProvider(ncApplicationContext).createResourceIdFactory();
+ resourceIdFactory = new GlobalResourceIdFactoryProvider(ncServiceContext).createResourceIdFactory();
}
@Override
@@ -168,28 +168,28 @@
Logger.getLogger("org.apache.asterix").setLevel(externalProperties.getLogLevel());
Logger.getLogger("org.apache.hyracks").setLevel(externalProperties.getLogLevel());
- ioManager = ncApplicationContext.getIoManager();
- threadExecutor = new ThreadExecutor(ncApplicationContext.getThreadFactory());
+ ioManager = ncServiceContext.getIoManager();
+ threadExecutor = new ThreadExecutor(ncServiceContext.getThreadFactory());
fileMapManager = new FileMapManager(ioManager);
ICacheMemoryAllocator allocator = new HeapBufferAllocator();
IPageCleanerPolicy pcp = new DelayPageCleanerPolicy(600000);
IPageReplacementStrategy prs = new ClockPageReplacementStrategy(allocator,
storageProperties.getBufferCachePageSize(), storageProperties.getBufferCacheNumPages());
- AsynchronousScheduler.INSTANCE.init(ncApplicationContext.getThreadFactory());
+ AsynchronousScheduler.INSTANCE.init(ncServiceContext.getThreadFactory());
lsmIOScheduler = AsynchronousScheduler.INSTANCE;
metadataMergePolicyFactory = new PrefixMergePolicyFactory();
ILocalResourceRepositoryFactory persistentLocalResourceRepositoryFactory =
- new PersistentLocalResourceRepositoryFactory(ioManager, ncApplicationContext.getNodeId(),
+ new PersistentLocalResourceRepositoryFactory(ioManager, ncServiceContext.getNodeId(),
metadataProperties);
localResourceRepository =
(PersistentLocalResourceRepository) persistentLocalResourceRepositoryFactory.createRepository();
IAppRuntimeContextProvider asterixAppRuntimeContextProvider = new AppRuntimeContextProviderForRecovery(this);
- txnSubsystem = new TransactionSubsystem(ncApplicationContext, ncApplicationContext.getNodeId(),
+ txnSubsystem = new TransactionSubsystem(ncServiceContext, ncServiceContext.getNodeId(),
asterixAppRuntimeContextProvider, txnProperties);
IRecoveryManager recoveryMgr = txnSubsystem.getRecoveryManager();
@@ -205,11 +205,11 @@
isShuttingdown = false;
- activeManager = new ActiveManager(threadExecutor, ncApplicationContext.getNodeId(),
+ activeManager = new ActiveManager(threadExecutor, ncServiceContext.getNodeId(),
activeProperties.getMemoryComponentGlobalBudget(), compilerProperties.getFrameSize());
- if (replicationProperties.isParticipant(ncApplicationContext.getNodeId())) {
- String nodeId = ncApplicationContext.getNodeId();
+ if (replicationProperties.isParticipant(ncServiceContext.getNodeId())) {
+ String nodeId = ncServiceContext.getNodeId();
replicaResourcesManager = new ReplicaResourcesManager(localResourceRepository, metadataProperties);
@@ -238,24 +238,24 @@
//initialize replication channel
replicationChannel = new ReplicationChannel(nodeId, replicationProperties, txnSubsystem.getLogManager(),
- replicaResourcesManager, replicationManager, ncApplicationContext,
+ replicaResourcesManager, replicationManager, ncServiceContext,
asterixAppRuntimeContextProvider);
remoteRecoveryManager = new RemoteRecoveryManager(replicationManager, this, replicationProperties);
bufferCache = new BufferCache(ioManager, prs, pcp, fileMapManager,
- storageProperties.getBufferCacheMaxOpenFiles(), ncApplicationContext.getThreadFactory(),
+ storageProperties.getBufferCacheMaxOpenFiles(), ncServiceContext.getThreadFactory(),
replicationManager);
} else {
bufferCache = new BufferCache(ioManager, prs, pcp, fileMapManager,
- storageProperties.getBufferCacheMaxOpenFiles(), ncApplicationContext.getThreadFactory());
+ storageProperties.getBufferCacheMaxOpenFiles(), ncServiceContext.getThreadFactory());
}
/*
* The order of registration is important. The buffer cache must registered before recovery and transaction
* managers. Notes: registered components are stopped in reversed order
*/
- ILifeCycleComponentManager lccm = ncApplicationContext.getLifeCycleComponentManager();
+ ILifeCycleComponentManager lccm = ncServiceContext.getLifeCycleComponentManager();
lccm.register((ILifeCycleComponent) bufferCache);
/*
* LogManager must be stopped after RecoveryManager, DatasetLifeCycleManager, and ReplicationManager
@@ -442,7 +442,7 @@
MetadataNode.INSTANCE.initialize(this, ncExtensionManager.getMetadataTupleTranslatorProvider(),
ncExtensionManager.getMetadataExtensions());
- proxy = (IAsterixStateProxy) ncApplicationContext.getDistributedState();
+ proxy = (IAsterixStateProxy) ncServiceContext.getDistributedState();
if (proxy == null) {
throw new IllegalStateException("Metadata node cannot access distributed state");
}
@@ -451,9 +451,9 @@
// This way we can delay the registration of the metadataNode until
// it is completely initialized.
MetadataManager.initialize(proxy, MetadataNode.INSTANCE);
- MetadataBootstrap.startUniverse(ncApplicationContext, newUniverse);
+ MetadataBootstrap.startUniverse(ncServiceContext, newUniverse);
MetadataBootstrap.startDDLRecovery();
- ncExtensionManager.initializeMetadata(ncApplicationContext);
+ ncExtensionManager.initializeMetadata(ncServiceContext);
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Metadata node bound");
@@ -464,7 +464,7 @@
public void exportMetadataNodeStub() throws RemoteException {
IMetadataNode stub = (IMetadataNode) UnicastRemoteObject.exportObject(MetadataNode.INSTANCE,
getMetadataProperties().getMetadataPort());
- ((IAsterixStateProxy) ncApplicationContext.getDistributedState()).setMetadataNode(stub);
+ ((IAsterixStateProxy) ncServiceContext.getDistributedState()).setMetadataNode(stub);
}
@Override
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCExtensionManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCExtensionManager.java
index 9937479..6ea97d3 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCExtensionManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCExtensionManager.java
@@ -29,7 +29,7 @@
import org.apache.asterix.metadata.api.INCExtensionManager;
import org.apache.asterix.metadata.entitytupletranslators.MetadataTupleTranslatorProvider;
import org.apache.asterix.utils.ExtensionUtil;
-import org.apache.hyracks.api.application.INCApplicationContext;
+import org.apache.hyracks.api.application.INCServiceContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
/**
@@ -91,17 +91,17 @@
/**
* Called on bootstrap of metadata node allowing extensions to instantiate their Metadata artifacts
*
- * @param ncApplicationContext
- * the node controller application context
+ * @param ncServiceCtx
+ * the node controller service context
* @throws HyracksDataException
*/
- public void initializeMetadata(INCApplicationContext appCtx) throws HyracksDataException {
+ public void initializeMetadata(INCServiceContext ncServiceCtx) throws HyracksDataException {
if (mdExtensions != null) {
for (IMetadataExtension mdExtension : mdExtensions) {
try {
- mdExtension.initializeMetadata(appCtx);
+ mdExtension.initializeMetadata(ncServiceCtx);
} catch (RemoteException | ACIDException e) {
- throw new HyracksDataException(e);
+ throw HyracksDataException.create(e);
}
}
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
index 4ee1122..6e4e4cb 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
@@ -43,7 +43,6 @@
import java.util.logging.Logger;
import org.apache.asterix.common.api.IDatasetLifecycleManager;
-import org.apache.asterix.common.config.IPropertiesProvider;
import org.apache.asterix.common.config.ReplicationProperties;
import org.apache.asterix.common.exceptions.ACIDException;
import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
@@ -64,7 +63,7 @@
import org.apache.asterix.transaction.management.service.recovery.TxnId;
import org.apache.asterix.transaction.management.service.transaction.TransactionManagementConstants;
import org.apache.commons.io.FileUtils;
-import org.apache.hyracks.api.application.INCApplicationContext;
+import org.apache.hyracks.api.application.INCServiceContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.lifecycle.ILifeCycleComponent;
import org.apache.hyracks.storage.am.common.api.IIndex;
@@ -92,14 +91,14 @@
private final PersistentLocalResourceRepository localResourceRepository;
private final ICheckpointManager checkpointManager;
private SystemState state;
- private final INCApplicationContext appCtx;
+ private final INCServiceContext serviceCtx;
- public RecoveryManager(ITransactionSubsystem txnSubsystem, INCApplicationContext appCtx) {
- this.appCtx = appCtx;
+ public RecoveryManager(ITransactionSubsystem txnSubsystem, INCServiceContext serviceCtx) {
+ this.serviceCtx = serviceCtx;
this.txnSubsystem = txnSubsystem;
logMgr = (LogManager) txnSubsystem.getLogManager();
- ReplicationProperties repProperties = ((IPropertiesProvider) txnSubsystem.getAsterixAppRuntimeContextProvider()
- .getAppContext()).getReplicationProperties();
+ ReplicationProperties repProperties =
+ txnSubsystem.getAsterixAppRuntimeContextProvider().getAppContext().getReplicationProperties();
replicationEnabled = repProperties.isParticipant(txnSubsystem.getId());
localResourceRepository = (PersistentLocalResourceRepository) txnSubsystem.getAsterixAppRuntimeContextProvider()
.getLocalResourceRepository();
@@ -374,7 +373,7 @@
index = (ILSMIndex) datasetLifecycleManager.get(localResource.getPath());
if (index == null) {
//#. create index instance and register to indexLifeCycleManager
- index = localResourceMetadata.createIndexInstance(appCtx, localResource);
+ index = localResourceMetadata.createIndexInstance(serviceCtx, localResource);
datasetLifecycleManager.register(localResource.getPath(), index);
datasetLifecycleManager.open(localResource.getPath());
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/TransactionSubsystem.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/TransactionSubsystem.java
index 8e9463b..88311c5 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/TransactionSubsystem.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/TransactionSubsystem.java
@@ -42,7 +42,7 @@
import org.apache.asterix.transaction.management.service.logging.LogManagerWithReplication;
import org.apache.asterix.transaction.management.service.recovery.CheckpointManagerFactory;
import org.apache.asterix.transaction.management.service.transaction.TransactionManager;
-import org.apache.hyracks.api.application.INCApplicationContext;
+import org.apache.hyracks.api.application.INCServiceContext;
/**
* Provider for all the sub-systems (transaction/lock/log/recovery) managers.
@@ -62,7 +62,7 @@
private long profilerEntityCommitLogCount = 0;
private EntityCommitProfiler ecp;
- public TransactionSubsystem(INCApplicationContext appCtx, String id,
+ public TransactionSubsystem(INCServiceContext serviceCtx, String id,
IAppRuntimeContextProvider asterixAppRuntimeContextProvider, TransactionProperties txnProperties)
throws ACIDException {
this.asterixAppRuntimeContextProvider = asterixAppRuntimeContextProvider;
@@ -89,7 +89,7 @@
} else {
this.logManager = new LogManager(this);
}
- this.recoveryManager = new RecoveryManager(this, appCtx);
+ this.recoveryManager = new RecoveryManager(this, serviceCtx);
if (TransactionUtil.PROFILE_MODE) {
ecp = new EntityCommitProfiler(this, this.txnProperties.getCommitProfilerReportInterval());
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/BindMetadataNodeTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/BindMetadataNodeTask.java
index 99f641b..91f3524 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/BindMetadataNodeTask.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/BindMetadataNodeTask.java
@@ -22,7 +22,6 @@
import org.apache.asterix.common.api.INCLifecycleTask;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.service.IControllerService;
-import org.apache.hyracks.control.nc.NodeControllerService;
public class BindMetadataNodeTask implements INCLifecycleTask {
@@ -35,13 +34,12 @@
@Override
public void perform(IControllerService cs) throws HyracksDataException {
- NodeControllerService ncs = (NodeControllerService) cs;
- IAppRuntimeContext runtimeContext = (IAppRuntimeContext) ncs.getApplicationContext().getApplicationObject();
+ IAppRuntimeContext appContext = (IAppRuntimeContext) cs.getApplicationContext();
try {
if (exportStub) {
- runtimeContext.exportMetadataNodeStub();
+ appContext.exportMetadataNodeStub();
} else {
- runtimeContext.unexportMetadataNodeStub();
+ appContext.unexportMetadataNodeStub();
}
} catch (Exception e) {
throw HyracksDataException.create(e);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/CheckpointTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/CheckpointTask.java
index 8ef6ae3..e77346a 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/CheckpointTask.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/CheckpointTask.java
@@ -23,7 +23,6 @@
import org.apache.asterix.common.transactions.ICheckpointManager;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.service.IControllerService;
-import org.apache.hyracks.control.nc.NodeControllerService;
public class CheckpointTask implements INCLifecycleTask {
@@ -31,9 +30,8 @@
@Override
public void perform(IControllerService cs) throws HyracksDataException {
- NodeControllerService ncs = (NodeControllerService) cs;
- IAppRuntimeContext runtimeContext = (IAppRuntimeContext) ncs.getApplicationContext().getApplicationObject();
- ICheckpointManager checkpointMgr = runtimeContext.getTransactionSubsystem().getCheckpointManager();
+ IAppRuntimeContext appContext = (IAppRuntimeContext) cs.getApplicationContext();
+ ICheckpointManager checkpointMgr = appContext.getTransactionSubsystem().getCheckpointManager();
checkpointMgr.doSharpCheckpoint();
}
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ExternalLibrarySetupTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ExternalLibrarySetupTask.java
index 8604364..ad9b28a 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ExternalLibrarySetupTask.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ExternalLibrarySetupTask.java
@@ -23,7 +23,6 @@
import org.apache.asterix.common.api.INCLifecycleTask;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.service.IControllerService;
-import org.apache.hyracks.control.nc.NodeControllerService;
public class ExternalLibrarySetupTask implements INCLifecycleTask {
@@ -36,8 +35,7 @@
@Override
public void perform(IControllerService cs) throws HyracksDataException {
- NodeControllerService ncs = (NodeControllerService) cs;
- IAppRuntimeContext appContext = (IAppRuntimeContext) ncs.getApplicationContext().getApplicationObject();
+ IAppRuntimeContext appContext = (IAppRuntimeContext) cs.getApplicationContext();
try {
ExternalLibraryUtils.setUpExternaLibraries(appContext.getLibraryManager(), metadataNode);
} catch (Exception e) {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/LocalRecoveryTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/LocalRecoveryTask.java
index bca39b0..777097d 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/LocalRecoveryTask.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/LocalRecoveryTask.java
@@ -26,7 +26,6 @@
import org.apache.asterix.common.exceptions.ACIDException;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.service.IControllerService;
-import org.apache.hyracks.control.nc.NodeControllerService;
public class LocalRecoveryTask implements INCLifecycleTask {
@@ -39,10 +38,9 @@
@Override
public void perform(IControllerService cs) throws HyracksDataException {
- NodeControllerService ncs = (NodeControllerService) cs;
- IAppRuntimeContext runtimeContext = (IAppRuntimeContext) ncs.getApplicationContext().getApplicationObject();
+ IAppRuntimeContext appContext = (IAppRuntimeContext) cs.getApplicationContext();
try {
- runtimeContext.getTransactionSubsystem().getRecoveryManager().startLocalRecovery(partitions);
+ appContext.getTransactionSubsystem().getRecoveryManager().startLocalRecovery(partitions);
} catch (IOException | ACIDException e) {
throw HyracksDataException.create(e);
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/MetadataBootstrapTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/MetadataBootstrapTask.java
index f7c33a4..65004b8 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/MetadataBootstrapTask.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/MetadataBootstrapTask.java
@@ -23,7 +23,6 @@
import org.apache.asterix.common.transactions.IRecoveryManager.SystemState;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.service.IControllerService;
-import org.apache.hyracks.control.nc.NodeControllerService;
public class MetadataBootstrapTask implements INCLifecycleTask {
@@ -31,8 +30,7 @@
@Override
public void perform(IControllerService cs) throws HyracksDataException {
- NodeControllerService ncs = (NodeControllerService) cs;
- IAppRuntimeContext appContext = (IAppRuntimeContext) ncs.getApplicationContext().getApplicationObject();
+ IAppRuntimeContext appContext = (IAppRuntimeContext) cs.getApplicationContext();
try {
SystemState state = appContext.getTransactionSubsystem().getRecoveryManager().getSystemState();
appContext.initializeMetadata(state == SystemState.PERMANENT_DATA_LOSS);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/RemoteRecoveryTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/RemoteRecoveryTask.java
index 4574304..48479c5 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/RemoteRecoveryTask.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/RemoteRecoveryTask.java
@@ -26,7 +26,6 @@
import org.apache.asterix.common.api.INCLifecycleTask;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.service.IControllerService;
-import org.apache.hyracks.control.nc.NodeControllerService;
public class RemoteRecoveryTask implements INCLifecycleTask {
@@ -39,9 +38,8 @@
@Override
public void perform(IControllerService cs) throws HyracksDataException {
- NodeControllerService ncs = (NodeControllerService) cs;
- IAppRuntimeContext runtimeContext = (IAppRuntimeContext) ncs.getApplicationContext().getApplicationObject();
- runtimeContext.getRemoteRecoveryManager().doRemoteRecoveryPlan(recoveryPlan);
+ IAppRuntimeContext appContext = (IAppRuntimeContext) cs.getApplicationContext();
+ appContext.getRemoteRecoveryManager().doRemoteRecoveryPlan(recoveryPlan);
}
private void writeObject(java.io.ObjectOutputStream out) throws IOException {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/StartFailbackTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/StartFailbackTask.java
index 6ae4487..9f04b10 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/StartFailbackTask.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/StartFailbackTask.java
@@ -22,7 +22,6 @@
import org.apache.asterix.common.api.INCLifecycleTask;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.service.IControllerService;
-import org.apache.hyracks.control.nc.NodeControllerService;
public class StartFailbackTask implements INCLifecycleTask {
@@ -30,8 +29,7 @@
@Override
public void perform(IControllerService cs) throws HyracksDataException {
- NodeControllerService ncs = (NodeControllerService) cs;
- IAppRuntimeContext runtimeContext = (IAppRuntimeContext) ncs.getApplicationContext().getApplicationObject();
- runtimeContext.getRemoteRecoveryManager().startFailbackProcess();
+ IAppRuntimeContext appContext = (IAppRuntimeContext) cs.getApplicationContext();
+ appContext.getRemoteRecoveryManager().startFailbackProcess();
}
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/StartLifecycleComponentsTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/StartLifecycleComponentsTask.java
index 3beb573..d1754dd 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/StartLifecycleComponentsTask.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/StartLifecycleComponentsTask.java
@@ -25,15 +25,13 @@
import org.apache.asterix.common.api.IAppRuntimeContext;
import org.apache.asterix.common.api.INCLifecycleTask;
-import org.apache.asterix.common.config.IPropertiesProvider;
import org.apache.asterix.common.config.MetadataProperties;
import org.apache.asterix.hyracks.bootstrap.AsterixStateDumpHandler;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.lifecycle.ILifeCycleComponentManager;
import org.apache.hyracks.api.lifecycle.LifeCycleComponentManager;
import org.apache.hyracks.api.service.IControllerService;
-import org.apache.hyracks.control.nc.NodeControllerService;
-import org.apache.hyracks.control.nc.application.NCApplicationContext;
+import org.apache.hyracks.control.nc.application.NCServiceContext;
public class StartLifecycleComponentsTask implements INCLifecycleTask {
@@ -42,26 +40,25 @@
@Override
public void perform(IControllerService cs) throws HyracksDataException {
- NodeControllerService ncs = (NodeControllerService) cs;
- IAppRuntimeContext runtimeContext = (IAppRuntimeContext) ncs.getApplicationContext().getApplicationObject();
- NCApplicationContext appContext = ncs.getApplicationContext();
- MetadataProperties metadataProperties = ((IPropertiesProvider) runtimeContext).getMetadataProperties();
+ IAppRuntimeContext applicationContext = (IAppRuntimeContext) cs.getApplicationContext();
+ NCServiceContext serviceCtx = (NCServiceContext) cs.getContext();
+ MetadataProperties metadataProperties = applicationContext.getMetadataProperties();
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Starting lifecycle components");
}
Map<String, String> lifecycleMgmtConfiguration = new HashMap<>();
String dumpPathKey = LifeCycleComponentManager.Config.DUMP_PATH_KEY;
- String dumpPath = metadataProperties.getCoredumpPath(appContext.getNodeId());
+ String dumpPath = metadataProperties.getCoredumpPath(serviceCtx.getNodeId());
lifecycleMgmtConfiguration.put(dumpPathKey, dumpPath);
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Coredump directory for NC is: " + dumpPath);
}
- ILifeCycleComponentManager lccm = appContext.getLifeCycleComponentManager();
+ ILifeCycleComponentManager lccm = serviceCtx.getLifeCycleComponentManager();
lccm.configure(lifecycleMgmtConfiguration);
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Configured:" + lccm);
}
- appContext.setStateDumpHandler(new AsterixStateDumpHandler(appContext.getNodeId(), lccm.getDumpPath(), lccm));
+ serviceCtx.setStateDumpHandler(new AsterixStateDumpHandler(serviceCtx.getNodeId(), lccm.getDumpPath(), lccm));
lccm.startAll();
}
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/StartReplicationServiceTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/StartReplicationServiceTask.java
index 17fde86..93e5b50 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/StartReplicationServiceTask.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/StartReplicationServiceTask.java
@@ -23,7 +23,6 @@
import org.apache.asterix.common.replication.IReplicationManager;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.service.IControllerService;
-import org.apache.hyracks.control.nc.NodeControllerService;
public class StartReplicationServiceTask implements INCLifecycleTask {
@@ -31,12 +30,11 @@
@Override
public void perform(IControllerService cs) throws HyracksDataException {
- NodeControllerService ncs = (NodeControllerService) cs;
- IAppRuntimeContext runtimeContext = (IAppRuntimeContext) ncs.getApplicationContext().getApplicationObject();
+ IAppRuntimeContext appContext = (IAppRuntimeContext) cs.getApplicationContext();
try {
//Open replication channel
- runtimeContext.getReplicationChannel().start();
- final IReplicationManager replicationManager = runtimeContext.getReplicationManager();
+ appContext.getReplicationChannel().start();
+ final IReplicationManager replicationManager = appContext.getReplicationManager();
//Check the state of remote replicas
replicationManager.initializeReplicasState();
//Start replication after the state of remote replicas has been initialized.
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/CompleteFailbackRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/CompleteFailbackRequestMessage.java
index 16a800b..0924838 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/CompleteFailbackRequestMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/CompleteFailbackRequestMessage.java
@@ -29,7 +29,6 @@
import org.apache.asterix.runtime.message.AbstractFailbackPlanMessage;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.service.IControllerService;
-import org.apache.hyracks.control.nc.NodeControllerService;
public class CompleteFailbackRequestMessage extends AbstractFailbackPlanMessage {
@@ -64,10 +63,8 @@
@Override
public void handle(IControllerService cs) throws HyracksDataException, InterruptedException {
- NodeControllerService ncs = (NodeControllerService) cs;
- IAppRuntimeContext appContext =
- (IAppRuntimeContext) ncs.getApplicationContext().getApplicationObject();
- INCMessageBroker broker = (INCMessageBroker) ncs.getApplicationContext().getMessageBroker();
+ IAppRuntimeContext appContext = (IAppRuntimeContext) cs.getApplicationContext();
+ INCMessageBroker broker = (INCMessageBroker) cs.getContext().getMessageBroker();
HyracksDataException hde = null;
try {
IRemoteRecoveryManager remoteRecoeryManager = appContext.getRemoteRecoveryManager();
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/PreparePartitionsFailbackRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/PreparePartitionsFailbackRequestMessage.java
index 8188c44..abfd6b2 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/PreparePartitionsFailbackRequestMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/PreparePartitionsFailbackRequestMessage.java
@@ -29,7 +29,6 @@
import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.service.IControllerService;
-import org.apache.hyracks.control.nc.NodeControllerService;
public class PreparePartitionsFailbackRequestMessage extends AbstractFailbackPlanMessage {
@@ -73,10 +72,8 @@
@Override
public void handle(IControllerService cs) throws HyracksDataException, InterruptedException {
- NodeControllerService ncs = (NodeControllerService) cs;
- IAppRuntimeContext appContext =
- (IAppRuntimeContext) ncs.getApplicationContext().getApplicationObject();
- INCMessageBroker broker = (INCMessageBroker) ncs.getApplicationContext().getMessageBroker();
+ IAppRuntimeContext appContext = (IAppRuntimeContext) cs.getApplicationContext();
+ INCMessageBroker broker = (INCMessageBroker) cs.getContext().getMessageBroker();
/**
* if the metadata partition will be failed back
* we need to flush and close all datasets including metadata datasets
@@ -100,15 +97,15 @@
}
//mark the partitions to be closed as inactive
- PersistentLocalResourceRepository localResourceRepo = (PersistentLocalResourceRepository) appContext
- .getLocalResourceRepository();
+ PersistentLocalResourceRepository localResourceRepo =
+ (PersistentLocalResourceRepository) appContext.getLocalResourceRepository();
for (Integer partitionId : partitions) {
localResourceRepo.addInactivePartition(partitionId);
}
//send response after partitions prepared for failback
- PreparePartitionsFailbackResponseMessage reponse = new PreparePartitionsFailbackResponseMessage(planId,
- requestId, partitions);
+ PreparePartitionsFailbackResponseMessage reponse =
+ new PreparePartitionsFailbackResponseMessage(planId, requestId, partitions);
try {
broker.sendMessageToCC(reponse);
} catch (Exception e) {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/ReplayPartitionLogsRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/ReplayPartitionLogsRequestMessage.java
index e0bc49d..5a73543 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/ReplayPartitionLogsRequestMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/ReplayPartitionLogsRequestMessage.java
@@ -42,11 +42,11 @@
@Override
public void handle(IControllerService cs) throws HyracksDataException, InterruptedException {
NodeControllerService ncs = (NodeControllerService) cs;
- IAppRuntimeContext appContext = (IAppRuntimeContext) ncs.getApplicationContext().getApplicationObject();
+ IAppRuntimeContext appContext = (IAppRuntimeContext) ncs.getApplicationContext();
// Replay the logs for these partitions and flush any impacted dataset
appContext.getRemoteRecoveryManager().replayReplicaPartitionLogs(partitions, true);
- INCMessageBroker broker = (INCMessageBroker) ncs.getApplicationContext().getMessageBroker();
+ INCMessageBroker broker = (INCMessageBroker) ncs.getContext().getMessageBroker();
ReplayPartitionLogsResponseMessage reponse = new ReplayPartitionLogsResponseMessage(ncs.getId(), partitions);
try {
broker.sendMessageToCC(reponse);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/StartupTaskRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/StartupTaskRequestMessage.java
index 472a89c..6a313f0 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/StartupTaskRequestMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/StartupTaskRequestMessage.java
@@ -44,7 +44,7 @@
public static void send(NodeControllerService cs, SystemState systemState) throws HyracksDataException {
try {
StartupTaskRequestMessage msg = new StartupTaskRequestMessage(cs.getId(), systemState);
- ((INCMessageBroker) cs.getApplicationContext().getMessageBroker()).sendMessageToCC(msg);
+ ((INCMessageBroker) cs.getContext().getMessageBroker()).sendMessageToCC(msg);
} catch (Exception e) {
LOGGER.log(Level.SEVERE, "Unable to send StartupTaskRequestMessage to CC", e);
throw HyracksDataException.create(e);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/StartupTaskResponseMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/StartupTaskResponseMessage.java
index 922ac89..92abf5b 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/StartupTaskResponseMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/StartupTaskResponseMessage.java
@@ -27,7 +27,6 @@
import org.apache.asterix.common.replication.INCLifecycleMessage;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.service.IControllerService;
-import org.apache.hyracks.control.nc.NodeControllerService;
public class StartupTaskResponseMessage implements INCLifecycleMessage {
@@ -43,8 +42,7 @@
@Override
public void handle(IControllerService cs) throws HyracksDataException, InterruptedException {
- NodeControllerService ncs = (NodeControllerService) cs;
- INCMessageBroker broker = (INCMessageBroker) ncs.getApplicationContext().getMessageBroker();
+ INCMessageBroker broker = (INCMessageBroker) cs.getContext().getMessageBroker();
boolean success = true;
HyracksDataException exception = null;
try {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/TakeoverMetadataNodeRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/TakeoverMetadataNodeRequestMessage.java
index 3be3eab..fbc0a4d 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/TakeoverMetadataNodeRequestMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/TakeoverMetadataNodeRequestMessage.java
@@ -26,7 +26,6 @@
import org.apache.asterix.common.replication.INCLifecycleMessage;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.service.IControllerService;
-import org.apache.hyracks.control.nc.NodeControllerService;
public class TakeoverMetadataNodeRequestMessage implements INCLifecycleMessage {
@@ -35,10 +34,8 @@
@Override
public void handle(IControllerService cs) throws HyracksDataException, InterruptedException {
- NodeControllerService ncs = (NodeControllerService) cs;
- IAppRuntimeContext appContext =
- (IAppRuntimeContext) ncs.getApplicationContext().getApplicationObject();
- INCMessageBroker broker = (INCMessageBroker) ncs.getApplicationContext().getMessageBroker();
+ IAppRuntimeContext appContext = (IAppRuntimeContext) cs.getApplicationContext();
+ INCMessageBroker broker = (INCMessageBroker) cs.getContext().getMessageBroker();
HyracksDataException hde = null;
try {
appContext.initializeMetadata(false);
@@ -47,8 +44,8 @@
LOGGER.log(Level.SEVERE, "Failed taking over metadata", e);
hde = HyracksDataException.create(e);
} finally {
- TakeoverMetadataNodeResponseMessage reponse = new TakeoverMetadataNodeResponseMessage(
- appContext.getTransactionSubsystem().getId());
+ TakeoverMetadataNodeResponseMessage reponse =
+ new TakeoverMetadataNodeResponseMessage(appContext.getTransactionSubsystem().getId());
try {
broker.sendMessageToCC(reponse);
} catch (Exception e) {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/TakeoverPartitionsRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/TakeoverPartitionsRequestMessage.java
index 7f9cc2b..09bb051 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/TakeoverPartitionsRequestMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/TakeoverPartitionsRequestMessage.java
@@ -29,7 +29,6 @@
import org.apache.asterix.common.replication.IRemoteRecoveryManager;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.service.IControllerService;
-import org.apache.hyracks.control.nc.NodeControllerService;
public class TakeoverPartitionsRequestMessage implements INCLifecycleMessage {
@@ -74,9 +73,8 @@
@Override
public void handle(IControllerService cs) throws HyracksDataException, InterruptedException {
- NodeControllerService ncs = (NodeControllerService) cs;
- IAppRuntimeContext appContext = (IAppRuntimeContext) ncs.getApplicationContext().getApplicationObject();
- INCMessageBroker broker = (INCMessageBroker) ncs.getApplicationContext().getMessageBroker();
+ IAppRuntimeContext appContext = (IAppRuntimeContext) cs.getApplicationContext();
+ INCMessageBroker broker = (INCMessageBroker) cs.getContext().getMessageBroker();
//if the NC is shutting down, it should ignore takeover partitions request
if (!appContext.isShuttingdown()) {
HyracksDataException hde = null;
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ApplicationEntryPointHelper.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ApplicationClassHelper.java
similarity index 86%
rename from asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ApplicationEntryPointHelper.java
rename to asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ApplicationClassHelper.java
index 8c3861a..e276be1 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ApplicationEntryPointHelper.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ApplicationClassHelper.java
@@ -25,15 +25,15 @@
import org.apache.hyracks.control.common.controllers.NCConfig;
import org.apache.hyracks.util.file.FileUtil;
-class ApplicationEntryPointHelper {
- private ApplicationEntryPointHelper() {
+class ApplicationClassHelper {
+ private ApplicationClassHelper() {
}
static void registerConfigOptions(IConfigManager configManager) {
AsterixProperties.registerConfigOptions(configManager);
ControllerConfig.defaultDir = FileUtil.joinPath(System.getProperty("java.io.tmpdir"), "asterixdb");
- NCConfig.defaultAppClass = NCApplicationEntryPoint.class.getName();
- CCConfig.defaultAppClass = CCApplicationEntryPoint.class.getName();
+ NCConfig.defaultAppClass = NCApplication.class.getName();
+ CCConfig.defaultAppClass = CCApplication.class.getName();
}
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
similarity index 87%
rename from asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
rename to asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
index b63bb31..dadc2ae 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
@@ -71,42 +71,45 @@
import org.apache.asterix.runtime.job.resource.JobCapacityController;
import org.apache.asterix.runtime.utils.AppContextInfo;
import org.apache.asterix.translator.IStatementExecutorFactory;
-import org.apache.hyracks.api.application.ICCApplicationContext;
+import org.apache.hyracks.api.application.ICCServiceContext;
+import org.apache.hyracks.api.application.IServiceContext;
import org.apache.hyracks.api.client.HyracksConnection;
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.config.IConfigManager;
import org.apache.hyracks.api.job.resource.IJobCapacityController;
import org.apache.hyracks.api.lifecycle.LifeCycleComponentManager;
+import org.apache.hyracks.control.cc.BaseCCApplication;
import org.apache.hyracks.control.cc.ClusterControllerService;
import org.apache.hyracks.control.common.controllers.CCConfig;
import org.apache.hyracks.http.api.IServlet;
import org.apache.hyracks.http.server.HttpServer;
import org.apache.hyracks.http.server.WebManager;
-public class CCApplicationEntryPoint extends org.apache.hyracks.control.cc.CCApplicationEntryPoint {
+public class CCApplication extends BaseCCApplication {
- private static final Logger LOGGER = Logger.getLogger(CCApplicationEntryPoint.class.getName());
+ private static final Logger LOGGER = Logger.getLogger(CCApplication.class.getName());
private static IAsterixStateProxy proxy;
- protected ICCApplicationContext appCtx;
+ protected ICCServiceContext ccServiceCtx;
protected CCExtensionManager ccExtensionManager;
protected IStorageComponentProvider componentProvider;
private IJobCapacityController jobCapacityController;
protected WebManager webManager;
@Override
- public void start(ICCApplicationContext ccAppCtx, String[] args) throws Exception {
+ public void start(IServiceContext serviceCtx, String[] args) throws Exception {
if (args.length > 0) {
throw new IllegalArgumentException("Unrecognized argument(s): " + Arrays.toString(args));
}
- final ClusterControllerService controllerService = (ClusterControllerService) ccAppCtx.getControllerService();
+ final ClusterControllerService controllerService = (ClusterControllerService) serviceCtx.getControllerService();
ICCMessageBroker messageBroker = new CCMessageBroker(controllerService);
- this.appCtx = ccAppCtx;
+ this.ccServiceCtx = (ICCServiceContext) serviceCtx;
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Starting Asterix cluster controller");
}
- appCtx.setThreadFactory(new AsterixThreadFactory(appCtx.getThreadFactory(), new LifeCycleComponentManager()));
+ ccServiceCtx.setThreadFactory(
+ new AsterixThreadFactory(ccServiceCtx.getThreadFactory(), new LifeCycleComponentManager()));
ILibraryManager libraryManager = new ExternalLibraryManager();
ResourceIdManager resourceIdManager = new ResourceIdManager();
IReplicationStrategy repStrategy = ClusterProperties.INSTANCE.getReplicationStrategy();
@@ -114,8 +117,8 @@
.create(ClusterProperties.INSTANCE.getCluster(), repStrategy, messageBroker);
ExternalLibraryUtils.setUpExternaLibraries(libraryManager, false);
componentProvider = new StorageComponentProvider();
- GlobalRecoveryManager.instantiate((HyracksConnection) getNewHyracksClientConnection(), componentProvider);
- AppContextInfo.initialize(appCtx, getNewHyracksClientConnection(), libraryManager, resourceIdManager,
+ GlobalRecoveryManager.instantiate((HyracksConnection) getHcc(), componentProvider);
+ AppContextInfo.initialize(ccServiceCtx, getHcc(), libraryManager, resourceIdManager,
() -> MetadataManager.INSTANCE, GlobalRecoveryManager.instance(), ftStrategy);
ccExtensionManager = new CCExtensionManager(getExtensions());
AppContextInfo.INSTANCE.setExtensionManager(ccExtensionManager);
@@ -126,19 +129,19 @@
MetadataProperties metadataProperties = AppContextInfo.INSTANCE.getMetadataProperties();
setAsterixStateProxy(AsterixStateProxy.registerRemoteObject(metadataProperties.getMetadataCallbackPort()));
- appCtx.setDistributedState(proxy);
+ ccServiceCtx.setDistributedState(proxy);
MetadataManager.initialize(proxy, metadataProperties);
- AppContextInfo.INSTANCE.getCCApplicationContext().addJobLifecycleListener(ActiveLifecycleListener.INSTANCE);
+ AppContextInfo.INSTANCE.getCCServiceContext().addJobLifecycleListener(ActiveLifecycleListener.INSTANCE);
// create event loop groups
webManager = new WebManager();
configureServers();
webManager.start();
ClusterManagerProvider.getClusterManager().registerSubscriber(GlobalRecoveryManager.instance());
- ccAppCtx.addClusterLifecycleListener(ClusterLifecycleListener.INSTANCE);
- ccAppCtx.setMessageBroker(messageBroker);
+ ccServiceCtx.addClusterLifecycleListener(ClusterLifecycleListener.INSTANCE);
+ ccServiceCtx.setMessageBroker(messageBroker);
jobCapacityController = new JobCapacityController(controllerService.getResourceManager());
}
@@ -164,16 +167,10 @@
webManager.stop();
}
- protected IHyracksClientConnection getNewHyracksClientConnection() throws Exception {
- String strIP = appCtx.getCCContext().getClusterControllerInfo().getClientNetAddress();
- int port = appCtx.getCCContext().getClusterControllerInfo().getClientNetPort();
- return new HyracksConnection(strIP, port);
- }
-
protected HttpServer setupWebServer(ExternalProperties externalProperties) throws Exception {
HttpServer webServer = new HttpServer(webManager.getBosses(), webManager.getWorkers(),
externalProperties.getWebInterfacePort());
- IHyracksClientConnection hcc = getNewHyracksClientConnection();
+ IHyracksClientConnection hcc = getHcc();
webServer.setAttribute(HYRACKS_CONNECTION_ATTR, hcc);
webServer.addServlet(new ApiServlet(webServer.ctx(), new String[] { "/*" },
ccExtensionManager.getAqlCompilationProvider(), ccExtensionManager.getSqlppCompilationProvider(),
@@ -184,11 +181,11 @@
protected HttpServer setupJSONAPIServer(ExternalProperties externalProperties) throws Exception {
HttpServer jsonAPIServer =
new HttpServer(webManager.getBosses(), webManager.getWorkers(), externalProperties.getAPIServerPort());
- IHyracksClientConnection hcc = getNewHyracksClientConnection();
+ IHyracksClientConnection hcc = getHcc();
jsonAPIServer.setAttribute(HYRACKS_CONNECTION_ATTR, hcc);
jsonAPIServer.setAttribute(ASTERIX_APP_CONTEXT_INFO_ATTR, AppContextInfo.INSTANCE);
jsonAPIServer.setAttribute(ServletConstants.EXECUTOR_SERVICE,
- ((ClusterControllerService) appCtx.getControllerService()).getExecutor());
+ ((ClusterControllerService) ccServiceCtx.getControllerService()).getExecutor());
// AQL rest APIs.
addServlet(jsonAPIServer, Servlets.AQL_QUERY);
@@ -223,7 +220,7 @@
protected HttpServer setupQueryWebServer(ExternalProperties externalProperties) throws Exception {
HttpServer queryWebServer = new HttpServer(webManager.getBosses(), webManager.getWorkers(),
externalProperties.getQueryWebInterfacePort());
- IHyracksClientConnection hcc = getNewHyracksClientConnection();
+ IHyracksClientConnection hcc = getHcc();
queryWebServer.setAttribute(HYRACKS_CONNECTION_ATTR, hcc);
queryWebServer.addServlet(new QueryWebInterfaceServlet(queryWebServer.ctx(), new String[] { "/*" }));
return queryWebServer;
@@ -232,7 +229,7 @@
protected HttpServer setupFeedServer(ExternalProperties externalProperties) throws Exception {
HttpServer feedServer = new HttpServer(webManager.getBosses(), webManager.getWorkers(),
externalProperties.getActiveServerPort());
- feedServer.setAttribute(HYRACKS_CONNECTION_ATTR, getNewHyracksClientConnection());
+ feedServer.setAttribute(HYRACKS_CONNECTION_ATTR, getHcc());
feedServer.addServlet(new FeedServlet(feedServer.ctx(), new String[] { "/" }));
return feedServer;
}
@@ -291,7 +288,7 @@
private IStatementExecutorFactory getStatementExecutorFactory() {
return ccExtensionManager.getStatementExecutorFactory(
- ((ClusterControllerService) appCtx.getControllerService()).getExecutorService());
+ ((ClusterControllerService) ccServiceCtx.getControllerService()).getExecutorService());
}
@Override
@@ -307,10 +304,22 @@
@Override
public void registerConfig(IConfigManager configManager) {
super.registerConfig(configManager);
- ApplicationEntryPointHelper.registerConfigOptions(configManager);
+ ApplicationClassHelper.registerConfigOptions(configManager);
}
public static synchronized void setAsterixStateProxy(IAsterixStateProxy proxy) {
- org.apache.asterix.hyracks.bootstrap.CCApplicationEntryPoint.proxy = proxy;
+ CCApplication.proxy = proxy;
+ }
+
+ @Override
+ public AppContextInfo getApplicationContext() {
+ return AppContextInfo.INSTANCE;
+ }
+
+ @Override
+ public IHyracksClientConnection getHcc() throws Exception {
+ String strIP = ccServiceCtx.getCCContext().getClusterControllerInfo().getClientNetAddress();
+ int port = ccServiceCtx.getCCContext().getClusterControllerInfo().getClientNetPort();
+ return new HyracksConnection(strIP, port);
}
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
similarity index 81%
rename from asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
rename to asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
index 07a9a61..28e1f23 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
@@ -47,63 +47,64 @@
import org.apache.asterix.messaging.NCMessageBroker;
import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
import org.apache.commons.io.FileUtils;
-import org.apache.hyracks.api.application.INCApplicationContext;
+import org.apache.hyracks.api.application.INCServiceContext;
+import org.apache.hyracks.api.application.IServiceContext;
import org.apache.hyracks.api.config.IConfigManager;
import org.apache.hyracks.api.job.resource.NodeCapacity;
import org.apache.hyracks.api.messages.IMessageBroker;
import org.apache.hyracks.control.common.controllers.NCConfig;
+import org.apache.hyracks.control.nc.BaseNCApplication;
import org.apache.hyracks.control.nc.NodeControllerService;
-public class NCApplicationEntryPoint extends org.apache.hyracks.control.nc.NCApplicationEntryPoint {
- private static final Logger LOGGER = Logger.getLogger(NCApplicationEntryPoint.class.getName());
+public class NCApplication extends BaseNCApplication {
+ private static final Logger LOGGER = Logger.getLogger(NCApplication.class.getName());
- private INCApplicationContext ncAppCtx;
+ private INCServiceContext ncServiceCtx;
private IAppRuntimeContext runtimeContext;
private String nodeId;
private boolean stopInitiated = false;
private SystemState systemState;
@Override
- public void registerConfigOptions(IConfigManager configManager) {
- super.registerConfigOptions(configManager);
- ApplicationEntryPointHelper.registerConfigOptions(configManager);
+ public void registerConfig(IConfigManager configManager) {
+ super.registerConfig(configManager);
+ ApplicationClassHelper.registerConfigOptions(configManager);
}
@Override
- public void start(INCApplicationContext ncAppCtx, String[] args) throws Exception {
+ public void start(IServiceContext serviceCtx, String[] args) throws Exception {
if (args.length > 0) {
throw new IllegalArgumentException("Unrecognized argument(s): " + Arrays.toString(args));
}
- ncAppCtx.setThreadFactory(new AsterixThreadFactory(ncAppCtx.getThreadFactory(),
- ncAppCtx.getLifeCycleComponentManager()));
- this.ncAppCtx = ncAppCtx;
- nodeId = this.ncAppCtx.getNodeId();
+ this.ncServiceCtx = (INCServiceContext) serviceCtx;
+ ncServiceCtx.setThreadFactory(
+ new AsterixThreadFactory(ncServiceCtx.getThreadFactory(), ncServiceCtx.getLifeCycleComponentManager()));
+ nodeId = this.ncServiceCtx.getNodeId();
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Starting Asterix node controller: " + nodeId);
}
- final NodeControllerService controllerService = (NodeControllerService) ncAppCtx.getControllerService();
+ final NodeControllerService controllerService = (NodeControllerService) ncServiceCtx.getControllerService();
if (System.getProperty("java.rmi.server.hostname") == null) {
- System.setProperty("java.rmi.server.hostname", (controllerService)
- .getConfiguration().getClusterPublicAddress());
+ System.setProperty("java.rmi.server.hostname",
+ (controllerService).getConfiguration().getClusterPublicAddress());
}
- runtimeContext = new NCAppRuntimeContext(this.ncAppCtx, getExtensions());
+ runtimeContext = new NCAppRuntimeContext(this.ncServiceCtx, getExtensions());
MetadataProperties metadataProperties = runtimeContext.getMetadataProperties();
- if (!metadataProperties.getNodeNames().contains(this.ncAppCtx.getNodeId())) {
+ if (!metadataProperties.getNodeNames().contains(this.ncServiceCtx.getNodeId())) {
if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Substitute node joining : " + this.ncAppCtx.getNodeId());
+ LOGGER.info("Substitute node joining : " + this.ncServiceCtx.getNodeId());
}
updateOnNodeJoin();
}
runtimeContext.initialize(runtimeContext.getNodeProperties().isInitialRun());
- this.ncAppCtx.setApplicationObject(runtimeContext);
MessagingProperties messagingProperties = runtimeContext.getMessagingProperties();
IMessageBroker messageBroker = new NCMessageBroker(controllerService, messagingProperties);
- this.ncAppCtx.setMessageBroker(messageBroker);
- MessagingChannelInterfaceFactory interfaceFactory = new MessagingChannelInterfaceFactory(
- (NCMessageBroker) messageBroker, messagingProperties);
- this.ncAppCtx.setMessagingChannelInterfaceFactory(interfaceFactory);
+ this.ncServiceCtx.setMessageBroker(messageBroker);
+ MessagingChannelInterfaceFactory interfaceFactory =
+ new MessagingChannelInterfaceFactory((NCMessageBroker) messageBroker, messagingProperties);
+ this.ncServiceCtx.setMessagingChannelInterfaceFactory(interfaceFactory);
IRecoveryManager recoveryMgr = runtimeContext.getTransactionSubsystem().getRecoveryManager();
systemState = recoveryMgr.getSystemState();
@@ -139,7 +140,7 @@
performLocalCleanUp();
//Note: stopping recovery manager will make a sharp checkpoint
- ncAppCtx.getLifeCycleComponentManager().stopAll(false);
+ ncServiceCtx.getLifeCycleComponentManager().stopAll(false);
runtimeContext.deinitialize();
} else {
if (LOGGER.isLoggable(Level.INFO)) {
@@ -149,15 +150,15 @@
}
@Override
- public void notifyStartupComplete() throws Exception {
+ public void startupCompleted() throws Exception {
// Since we don't pass initial run flag in AsterixHyracksIntegrationUtil, we use the virtualNC flag
final NodeProperties nodeProperties = runtimeContext.getNodeProperties();
- if (systemState == SystemState.PERMANENT_DATA_LOSS && (nodeProperties.isInitialRun() || nodeProperties.isVirtualNc())) {
+ if (systemState == SystemState.PERMANENT_DATA_LOSS
+ && (nodeProperties.isInitialRun() || nodeProperties.isVirtualNc())) {
systemState = SystemState.BOOTSTRAPPING;
}
// Request startup tasks from CC
- StartupTaskRequestMessage.send((NodeControllerService) ncAppCtx.getControllerService(),
- systemState);
+ StartupTaskRequestMessage.send((NodeControllerService) ncServiceCtx.getControllerService(), systemState);
}
@Override
@@ -182,8 +183,8 @@
String[] ioDevices = ((PersistentLocalResourceRepository) runtimeContext.getLocalResourceRepository())
.getStorageMountingPoints();
for (String ioDevice : ioDevices) {
- String tempDatasetsDir = ioDevice + storageDirName + File.separator
- + StoragePathUtil.TEMP_DATASETS_STORAGE_FOLDER;
+ String tempDatasetsDir =
+ ioDevice + storageDirName + File.separator + StoragePathUtil.TEMP_DATASETS_STORAGE_FOLDER;
FileUtils.deleteQuietly(new File(tempDatasetsDir));
}
@@ -199,7 +200,7 @@
if (cluster == null) {
throw new IllegalStateException("No cluster configuration found for this instance");
}
- NCConfig ncConfig = ((NodeControllerService) ncAppCtx.getControllerService()).getConfiguration();
+ NCConfig ncConfig = ((NodeControllerService) ncServiceCtx.getControllerService()).getConfiguration();
ncConfig.getConfigManager().registerVirtualNode(nodeId);
String asterixInstanceName = metadataProperties.getInstanceName();
TransactionProperties txnProperties = runtimeContext.getTransactionProperties();
@@ -245,4 +246,9 @@
}
}
}
+
+ @Override
+ public IAppRuntimeContext getApplicationContext() {
+ return runtimeContext;
+ }
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
index 10193b3..f615138 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
@@ -46,7 +46,7 @@
public NCMessageBroker(NodeControllerService ncs, MessagingProperties messagingProperties) {
this.ncs = ncs;
- appContext = (IAppRuntimeContext) ncs.getApplicationContext().getApplicationObject();
+ appContext = (IAppRuntimeContext) ncs.getApplicationContext();
maxMsgSize = messagingProperties.getFrameSize();
int messagingMemoryBudget = messagingProperties.getFrameSize() * messagingProperties.getFrameCount();
messagingFramePool = new ConcurrentFramePool(ncs.getId(), messagingMemoryBudget,
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
index f8b5496..4ea524a 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
@@ -391,7 +391,7 @@
private static void SendActiveMessage(ActiveManagerMessage activeManagerMessage, String nodeId) throws Exception {
ICCMessageBroker messageBroker =
- (ICCMessageBroker) AppContextInfo.INSTANCE.getCCApplicationContext().getMessageBroker();
+ (ICCMessageBroker) AppContextInfo.INSTANCE.getCCServiceContext().getMessageBroker();
messageBroker.sendApplicationMessageToNC(activeManagerMessage, nodeId);
}
}
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
index 2061cda..b7026ef 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
@@ -141,8 +141,7 @@
throw th;
}
jobletCtx = Mockito.mock(IHyracksJobletContext.class);
- Mockito.when(jobletCtx.getApplicationContext())
- .thenReturn(ExecutionTestUtil.integrationUtil.ncs[0].getApplicationContext());
+ Mockito.when(jobletCtx.getServiceContext()).thenReturn(ExecutionTestUtil.integrationUtil.ncs[0].getContext());
Mockito.when(jobletCtx.getJobId()).thenAnswer(new Answer<JobId>() {
@Override
public JobId answer(InvocationOnMock invocation) throws Throwable {
@@ -389,7 +388,7 @@
public TransactionSubsystem getTransactionSubsystem() {
return (TransactionSubsystem) ((NCAppRuntimeContext) ExecutionTestUtil.integrationUtil.ncs[0]
- .getApplicationContext().getApplicationObject()).getTransactionSubsystem();
+ .getApplicationContext()).getTransactionSubsystem();
}
public ITransactionManager getTransactionManager() {
@@ -397,8 +396,7 @@
}
public NCAppRuntimeContext getAppRuntimeContext() {
- return (NCAppRuntimeContext) ExecutionTestUtil.integrationUtil.ncs[0].getApplicationContext()
- .getApplicationObject();
+ return (NCAppRuntimeContext) ExecutionTestUtil.integrationUtil.ncs[0].getApplicationContext();
}
public DatasetLifecycleManager getDatasetLifecycleManager() {
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/common/config/ConfigUsageTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/common/config/ConfigUsageTest.java
index 9cd961a..6a8dabf 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/common/config/ConfigUsageTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/common/config/ConfigUsageTest.java
@@ -31,7 +31,7 @@
import java.util.function.Predicate;
import java.util.stream.Collectors;
-import org.apache.asterix.hyracks.bootstrap.CCApplicationEntryPoint;
+import org.apache.asterix.hyracks.bootstrap.CCApplication;
import org.apache.commons.lang3.StringEscapeUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hyracks.api.config.IOption;
@@ -83,8 +83,8 @@
protected ConfigManager getConfigManager() {
ConfigManager configManager = new ConfigManager();
- CCApplicationEntryPoint aep = new CCApplicationEntryPoint();
- aep.registerConfig(configManager);
+ CCApplication application = new CCApplication();
+ application.registerConfig(configManager);
ControllerConfig.defaultDir = ControllerConfig.defaultDir.replace(System.getProperty("java.io.tmpdir"),
"${java.io.tmpdir}/");
return configManager;
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTestUtil.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTestUtil.java
index 90eb441..8714319 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTestUtil.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTestUtil.java
@@ -92,8 +92,7 @@
libraryManagers.add(AppContextInfo.INSTANCE.getLibraryManager());
// Adds library managers for NCs, one-per-NC.
for (NodeControllerService nc : integrationUtil.ncs) {
- IAppRuntimeContext runtimeCtx = (IAppRuntimeContext) nc.getApplicationContext()
- .getApplicationObject();
+ IAppRuntimeContext runtimeCtx = (IAppRuntimeContext) nc.getApplicationContext();
libraryManagers.add(runtimeCtx.getLibraryManager());
}
return libraryManagers;
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/AsterixVirtualBufferCacheProvider.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/AsterixVirtualBufferCacheProvider.java
index af972b6..7f3f6aa 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/AsterixVirtualBufferCacheProvider.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/AsterixVirtualBufferCacheProvider.java
@@ -56,7 +56,7 @@
deviceId = i;
}
}
- return ((IAppRuntimeContext) ctx.getJobletContext().getApplicationContext().getApplicationObject())
+ return ((IAppRuntimeContext) ctx.getJobletContext().getServiceContext().getApplicationContext())
.getDatasetLifecycleManager().getVirtualBufferCaches(datasetID, deviceId);
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicyFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicyFactory.java
index b2bcf17..d9381a2 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicyFactory.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicyFactory.java
@@ -37,14 +37,14 @@
private static final String[] SET_VALUES = new String[] { "max-mergable-component-size",
"max-tolerance-component-count" };
- private static final Set<String> PROPERTIES_NAMES = new HashSet<String>(Arrays.asList(SET_VALUES));
+ private static final Set<String> PROPERTIES_NAMES = new HashSet<>(Arrays.asList(SET_VALUES));
private int datasetID;
@Override
public ILSMMergePolicy createMergePolicy(Map<String, String> properties, IHyracksTaskContext ctx) {
IDatasetLifecycleManager dslcManager = ((IAppRuntimeContext) ctx.getJobletContext()
- .getApplicationContext().getApplicationObject()).getDatasetLifecycleManager();
+ .getServiceContext().getApplicationContext()).getDatasetLifecycleManager();
ILSMMergePolicy policy = new CorrelatedPrefixMergePolicy(dslcManager, datasetID);
policy.configure(properties);
return policy;
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/TransactionSubsystemProvider.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/TransactionSubsystemProvider.java
index cfbad43..5dab970 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/TransactionSubsystemProvider.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/TransactionSubsystemProvider.java
@@ -37,8 +37,8 @@
@Override
public ITransactionSubsystem getTransactionSubsystem(IHyracksTaskContext ctx) {
- IAppRuntimeContext runtimeCtx = (IAppRuntimeContext) ctx.getJobletContext()
- .getApplicationContext().getApplicationObject();
- return runtimeCtx.getTransactionSubsystem();
+ IAppRuntimeContext appCtx =
+ (IAppRuntimeContext) ctx.getJobletContext().getServiceContext().getApplicationContext();
+ return appCtx.getTransactionSubsystem();
}
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/IApplicationContextInfo.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/IApplicationContextInfo.java
index eb944a2..3c5328d 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/IApplicationContextInfo.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/IApplicationContextInfo.java
@@ -20,7 +20,7 @@
import org.apache.asterix.common.cluster.IGlobalRecoveryManager;
import org.apache.asterix.common.library.ILibraryManager;
-import org.apache.hyracks.api.application.ICCApplicationContext;
+import org.apache.hyracks.api.application.ICCServiceContext;
import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
import org.apache.hyracks.storage.common.IStorageManager;
@@ -28,7 +28,7 @@
* Provides methods for obtaining
* {@link org.apache.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider},
* {@link org.apache.hyracks.storage.common.IStorageManager},
- * {@link org.apache.hyracks.api.application.ICCApplicationContext},
+ * {@link org.apache.hyracks.api.application.ICCServiceContext},
* {@link org.apache.asterix.common.cluster.IGlobalRecoveryManager},
* and {@link org.apache.asterix.common.library.ILibraryManager}
* at the cluster controller side.
@@ -48,9 +48,9 @@
public IStorageManager getStorageManager();
/**
- * @return an instance which implements {@link org.apache.hyracks.api.application.ICCApplicationContext}
+ * @return an instance which implements {@link org.apache.hyracks.api.application.ICCServiceContext}
*/
- public ICCApplicationContext getCCApplicationContext();
+ public ICCServiceContext getCCServiceContext();
/**
* @return the global recovery manager which implements
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMInsertDeleteOperatorNodePushable.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMInsertDeleteOperatorNodePushable.java
index ac878ec..2eca55d 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMInsertDeleteOperatorNodePushable.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMInsertDeleteOperatorNodePushable.java
@@ -21,13 +21,13 @@
import java.nio.ByteBuffer;
import org.apache.asterix.common.api.IAppRuntimeContext;
-import org.apache.hyracks.api.exceptions.ErrorCode;
import org.apache.asterix.common.transactions.ILogMarkerCallback;
import org.apache.asterix.common.transactions.PrimaryIndexLogMarkerCallback;
import org.apache.hyracks.api.comm.VSizeFrame;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.ErrorCode;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
@@ -94,7 +94,7 @@
frameTuple = new FrameTupleReference();
}
IAppRuntimeContext runtimeCtx =
- (IAppRuntimeContext) ctx.getJobletContext().getApplicationContext().getApplicationObject();
+ (IAppRuntimeContext) ctx.getJobletContext().getServiceContext().getApplicationContext();
LSMIndexUtil.checkAndSetFirstLSN(lsmIndex, runtimeCtx.getTransactionSubsystem().getLogManager());
} catch (Throwable th) {
throw new HyracksDataException(th);
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/Resource.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/Resource.java
index e53ca7f..81cb58a 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/Resource.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/Resource.java
@@ -21,7 +21,7 @@
import java.io.Serializable;
import java.util.List;
-import org.apache.hyracks.api.application.INCApplicationContext;
+import org.apache.hyracks.api.application.INCServiceContext;
import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import org.apache.hyracks.api.dataflow.value.ITypeTraits;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -72,7 +72,7 @@
return datasetId;
}
- public abstract ILSMIndex createIndexInstance(INCApplicationContext appCtx, LocalResource resource)
+ public abstract ILSMIndex createIndexInstance(INCServiceContext ncServiceCtx, LocalResource resource)
throws HyracksDataException;
public static int getIoDeviceNum(IIOManager ioManager, IODeviceHandle deviceHandle) {
diff --git a/asterixdb/asterix-experiments/src/main/java/org/apache/asterix/experiment/builder/AbstractLocalExperimentBuilder.java b/asterixdb/asterix-experiments/src/main/java/org/apache/asterix/experiment/builder/AbstractLocalExperimentBuilder.java
index eac4ac2..315ad9c 100644
--- a/asterixdb/asterix-experiments/src/main/java/org/apache/asterix/experiment/builder/AbstractLocalExperimentBuilder.java
+++ b/asterixdb/asterix-experiments/src/main/java/org/apache/asterix/experiment/builder/AbstractLocalExperimentBuilder.java
@@ -19,19 +19,7 @@
package org.apache.asterix.experiment.builder;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
import org.apache.asterix.experiment.action.base.SequentialActionList;
-import org.apache.asterix.hyracks.bootstrap.CCApplicationEntryPoint;
-import org.apache.asterix.hyracks.bootstrap.NCApplicationEntryPoint;
-import org.apache.hyracks.control.cc.ClusterControllerService;
-import org.apache.hyracks.control.common.controllers.CCConfig;
-import org.apache.hyracks.control.common.controllers.NCConfig;
-import org.apache.hyracks.control.nc.NodeControllerService;
public abstract class AbstractLocalExperimentBuilder extends AbstractExperimentBuilder {
@@ -46,74 +34,4 @@
protected abstract void addPost(SequentialActionList post);
-// @Override
-// protected void prePost(SequentialExecutableSet pre, SequentialExecutableSet post) {
-// int ccClientPort = 1098;
-// int ccClusterPort = 1099;
-// CCConfig ccConfig = new CCConfig();
-// ccConfig.clusterNetIpAddress = "127.0.0.1";
-// ccConfig.clientNetIpAddress = "127.0.0.1";
-// ccConfig.clientNetPort = ccClientPort;
-// ccConfig.clusterNetPort = ccClusterPort;
-// ccConfig.defaultMaxJobAttempts = 0;
-// ccConfig.resultTTL = 30000;
-// ccConfig.resultSweepThreshold = 1000;
-// ccConfig.appCCMainClass = CCApplicationEntryPoint.class.getName();
-// final ClusterControllerService cc;
-// try {
-// cc = new ClusterControllerService(ccConfig);
-// } catch (Exception e) {
-// throw new IllegalArgumentException(e);
-// }
-//
-// final List<NodeControllerService> ncs = new ArrayList<>();
-// for (int i = 0; i < nNodeControllers; ++i) {
-// NCConfig ncConfig = new NCConfig();
-// ncConfig.ccHost = "localhost";
-// ncConfig.ccPort = ccClusterPort;
-// ncConfig.clusterNetIPAddress = "127.0.0.1";
-// ncConfig.dataIPAddress = "127.0.0.1";
-// ncConfig.datasetIPAddress = "127.0.0.1";
-// ncConfig.nodeId = "nc" + String.valueOf((i + 1));
-// ncConfig.resultTTL = 30000;
-// ncConfig.resultSweepThreshold = 1000;
-// Path p0 = Paths.get(System.getProperty("java.io.tmpdir"), ncConfig.nodeId, "iodevice0");
-// Path p1 = Paths.get(System.getProperty("java.io.tmpdir"), ncConfig.nodeId, "iodevice1");
-// ncConfig.ioDevices = p0.toString() + "," + p1.toString();
-// ncConfig.appNCMainClass = NCApplicationEntryPoint.class.getName();
-// NodeControllerService nc;
-// try {
-// nc = new NodeControllerService(ncConfig);
-// } catch (Exception e) {
-// throw new IllegalArgumentException(e);
-// }
-// ncs.add(nc);
-// }
-//
-// pre.add(new AbstractExecutable() {
-//
-// @Override
-// protected void doExecute() throws Exception {
-// cc.start();
-// for (NodeControllerService nc : ncs) {
-// nc.start();
-// }
-// }
-// });
-//
-// post.add(new AbstractExecutable() {
-//
-// @Override
-// protected void doExecute() throws Exception {
-// Collections.reverse(ncs);
-// for (NodeControllerService nc : ncs) {
-// nc.stop();
-// }
-// cc.stop();
-// System.exit(1);
-// }
-// });
-// addPre(pre);
-// addPost(post);
-// }
}
diff --git a/asterixdb/asterix-external-data/pom.xml b/asterixdb/asterix-external-data/pom.xml
index d1f67f0..4bb06c2 100644
--- a/asterixdb/asterix-external-data/pom.xml
+++ b/asterixdb/asterix-external-data/pom.xml
@@ -268,7 +268,6 @@
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-serde</artifactId>
- <version>0.13.0</version>
</dependency>
<dependency>
<groupId>com.e-movimento.tinytools</groupId>
@@ -368,10 +367,6 @@
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
- <groupId>org.apache.hyracks</groupId>
- <artifactId>hyracks-control-nc</artifactId>
- </dependency>
- <dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</dependency>
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
index 37262b7..577da5e 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
@@ -85,10 +85,10 @@
@Override
public synchronized IDataSourceAdapter createAdapter(IHyracksTaskContext ctx, int partition)
throws HyracksDataException {
- IAppRuntimeContext runtimeCtx =
- (IAppRuntimeContext) ctx.getJobletContext().getApplicationContext().getApplicationObject();
+ IAppRuntimeContext appCtx =
+ (IAppRuntimeContext) ctx.getJobletContext().getServiceContext().getApplicationContext();
try {
- restoreExternalObjects(runtimeCtx.getLibraryManager());
+ restoreExternalObjects(appCtx.getLibraryManager());
} catch (Exception e) {
throw HyracksDataException.create(e);
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
index 2b899d9..3d1297d 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
@@ -41,7 +41,6 @@
import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.external.util.ExternalDataUtils;
import org.apache.asterix.external.util.HDFSUtils;
-import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
@@ -129,18 +128,22 @@
public AsterixInputStream createInputStream(IHyracksTaskContext ctx, int partition, IExternalIndexer indexer)
throws HyracksDataException {
try {
- if (!configured) {
- conf = confFactory.getConf();
- inputSplits = inputSplitsFactory.getSplits();
- nodeName = ctx.getJobletContext().getApplicationContext().getNodeId();
- configured = true;
- }
+ restoreConfig(ctx);
return new HDFSInputStream(read, inputSplits, readSchedule, nodeName, conf, configuration, files, indexer);
} catch (Exception e) {
throw new HyracksDataException(e);
}
}
+ private void restoreConfig(IHyracksTaskContext ctx) throws HyracksDataException {
+ if (!configured) {
+ conf = confFactory.getConf();
+ inputSplits = inputSplitsFactory.getSplits();
+ nodeName = ctx.getJobletContext().getServiceContext().getNodeId();
+ configured = true;
+ }
+ }
+
/**
* Get the cluster locations for this input stream factory. This method specifies on which asterix nodes the
* external
@@ -202,11 +205,8 @@
return streamReader;
}
}
- JobConf conf = confFactory.getConf();
- InputSplit[] inputSplits = inputSplitsFactory.getSplits();
- String nodeName = ctx.getJobletContext().getApplicationContext().getNodeId();
- return new HDFSRecordReader<Object, Writable>(read, inputSplits, readSchedule, nodeName, conf, files,
- indexer);
+ restoreConfig(ctx);
+ return new HDFSRecordReader<>(read, inputSplits, readSchedule, nodeName, conf, files, indexer);
} catch (Exception e) {
throw new HyracksDataException(e);
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamFactory.java
index f877796..a2e3704 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamFactory.java
@@ -60,7 +60,7 @@
public synchronized AsterixInputStream createInputStream(IHyracksTaskContext ctx, int partition)
throws HyracksDataException {
if (watcher == null) {
- String nodeName = ctx.getJobletContext().getApplicationContext().getNodeId();
+ String nodeName = ctx.getJobletContext().getServiceContext().getNodeId();
ArrayList<Path> inputResources = new ArrayList<>();
for (int i = 0; i < inputFileSplits.length; i++) {
if (inputFileSplits[i].getNodeName().equals(nodeName)) {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalFunction.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalFunction.java
index c80aebc..866fd9c 100755
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalFunction.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalFunction.java
@@ -76,7 +76,7 @@
} else {
// Gets the library manager for real runtime evaluation.
IAppRuntimeContext runtimeCtx = (IAppRuntimeContext) context.getJobletContext()
- .getApplicationContext().getApplicationObject();
+ .getServiceContext().getApplicationContext();
libraryManager = runtimeCtx.getLibraryManager();
}
ClassLoader libraryClassLoader = libraryManager.getLibraryClassLoader(dataverse, functionLibary);
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorNodePushable.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorNodePushable.java
index 384da84..bdc11f5 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorNodePushable.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorNodePushable.java
@@ -51,8 +51,8 @@
this.partition = partition;
this.connectionId = feedConnectionId;
this.policyAccessor = new FeedPolicyAccessor(feedPolicy);
- this.activeManager = (ActiveManager) ((IAppRuntimeContext) ctx.getJobletContext().getApplicationContext()
- .getApplicationObject()).getActiveManager();
+ this.activeManager = (ActiveManager) ((IAppRuntimeContext) ctx.getJobletContext().getServiceContext()
+ .getApplicationContext()).getActiveManager();
}
@Override
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorDescriptor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorDescriptor.java
index ab8c8f7..b911bf1 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorDescriptor.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorDescriptor.java
@@ -103,7 +103,7 @@
private IAdapterFactory createExternalAdapterFactory(IHyracksTaskContext ctx) throws HyracksDataException {
IAdapterFactory adapterFactory;
IAppRuntimeContext runtimeCtx = (IAppRuntimeContext) ctx.getJobletContext()
- .getApplicationContext().getApplicationObject();
+ .getServiceContext().getApplicationContext();
ILibraryManager libraryManager = runtimeCtx.getLibraryManager();
ClassLoader classLoader = libraryManager.getLibraryClassLoader(feedId.getDataverse(), adaptorLibraryName);
if (classLoader != null) {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaComputeNodePushable.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaComputeNodePushable.java
index ed017a3..fbdbece 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaComputeNodePushable.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaComputeNodePushable.java
@@ -104,8 +104,8 @@
this.policyEnforcer = new FeedPolicyEnforcer(feedConnectionId, feedPolicyProperties);
this.partition = partition;
this.connectionId = feedConnectionId;
- this.feedManager = (ActiveManager) ((IAppRuntimeContext) ctx.getJobletContext().getApplicationContext()
- .getApplicationObject()).getActiveManager();
+ this.feedManager = (ActiveManager) ((IAppRuntimeContext) ctx.getJobletContext().getServiceContext()
+ .getApplicationContext()).getActiveManager();
this.message = new VSizeFrame(ctx);
TaskUtil.putInSharedMap(HyracksConstants.KEY_MESSAGE, message, ctx);
this.opDesc = feedMetaOperatorDescriptor;
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java
index 48276d5..0bb27db 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java
@@ -97,8 +97,8 @@
this.policyEnforcer = new FeedPolicyEnforcer(feedConnectionId, feedPolicyProperties);
this.partition = partition;
this.connectionId = feedConnectionId;
- this.feedManager = (ActiveManager) ((IAppRuntimeContext) ctx.getJobletContext().getApplicationContext()
- .getApplicationObject()).getActiveManager();
+ this.feedManager = (ActiveManager) ((IAppRuntimeContext) ctx.getJobletContext().getServiceContext()
+ .getApplicationContext()).getActiveManager();
this.targetId = targetId;
this.message = new VSizeFrame(ctx);
TaskUtil.putInSharedMap(HyracksConstants.KEY_MESSAGE, message, ctx);
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
index 53a46ab..def3ee2 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
@@ -52,7 +52,7 @@
public class HDFSUtils {
public static Scheduler initializeHDFSScheduler() throws HyracksDataException {
- ICCContext ccContext = AppContextInfo.INSTANCE.getCCApplicationContext().getCCContext();
+ ICCContext ccContext = AppContextInfo.INSTANCE.getCCServiceContext().getCCContext();
Scheduler scheduler = null;
try {
scheduler = new Scheduler(ccContext.getClusterControllerInfo().getClientNetAddress(),
@@ -64,7 +64,7 @@
}
public static IndexingScheduler initializeIndexingHDFSScheduler() throws HyracksDataException {
- ICCContext ccContext = AppContextInfo.INSTANCE.getCCApplicationContext().getCCContext();
+ ICCContext ccContext = AppContextInfo.INSTANCE.getCCServiceContext().getCCContext();
IndexingScheduler scheduler = null;
try {
scheduler = new IndexingScheduler(ccContext.getClusterControllerInfo().getClientNetAddress(),
diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
index 3863920..a108908 100644
--- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
+++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
@@ -39,7 +39,6 @@
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.control.nc.NodeControllerService;
import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import org.apache.hyracks.dataflow.std.file.ITupleParser;
import org.apache.hyracks.dataflow.std.file.ITupleParserFactory;
@@ -69,7 +68,7 @@
@Override
public IDataSourceAdapter createAdapter(IHyracksTaskContext ctx, int partition) throws HyracksDataException {
- final String nodeId = ctx.getJobletContext().getApplicationContext().getNodeId();
+ final String nodeId = ctx.getJobletContext().getServiceContext().getNodeId();
final ITupleParserFactory tupleParserFactory = new ITupleParserFactory() {
private static final long serialVersionUID = 1L;
@@ -79,8 +78,7 @@
ITupleForwarder forwarder;
ArrayTupleBuilder tb;
IPropertiesProvider propertiesProvider =
- (IPropertiesProvider) ((NodeControllerService) ctx.getJobletContext().getApplicationContext()
- .getControllerService()).getApplicationContext().getApplicationObject();
+ (IPropertiesProvider) ctx.getJobletContext().getServiceContext().getApplicationContext();
ClusterPartition nodePartition =
propertiesProvider.getMetadataProperties().getNodePartitions().get(nodeId)[0];
parser = new ADMDataParser(outputType, true);
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataExtension.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataExtension.java
index 5626a83..ef624ed 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataExtension.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataExtension.java
@@ -25,7 +25,7 @@
import org.apache.asterix.common.api.IExtension;
import org.apache.asterix.common.exceptions.ACIDException;
import org.apache.asterix.metadata.entitytupletranslators.MetadataTupleTranslatorProvider;
-import org.apache.hyracks.api.application.INCApplicationContext;
+import org.apache.hyracks.api.application.INCServiceContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
/**
@@ -57,6 +57,6 @@
* @throws RemoteException
* @throws ACIDException
*/
- void initializeMetadata(INCApplicationContext appCtx) throws HyracksDataException, RemoteException, ACIDException;
+ void initializeMetadata(INCServiceContext ncServiceCtx) throws HyracksDataException, RemoteException, ACIDException;
}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
index 02a092d..0410911 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
@@ -71,7 +71,7 @@
import org.apache.asterix.transaction.management.resource.LSMBTreeLocalResourceMetadata;
import org.apache.asterix.transaction.management.resource.PersistentLocalResourceFactoryProvider;
import org.apache.asterix.transaction.management.service.transaction.TransactionManagementConstants.LockManagerConstants.LockMode;
-import org.apache.hyracks.api.application.INCApplicationContext;
+import org.apache.hyracks.api.application.INCServiceContext;
import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import org.apache.hyracks.api.dataflow.value.ITypeTraits;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -106,7 +106,7 @@
public class MetadataBootstrap {
public static final boolean IS_DEBUG_MODE = false;
private static final Logger LOGGER = Logger.getLogger(MetadataBootstrap.class.getName());
- private static IAppRuntimeContext runtimeContext;
+ private static IAppRuntimeContext appContext;
private static IBufferCache bufferCache;
private static IFileMapProvider fileMapProvider;
private static IDatasetLifecycleManager dataLifecycleManager;
@@ -132,26 +132,26 @@
* bootstrap metadata
*
* @param asterixPropertiesProvider
- * @param ncApplicationContext
+ * @param ncServiceContext
* @param isNewUniverse
* @throws ACIDException
* @throws RemoteException
* @throws MetadataException
* @throws Exception
*/
- public static void startUniverse(INCApplicationContext ncApplicationContext, boolean isNewUniverse)
+ public static void startUniverse(INCServiceContext ncServiceContext, boolean isNewUniverse)
throws RemoteException, ACIDException, MetadataException {
MetadataBootstrap.setNewUniverse(isNewUniverse);
- runtimeContext = (IAppRuntimeContext) ncApplicationContext.getApplicationObject();
+ appContext = (IAppRuntimeContext) ncServiceContext.getApplicationContext();
- MetadataProperties metadataProperties = runtimeContext.getMetadataProperties();
+ MetadataProperties metadataProperties = appContext.getMetadataProperties();
metadataNodeName = metadataProperties.getMetadataNodeName();
nodeNames = metadataProperties.getNodeNames();
- dataLifecycleManager = runtimeContext.getDatasetLifecycleManager();
- localResourceRepository = runtimeContext.getLocalResourceRepository();
- bufferCache = runtimeContext.getBufferCache();
- fileMapProvider = runtimeContext.getFileMapManager();
- ioManager = ncApplicationContext.getIoManager();
+ dataLifecycleManager = appContext.getDatasetLifecycleManager();
+ localResourceRepository = appContext.getLocalResourceRepository();
+ bufferCache = appContext.getBufferCache();
+ fileMapProvider = appContext.getFileMapManager();
+ ioManager = ncServiceContext.getIoManager();
MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
try {
@@ -160,7 +160,7 @@
MetadataManager.INSTANCE.lock(mdTxnCtx, LockMode.X);
for (int i = 0; i < PRIMARY_INDEXES.length; i++) {
- enlistMetadataDataset(ncApplicationContext, PRIMARY_INDEXES[i]);
+ enlistMetadataDataset(ncServiceContext, PRIMARY_INDEXES[i]);
}
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info(
@@ -327,9 +327,9 @@
* @param index
* @throws HyracksDataException
*/
- public static void enlistMetadataDataset(INCApplicationContext appCtx, IMetadataIndex index)
+ public static void enlistMetadataDataset(INCServiceContext ncServiceCtx, IMetadataIndex index)
throws HyracksDataException {
- ClusterPartition metadataPartition = runtimeContext.getMetadataProperties().getMetadataPartition();
+ ClusterPartition metadataPartition = appContext.getMetadataProperties().getMetadataPartition();
int metadataDeviceId = metadataPartition.getIODeviceNum();
String metadataPartitionPath = StoragePathUtil.prepareStoragePartitionPath(
ClusterProperties.INSTANCE.getStorageDirectoryName(), metadataPartition.getPartitionId());
@@ -338,7 +338,7 @@
// this should not be done this way. dataset lifecycle manager shouldn't return virtual buffer caches for
// a dataset that was not yet created
- List<IVirtualBufferCache> virtualBufferCaches = runtimeContext.getDatasetLifecycleManager()
+ List<IVirtualBufferCache> virtualBufferCaches = appContext.getDatasetLifecycleManager()
.getVirtualBufferCaches(index.getDatasetId().getId(), metadataPartition.getIODeviceNum());
ITypeTraits[] typeTraits = index.getTypeTraits();
IBinaryComparatorFactory[] comparatorFactories = index.getKeyBinaryComparatorFactory();
@@ -357,19 +357,19 @@
// This is to be done by having a metadata dataset associated with each index
lsmBtree = LSMBTreeUtil.createLSMTree(ioManager, virtualBufferCaches, file, bufferCache, fileMapProvider,
typeTraits, comparatorFactories, bloomFilterKeyFields,
- runtimeContext.getBloomFilterFalsePositiveRate(),
- runtimeContext.getMetadataMergePolicyFactory().createMergePolicy(
+ appContext.getBloomFilterFalsePositiveRate(),
+ appContext.getMetadataMergePolicyFactory().createMergePolicy(
GlobalConfig.DEFAULT_COMPACTION_POLICY_PROPERTIES, dataLifecycleManager),
- opTrackerProvider.getOperationTracker(appCtx), runtimeContext.getLSMIOScheduler(),
+ opTrackerProvider.getOperationTracker(ncServiceCtx), appContext.getLSMIOScheduler(),
ioOpCallbackFactory.createIoOpCallback(), index.isPrimaryIndex(), null, null, null, null, true,
- runtimeContext.getStorageComponentProvider().getMetadataPageManagerFactory());
+ appContext.getStorageComponentProvider().getMetadataPageManagerFactory());
lsmBtree.create();
resourceID = index.getResourceID();
Resource localResourceMetadata = new LSMBTreeLocalResourceMetadata(typeTraits, comparatorFactories,
bloomFilterKeyFields, index.isPrimaryIndex(), index.getDatasetId().getId(),
- metadataPartition.getPartitionId(), runtimeContext.getMetadataMergePolicyFactory(),
+ metadataPartition.getPartitionId(), appContext.getMetadataMergePolicyFactory(),
GlobalConfig.DEFAULT_COMPACTION_POLICY_PROPERTIES, null, null, null, null, opTrackerProvider,
- ioOpCallbackFactory, runtimeContext.getStorageComponentProvider().getMetadataPageManagerFactory());
+ ioOpCallbackFactory, appContext.getStorageComponentProvider().getMetadataPageManagerFactory());
ILocalResourceFactoryProvider localResourceFactoryProvider = new PersistentLocalResourceFactoryProvider(
partition -> localResourceMetadata, LocalResource.LSMBTreeResource);
ILocalResourceFactory localResourceFactory = localResourceFactoryProvider.getLocalResourceFactory();
@@ -380,8 +380,8 @@
final LocalResource resource = localResourceRepository.get(file.getRelativePath());
if (resource == null) {
throw new HyracksDataException("Could not find required metadata indexes. Please delete "
- + runtimeContext.getMetadataProperties().getTransactionLogDirs()
- .get(runtimeContext.getTransactionSubsystem().getId())
+ + appContext.getMetadataProperties().getTransactionLogDirs()
+ .get(appContext.getTransactionSubsystem().getId())
+ " to intialize as a new instance. (WARNING: all data will be lost.)");
}
resourceID = resource.getId();
@@ -392,13 +392,13 @@
if (lsmBtree == null) {
lsmBtree = LSMBTreeUtil.createLSMTree(ioManager, virtualBufferCaches, file, bufferCache,
fileMapProvider, typeTraits, comparatorFactories, bloomFilterKeyFields,
- runtimeContext.getBloomFilterFalsePositiveRate(),
- runtimeContext.getMetadataMergePolicyFactory().createMergePolicy(
+ appContext.getBloomFilterFalsePositiveRate(),
+ appContext.getMetadataMergePolicyFactory().createMergePolicy(
GlobalConfig.DEFAULT_COMPACTION_POLICY_PROPERTIES, dataLifecycleManager),
- opTrackerProvider.getOperationTracker(appCtx), runtimeContext.getLSMIOScheduler(),
+ opTrackerProvider.getOperationTracker(ncServiceCtx), appContext.getLSMIOScheduler(),
LSMBTreeIOOperationCallbackFactory.INSTANCE.createIoOpCallback(), index.isPrimaryIndex(), null,
null, null, null, true,
- runtimeContext.getStorageComponentProvider().getMetadataPageManagerFactory());
+ appContext.getStorageComponentProvider().getMetadataPageManagerFactory());
dataLifecycleManager.register(file.getRelativePath(), lsmBtree);
}
}
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
index afc1dfd..f02496a 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
@@ -73,7 +73,7 @@
import org.apache.asterix.replication.storage.LSMIndexFileProperties;
import org.apache.asterix.replication.storage.ReplicaResourcesManager;
import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
-import org.apache.hyracks.api.application.INCApplicationContext;
+import org.apache.hyracks.api.application.INCServiceContext;
import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
import org.apache.hyracks.util.StorageUtil;
@@ -109,7 +109,7 @@
public ReplicationChannel(String nodeId, ReplicationProperties replicationProperties, ILogManager logManager,
IReplicaResourcesManager replicaResoucesManager, IReplicationManager replicationManager,
- INCApplicationContext appContext, IAppRuntimeContextProvider asterixAppRuntimeContextProvider) {
+ INCServiceContext ncServiceContext, IAppRuntimeContextProvider asterixAppRuntimeContextProvider) {
this.logManager = logManager;
this.localNodeID = nodeId;
this.replicaResourcesManager = (ReplicaResourcesManager) replicaResoucesManager;
@@ -125,9 +125,9 @@
replicaUniqueLSN2RemoteMapping = new ConcurrentHashMap<>();
lsmComponentLSNMappingService = new LSMComponentsSyncService();
replicationNotifier = new ReplicationNotifier();
- replicationThreads = Executors.newCachedThreadPool(appContext.getThreadFactory());
- Map<String, ClusterPartition[]> nodePartitions = ((IPropertiesProvider) asterixAppRuntimeContextProvider
- .getAppContext()).getMetadataProperties().getNodePartitions();
+ replicationThreads = Executors.newCachedThreadPool(ncServiceContext.getThreadFactory());
+ Map<String, ClusterPartition[]> nodePartitions =
+ asterixAppRuntimeContextProvider.getAppContext().getMetadataProperties().getNodePartitions();
Set<String> nodeReplicationClients = replicationProperties.getRemotePrimaryReplicasIds(nodeId);
List<Integer> clientsPartitions = new ArrayList<>();
for (String clientId : nodeReplicationClients) {
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/JobEventListenerFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/JobEventListenerFactory.java
index 899b0b9..7ec5691 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/JobEventListenerFactory.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/JobEventListenerFactory.java
@@ -51,8 +51,8 @@
@Override
public void jobletFinish(JobStatus jobStatus) {
try {
- ITransactionManager txnManager = ((IAppRuntimeContext) jobletContext.getApplicationContext()
- .getApplicationObject()).getTransactionSubsystem().getTransactionManager();
+ ITransactionManager txnManager = ((IAppRuntimeContext) jobletContext.getServiceContext()
+ .getApplicationContext()).getTransactionSubsystem().getTransactionManager();
ITransactionContext txnContext = txnManager.getTransactionContext(jobId, false);
txnContext.setWriteTxn(transactionalWrite);
txnManager.completedTransaction(txnContext, new DatasetId(-1), -1,
@@ -65,7 +65,7 @@
@Override
public void jobletStart() {
try {
- ((IAppRuntimeContext) jobletContext.getApplicationContext().getApplicationObject())
+ ((IAppRuntimeContext) jobletContext.getServiceContext().getApplicationContext())
.getTransactionSubsystem().getTransactionManager().getTransactionContext(jobId, true);
} catch (ACIDException e) {
throw new Error(e);
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/MultiTransactionJobletEventListenerFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/MultiTransactionJobletEventListenerFactory.java
index 749de60..ac1895b 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/MultiTransactionJobletEventListenerFactory.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/MultiTransactionJobletEventListenerFactory.java
@@ -54,7 +54,7 @@
public void jobletFinish(JobStatus jobStatus) {
try {
ITransactionManager txnManager =
- ((IAppRuntimeContext) jobletContext.getApplicationContext().getApplicationObject())
+ ((IAppRuntimeContext) jobletContext.getServiceContext().getApplicationContext())
.getTransactionSubsystem().getTransactionManager();
for (JobId jobId : jobIds) {
ITransactionContext txnContext = txnManager.getTransactionContext(jobId, false);
@@ -71,7 +71,7 @@
public void jobletStart() {
try {
for (JobId jobId : jobIds) {
- ((IAppRuntimeContext) jobletContext.getApplicationContext().getApplicationObject())
+ ((IAppRuntimeContext) jobletContext.getServiceContext().getApplicationContext())
.getTransactionSubsystem().getTransactionManager().getTransactionContext(jobId, true);
}
} catch (ACIDException e) {
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReplicaEventMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReplicaEventMessage.java
index 1ee0399..8eb3663 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReplicaEventMessage.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReplicaEventMessage.java
@@ -26,7 +26,6 @@
import org.apache.hyracks.api.application.IClusterLifecycleListener.ClusterEventType;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.service.IControllerService;
-import org.apache.hyracks.control.nc.NodeControllerService;
public class ReplicaEventMessage implements IApplicationMessage {
@@ -55,9 +54,7 @@
@Override
public void handle(IControllerService cs) throws HyracksDataException, InterruptedException {
- NodeControllerService ncs = (NodeControllerService) cs;
- IAppRuntimeContext appContext =
- (IAppRuntimeContext) ncs.getApplicationContext().getApplicationObject();
+ IAppRuntimeContext appContext = (IAppRuntimeContext) cs.getApplicationContext();
Node node = new Node();
node.setId(nodeId);
node.setClusterIp(nodeIPAddress);
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdMessage.java
index 042837b..fc2650e 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdMessage.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdMessage.java
@@ -55,13 +55,12 @@
public static void send(NodeControllerService cs) throws HyracksDataException {
NodeControllerService ncs = cs;
- IAppRuntimeContext appContext =
- (IAppRuntimeContext) ncs.getApplicationContext().getApplicationObject();
+ IAppRuntimeContext appContext = (IAppRuntimeContext) ncs.getApplicationContext();
long maxResourceId = Math.max(appContext.getLocalResourceRepository().maxId(),
MetadataIndexImmutableProperties.FIRST_AVAILABLE_USER_DATASET_ID);
ReportMaxResourceIdMessage maxResourceIdMsg = new ReportMaxResourceIdMessage(ncs.getId(), maxResourceId);
try {
- ((INCMessageBroker) ncs.getApplicationContext().getMessageBroker()).sendMessageToCC(maxResourceIdMsg);
+ ((INCMessageBroker) ncs.getContext().getMessageBroker()).sendMessageToCC(maxResourceIdMsg);
} catch (Exception e) {
LOGGER.log(Level.SEVERE, "Unable to report max local resource id", e);
throw HyracksDataException.create(e);
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
index 8739948..c8aef37 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
@@ -40,7 +40,7 @@
public void handle(IControllerService cs) throws HyracksDataException, InterruptedException {
try {
ICCMessageBroker broker =
- (ICCMessageBroker) AppContextInfo.INSTANCE.getCCApplicationContext().getMessageBroker();
+ (ICCMessageBroker) AppContextInfo.INSTANCE.getCCServiceContext().getMessageBroker();
ResourceIdRequestResponseMessage reponse = new ResourceIdRequestResponseMessage();
if (!ClusterStateManager.INSTANCE.isClusterActive()) {
reponse.setResourceId(-1);
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestResponseMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestResponseMessage.java
index db6afb7..1106da9 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestResponseMessage.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestResponseMessage.java
@@ -23,7 +23,6 @@
import org.apache.asterix.runtime.transaction.GlobalResourceIdFactory;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.service.IControllerService;
-import org.apache.hyracks.control.nc.NodeControllerService;
public class ResourceIdRequestResponseMessage implements IApplicationMessage {
private static final long serialVersionUID = 1L;
@@ -49,10 +48,8 @@
@Override
public void handle(IControllerService cs) throws HyracksDataException, InterruptedException {
- NodeControllerService ncs = (NodeControllerService) cs;
- IAppRuntimeContext asterixNcAppRuntimeCtx =
- (IAppRuntimeContext) ncs.getApplicationContext().getApplicationObject();
- ((GlobalResourceIdFactory) asterixNcAppRuntimeCtx.getResourceIdFactory()).addNewIds(this);
+ IAppRuntimeContext appCtx = (IAppRuntimeContext) cs.getApplicationContext();
+ ((GlobalResourceIdFactory) appCtx.getResourceIdFactory()).addNewIds(this);
}
@Override
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
index 037945a..704ee52 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
@@ -154,10 +154,10 @@
indexAccessor = index.createAccessor(modCallback, searchCallback);
cursor = indexAccessor.createSearchCursor(false);
frameTuple = new FrameTupleReference();
- IAppRuntimeContext runtimeCtx =
- (IAppRuntimeContext) ctx.getJobletContext().getApplicationContext().getApplicationObject();
+ IAppRuntimeContext appCtx =
+ (IAppRuntimeContext) ctx.getJobletContext().getServiceContext().getApplicationContext();
LSMIndexUtil.checkAndSetFirstLSN((AbstractLSMIndex) index,
- runtimeCtx.getTransactionSubsystem().getLogManager());
+ appCtx.getTransactionSubsystem().getLogManager());
frameOpCallback =
frameOpCallbackFactory.createFrameOperationCallback(ctx, (ILSMIndexAccessor) indexAccessor);
} catch (Exception e) {
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/std/FlushDatasetOperatorDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/std/FlushDatasetOperatorDescriptor.java
index 69a2faf..ccb63ac 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/std/FlushDatasetOperatorDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/std/FlushDatasetOperatorDescriptor.java
@@ -71,11 +71,11 @@
@Override
public void close() throws HyracksDataException {
try {
- IAppRuntimeContext runtimeCtx = (IAppRuntimeContext) ctx.getJobletContext()
- .getApplicationContext().getApplicationObject();
- IDatasetLifecycleManager datasetLifeCycleManager = runtimeCtx.getDatasetLifecycleManager();
- ILockManager lockManager = runtimeCtx.getTransactionSubsystem().getLockManager();
- ITransactionManager txnManager = runtimeCtx.getTransactionSubsystem().getTransactionManager();
+ IAppRuntimeContext appCtx = (IAppRuntimeContext) ctx.getJobletContext()
+ .getServiceContext().getApplicationContext();
+ IDatasetLifecycleManager datasetLifeCycleManager = appCtx.getDatasetLifecycleManager();
+ ILockManager lockManager = appCtx.getTransactionSubsystem().getLockManager();
+ ITransactionManager txnManager = appCtx.getTransactionSubsystem().getTransactionManager();
// get the local transaction
ITransactionContext txnCtx = txnManager.getTransactionContext(jobId, false);
// lock the dataset granule
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactory.java
index a8b61f7..b835b3a 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactory.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactory.java
@@ -23,9 +23,8 @@
import org.apache.asterix.common.messaging.api.INCMessageBroker;
import org.apache.asterix.runtime.message.ResourceIdRequestMessage;
import org.apache.asterix.runtime.message.ResourceIdRequestResponseMessage;
-import org.apache.hyracks.api.application.IApplicationContext;
+import org.apache.hyracks.api.application.INCServiceContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.control.nc.NodeControllerService;
import org.apache.hyracks.storage.common.file.IResourceIdFactory;
/**
@@ -34,14 +33,14 @@
*/
public class GlobalResourceIdFactory implements IResourceIdFactory {
- private final IApplicationContext appCtx;
+ private final INCServiceContext serviceCtx;
private final LinkedBlockingQueue<ResourceIdRequestResponseMessage> resourceIdResponseQ;
private final String nodeId;
- public GlobalResourceIdFactory(IApplicationContext appCtx) {
- this.appCtx = appCtx;
+ public GlobalResourceIdFactory(INCServiceContext serviceCtx) {
+ this.serviceCtx = serviceCtx;
this.resourceIdResponseQ = new LinkedBlockingQueue<>();
- this.nodeId = ((NodeControllerService) appCtx.getControllerService()).getApplicationContext().getNodeId();
+ this.nodeId = serviceCtx.getNodeId();
}
public void addNewIds(ResourceIdRequestResponseMessage resourceIdResponse) throws InterruptedException {
@@ -63,7 +62,7 @@
//if no response available or it has an exception, request a new one
if (reponse == null || reponse.getException() != null) {
ResourceIdRequestMessage msg = new ResourceIdRequestMessage(nodeId);
- ((INCMessageBroker) appCtx.getMessageBroker()).sendMessageToCC(msg);
+ ((INCMessageBroker) serviceCtx.getMessageBroker()).sendMessageToCC(msg);
reponse = resourceIdResponseQ.take();
if (reponse.getException() != null) {
throw new HyracksDataException(reponse.getException().getMessage());
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactoryProvider.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactoryProvider.java
index 9027f75..d99ea90 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactoryProvider.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactoryProvider.java
@@ -18,17 +18,17 @@
*/
package org.apache.asterix.runtime.transaction;
-import org.apache.hyracks.api.application.IApplicationContext;
+import org.apache.hyracks.api.application.INCServiceContext;
public class GlobalResourceIdFactoryProvider {
- private final IApplicationContext appCtx;
+ private final INCServiceContext ncServiceCtx;
- public GlobalResourceIdFactoryProvider(IApplicationContext appCtx) {
- this.appCtx = appCtx;
+ public GlobalResourceIdFactoryProvider(INCServiceContext ncServiceCtx) {
+ this.ncServiceCtx = ncServiceCtx;
}
public GlobalResourceIdFactory createResourceIdFactory() {
- return new GlobalResourceIdFactory(appCtx);
+ return new GlobalResourceIdFactory(ncServiceCtx);
}
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/AppContextInfo.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/AppContextInfo.java
index 400bb5a..4185b26 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/AppContextInfo.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/AppContextInfo.java
@@ -42,7 +42,7 @@
import org.apache.asterix.common.metadata.IMetadataBootstrap;
import org.apache.asterix.common.replication.IFaultToleranceStrategy;
import org.apache.asterix.common.transactions.IResourceIdManager;
-import org.apache.hyracks.api.application.ICCApplicationContext;
+import org.apache.hyracks.api.application.ICCServiceContext;
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
import org.apache.hyracks.storage.common.IStorageManager;
@@ -55,7 +55,7 @@
public class AppContextInfo implements IApplicationContextInfo, IPropertiesProvider {
public static final AppContextInfo INSTANCE = new AppContextInfo();
- private ICCApplicationContext appCtx;
+ private ICCServiceContext ccServiceCtx;
private IGlobalRecoveryManager globalRecoveryManager;
private ILibraryManager libraryManager;
private IResourceIdManager resourceIdManager;
@@ -79,7 +79,7 @@
private AppContextInfo() {
}
- public static synchronized void initialize(ICCApplicationContext ccAppCtx, IHyracksClientConnection hcc,
+ public static synchronized void initialize(ICCServiceContext ccServiceCtx, IHyracksClientConnection hcc,
ILibraryManager libraryManager, IResourceIdManager resourceIdManager,
Supplier<IMetadataBootstrap> metadataBootstrapSupplier, IGlobalRecoveryManager globalRecoveryManager,
IFaultToleranceStrategy ftStrategy)
@@ -88,13 +88,13 @@
throw new AsterixException(AppContextInfo.class.getSimpleName() + " has been initialized already");
}
INSTANCE.initialized = true;
- INSTANCE.appCtx = ccAppCtx;
+ INSTANCE.ccServiceCtx = ccServiceCtx;
INSTANCE.hcc = hcc;
INSTANCE.libraryManager = libraryManager;
INSTANCE.resourceIdManager = resourceIdManager;
// Determine whether to use old-style asterix-configuration.xml or new-style configuration.
// QQQ strip this out eventually
- PropertiesAccessor propertiesAccessor = PropertiesAccessor.getInstance(ccAppCtx.getAppConfig());
+ PropertiesAccessor propertiesAccessor = PropertiesAccessor.getInstance(ccServiceCtx.getAppConfig());
INSTANCE.compilerProperties = new CompilerProperties(propertiesAccessor);
INSTANCE.externalProperties = new ExternalProperties(propertiesAccessor);
INSTANCE.metadataProperties = new MetadataProperties(propertiesAccessor);
@@ -120,8 +120,8 @@
}
@Override
- public ICCApplicationContext getCCApplicationContext() {
- return appCtx;
+ public ICCServiceContext getCCServiceContext() {
+ return ccServiceCtx;
}
@Override
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
index 49cbc54..f65979f 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
@@ -79,7 +79,7 @@
private ClusterStateManager() {
cluster = ClusterProperties.INSTANCE.getCluster();
// if this is the CC process
- if (AppContextInfo.INSTANCE.initialized() && AppContextInfo.INSTANCE.getCCApplicationContext() != null) {
+ if (AppContextInfo.INSTANCE.initialized() && AppContextInfo.INSTANCE.getCCServiceContext() != null) {
node2PartitionsMap = AppContextInfo.INSTANCE.getMetadataProperties().getNodePartitions();
clusterPartitions = AppContextInfo.INSTANCE.getMetadataProperties().getClusterPartitions();
currentMetadataNode = AppContextInfo.INSTANCE.getMetadataProperties().getMetadataNodeName();
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RuntimeComponentsProvider.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RuntimeComponentsProvider.java
index 117fa9e..387e949 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RuntimeComponentsProvider.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RuntimeComponentsProvider.java
@@ -42,37 +42,37 @@
@Override
public ILSMIOOperationScheduler getIOScheduler(IHyracksTaskContext ctx) {
- return ((IAppRuntimeContext) ctx.getJobletContext().getApplicationContext().getApplicationObject())
+ return ((IAppRuntimeContext) ctx.getJobletContext().getServiceContext().getApplicationContext())
.getLSMIOScheduler();
}
@Override
public IBufferCache getBufferCache(IHyracksTaskContext ctx) {
- return ((IAppRuntimeContext) ctx.getJobletContext().getApplicationContext().getApplicationObject())
+ return ((IAppRuntimeContext) ctx.getJobletContext().getServiceContext().getApplicationContext())
.getBufferCache();
}
@Override
public IFileMapProvider getFileMapProvider(IHyracksTaskContext ctx) {
- return ((IAppRuntimeContext) ctx.getJobletContext().getApplicationContext().getApplicationObject())
+ return ((IAppRuntimeContext) ctx.getJobletContext().getServiceContext().getApplicationContext())
.getFileMapManager();
}
@Override
public ILocalResourceRepository getLocalResourceRepository(IHyracksTaskContext ctx) {
- return ((IAppRuntimeContext) ctx.getJobletContext().getApplicationContext().getApplicationObject())
+ return ((IAppRuntimeContext) ctx.getJobletContext().getServiceContext().getApplicationContext())
.getLocalResourceRepository();
}
@Override
public IDatasetLifecycleManager getLifecycleManager(IHyracksTaskContext ctx) {
- return ((IAppRuntimeContext) ctx.getJobletContext().getApplicationContext().getApplicationObject())
+ return ((IAppRuntimeContext) ctx.getJobletContext().getServiceContext().getApplicationContext())
.getDatasetLifecycleManager();
}
@Override
public IResourceIdFactory getResourceIdFactory(IHyracksTaskContext ctx) {
- return ((IAppRuntimeContext) ctx.getJobletContext().getApplicationContext().getApplicationObject())
+ return ((IAppRuntimeContext) ctx.getJobletContext().getServiceContext().getApplicationContext())
.getResourceIdFactory();
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RuntimeUtils.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RuntimeUtils.java
index 8e3f989..a2a191d 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RuntimeUtils.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RuntimeUtils.java
@@ -53,13 +53,13 @@
public static Map<InetAddress, Set<String>> getNodeControllerMap() throws HyracksDataException {
Map<InetAddress, Set<String>> map = new HashMap<>();
- AppContextInfo.INSTANCE.getCCApplicationContext().getCCContext().getIPAddressNodeMap(map);
+ AppContextInfo.INSTANCE.getCCServiceContext().getCCContext().getIPAddressNodeMap(map);
return map;
}
public static void getNodeControllerMap(Map<InetAddress, Set<String>> map) {
ClusterControllerService ccs =
- (ClusterControllerService) AppContextInfo.INSTANCE.getCCApplicationContext().getControllerService();
+ (ClusterControllerService) AppContextInfo.INSTANCE.getCCServiceContext().getControllerService();
INodeManager nodeManager = ccs.getNodeManager();
map.putAll(nodeManager.getIpAddressNodeNameMap());
}
diff --git a/asterixdb/asterix-server/pom.xml b/asterixdb/asterix-server/pom.xml
index eef2f35..3a2fbe1 100644
--- a/asterixdb/asterix-server/pom.xml
+++ b/asterixdb/asterix-server/pom.xml
@@ -240,7 +240,7 @@
<mainClass>org.apache.hyracks.control.cc.CCDriver</mainClass>
<commandLineArguments>
<commandLineArgument>-app-class</commandLineArgument>
- <commandLineArgument>org.apache.asterix.hyracks.bootstrap.CCApplicationEntryPoint</commandLineArgument>
+ <commandLineArgument>org.apache.asterix.hyracks.bootstrap.CCApplication</commandLineArgument>
</commandLineArguments>
</program>
<program>
@@ -251,7 +251,7 @@
<mainClass>org.apache.hyracks.control.nc.NCDriver</mainClass>
<commandLineArguments>
<commandLineArgument>-app-class</commandLineArgument>
- <commandLineArgument>org.apache.asterix.hyracks.bootstrap.NCApplicationEntryPoint</commandLineArgument>
+ <commandLineArgument>org.apache.asterix.hyracks.bootstrap.NCApplication</commandLineArgument>
</commandLineArguments>
</program>
<program>
@@ -271,7 +271,7 @@
</platforms>
<commandLineArguments>
<commandLineArgument>-app-class</commandLineArgument>
- <commandLineArgument>org.apache.asterix.hyracks.bootstrap.CCApplicationEntryPoint</commandLineArgument>
+ <commandLineArgument>org.apache.asterix.hyracks.bootstrap.CCApplication</commandLineArgument>
</commandLineArguments>
</daemon>
<daemon>
@@ -282,7 +282,7 @@
</platforms>
<commandLineArguments>
<commandLineArgument>-app-class</commandLineArgument>
- <commandLineArgument>org.apache.asterix.hyracks.bootstrap.NCApplicationEntryPoint</commandLineArgument>
+ <commandLineArgument>org.apache.asterix.hyracks.bootstrap.NCApplication</commandLineArgument>
</commandLineArguments>
</daemon>
<daemon>
diff --git a/asterixdb/asterix-server/src/test/resources/NCServiceExecutionIT/cc.conf b/asterixdb/asterix-server/src/test/resources/NCServiceExecutionIT/cc.conf
index cc87be4..72d28b9 100644
--- a/asterixdb/asterix-server/src/test/resources/NCServiceExecutionIT/cc.conf
+++ b/asterixdb/asterix-server/src/test/resources/NCServiceExecutionIT/cc.conf
@@ -31,11 +31,11 @@
[nc]
address=127.0.0.1
command=asterixnc
-app.class=org.apache.asterix.hyracks.bootstrap.NCApplicationEntryPoint
+app.class=org.apache.asterix.hyracks.bootstrap.NCApplication
jvm.args=-Xmx4096m -Dnode.Resolver="org.apache.asterix.external.util.IdentitiyResolverFactory"
storage.subdir=test_storage
storage.memorycomponent.globalbudget = 1073741824
[cc]
address = 127.0.0.1
-app.class=org.apache.asterix.hyracks.bootstrap.CCApplicationEntryPoint
+app.class=org.apache.asterix.hyracks.bootstrap.CCApplication
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexOperationTrackerFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexOperationTrackerFactory.java
index 4e12f96..c553bc0 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexOperationTrackerFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexOperationTrackerFactory.java
@@ -21,7 +21,7 @@
import org.apache.asterix.common.api.IAppRuntimeContext;
import org.apache.asterix.common.api.IDatasetLifecycleManager;
-import org.apache.hyracks.api.application.INCApplicationContext;
+import org.apache.hyracks.api.application.INCServiceContext;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerFactory;
@@ -36,9 +36,9 @@
}
@Override
- public ILSMOperationTracker getOperationTracker(INCApplicationContext ctx) {
- IDatasetLifecycleManager dslcManager = ((IAppRuntimeContext) ctx.getApplicationObject())
- .getDatasetLifecycleManager();
+ public ILSMOperationTracker getOperationTracker(INCServiceContext ctx) {
+ IDatasetLifecycleManager dslcManager =
+ ((IAppRuntimeContext) ctx.getApplicationContext()).getDatasetLifecycleManager();
return dslcManager.getOperationTracker(datasetID);
}
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexOperationTrackerFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexOperationTrackerFactory.java
index be98704..4832acd 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexOperationTrackerFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexOperationTrackerFactory.java
@@ -21,7 +21,7 @@
import org.apache.asterix.common.api.IAppRuntimeContext;
import org.apache.asterix.common.api.IDatasetLifecycleManager;
import org.apache.asterix.common.context.BaseOperationTracker;
-import org.apache.hyracks.api.application.INCApplicationContext;
+import org.apache.hyracks.api.application.INCServiceContext;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerFactory;
@@ -36,9 +36,9 @@
}
@Override
- public ILSMOperationTracker getOperationTracker(INCApplicationContext ctx) {
- IDatasetLifecycleManager dslcManager = ((IAppRuntimeContext) ctx.getApplicationObject())
- .getDatasetLifecycleManager();
+ public ILSMOperationTracker getOperationTracker(INCServiceContext ctx) {
+ IDatasetLifecycleManager dslcManager =
+ ((IAppRuntimeContext) ctx.getApplicationContext()).getDatasetLifecycleManager();
return new BaseOperationTracker(datasetID, dslcManager.getDatasetInfo(datasetID));
}
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/ExternalBTreeLocalResourceMetadata.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/ExternalBTreeLocalResourceMetadata.java
index 06ca8e6..d10a9a9 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/ExternalBTreeLocalResourceMetadata.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/ExternalBTreeLocalResourceMetadata.java
@@ -21,7 +21,7 @@
import java.util.Map;
import org.apache.asterix.common.api.IAppRuntimeContext;
-import org.apache.hyracks.api.application.INCApplicationContext;
+import org.apache.hyracks.api.application.INCServiceContext;
import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import org.apache.hyracks.api.dataflow.value.ITypeTraits;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -50,17 +50,17 @@
}
@Override
- public ILSMIndex createIndexInstance(INCApplicationContext appCtx, LocalResource resource)
+ public ILSMIndex createIndexInstance(INCServiceContext serviceCtx, LocalResource resource)
throws HyracksDataException {
- IAppRuntimeContext runtimeContextProvider = (IAppRuntimeContext) appCtx.getApplicationObject();
- IIOManager ioManager = runtimeContextProvider.getIOManager();
+ IAppRuntimeContext appCtx = (IAppRuntimeContext) serviceCtx.getApplicationContext();
+ IIOManager ioManager = appCtx.getIOManager();
FileReference file = ioManager.resolve(resource.getPath());
- return LSMBTreeUtil.createExternalBTree(ioManager, file, runtimeContextProvider.getBufferCache(),
- runtimeContextProvider.getFileMapManager(), typeTraits, cmpFactories, bloomFilterKeyFields,
- runtimeContextProvider.getBloomFilterFalsePositiveRate(),
+ return LSMBTreeUtil.createExternalBTree(ioManager, file, appCtx.getBufferCache(),
+ appCtx.getFileMapManager(), typeTraits, cmpFactories, bloomFilterKeyFields,
+ appCtx.getBloomFilterFalsePositiveRate(),
mergePolicyFactory.createMergePolicy(mergePolicyProperties,
- runtimeContextProvider.getDatasetLifecycleManager()),
- opTrackerProvider.getOperationTracker(appCtx), runtimeContextProvider.getLSMIOScheduler(),
+ appCtx.getDatasetLifecycleManager()),
+ opTrackerProvider.getOperationTracker(serviceCtx), appCtx.getLSMIOScheduler(),
ioOpCallbackFactory.createIoOpCallback(), -1, true, metadataPageManagerFactory);
}
}
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/ExternalBTreeWithBuddyLocalResourceMetadata.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/ExternalBTreeWithBuddyLocalResourceMetadata.java
index 1a080b2..fd7ff0f 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/ExternalBTreeWithBuddyLocalResourceMetadata.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/ExternalBTreeWithBuddyLocalResourceMetadata.java
@@ -22,7 +22,7 @@
import org.apache.asterix.common.api.IAppRuntimeContext;
import org.apache.asterix.common.transactions.Resource;
-import org.apache.hyracks.api.application.INCApplicationContext;
+import org.apache.hyracks.api.application.INCServiceContext;
import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import org.apache.hyracks.api.dataflow.value.ITypeTraits;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -65,17 +65,17 @@
}
@Override
- public ILSMIndex createIndexInstance(INCApplicationContext appCtx, LocalResource resource)
+ public ILSMIndex createIndexInstance(INCServiceContext serviceCtx, LocalResource resource)
throws HyracksDataException {
- IAppRuntimeContext appRuntimeCtx = (IAppRuntimeContext) appCtx.getApplicationObject();
- IIOManager ioManager = appCtx.getIoManager();
+ IAppRuntimeContext appCtx = (IAppRuntimeContext) serviceCtx.getApplicationContext();
+ IIOManager ioManager = serviceCtx.getIoManager();
FileReference file = ioManager.resolve(resource.getPath());
- return LSMBTreeUtil.createExternalBTreeWithBuddy(ioManager, file, appRuntimeCtx.getBufferCache(),
- appRuntimeCtx.getFileMapManager(), typeTraits, btreeCmpFactories,
- appRuntimeCtx.getBloomFilterFalsePositiveRate(),
+ return LSMBTreeUtil.createExternalBTreeWithBuddy(ioManager, file, appCtx.getBufferCache(),
+ appCtx.getFileMapManager(), typeTraits, btreeCmpFactories,
+ appCtx.getBloomFilterFalsePositiveRate(),
mergePolicyFactory.createMergePolicy(mergePolicyProperties,
- appRuntimeCtx.getDatasetLifecycleManager()),
- opTrackerProvider.getOperationTracker(appCtx), appRuntimeCtx.getLSMIOScheduler(),
+ appCtx.getDatasetLifecycleManager()),
+ opTrackerProvider.getOperationTracker(serviceCtx), appCtx.getLSMIOScheduler(),
ioOpCallbackFactory.createIoOpCallback(), buddyBtreeFields, -1, true, metadataPageManagerFactory);
}
}
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/ExternalRTreeLocalResourceMetadata.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/ExternalRTreeLocalResourceMetadata.java
index c923046..54cd97d 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/ExternalRTreeLocalResourceMetadata.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/ExternalRTreeLocalResourceMetadata.java
@@ -21,7 +21,7 @@
import java.util.Map;
import org.apache.asterix.common.api.IAppRuntimeContext;
-import org.apache.hyracks.api.application.INCApplicationContext;
+import org.apache.hyracks.api.application.INCServiceContext;
import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import org.apache.hyracks.api.dataflow.value.ILinearizeComparatorFactory;
import org.apache.hyracks.api.dataflow.value.ITypeTraits;
@@ -59,18 +59,18 @@
}
@Override
- public ILSMIndex createIndexInstance(INCApplicationContext appCtx, LocalResource resource)
+ public ILSMIndex createIndexInstance(INCServiceContext serviceCtx, LocalResource resource)
throws HyracksDataException {
- IAppRuntimeContext runtimeContextProvider = (IAppRuntimeContext) appCtx.getApplicationObject();
- IIOManager ioManager = runtimeContextProvider.getIOManager();
+ IAppRuntimeContext appCtx = (IAppRuntimeContext) serviceCtx.getApplicationContext();
+ IIOManager ioManager = appCtx.getIOManager();
FileReference file = ioManager.resolve(resource.getPath());
try {
- return LSMRTreeUtils.createExternalRTree(ioManager, file, runtimeContextProvider.getBufferCache(),
- runtimeContextProvider.getFileMapManager(), typeTraits, rtreeCmpFactories, btreeCmpFactories,
- valueProviderFactories, rtreePolicyType, runtimeContextProvider.getBloomFilterFalsePositiveRate(),
+ return LSMRTreeUtils.createExternalRTree(ioManager, file, appCtx.getBufferCache(),
+ appCtx.getFileMapManager(), typeTraits, rtreeCmpFactories, btreeCmpFactories,
+ valueProviderFactories, rtreePolicyType, appCtx.getBloomFilterFalsePositiveRate(),
mergePolicyFactory.createMergePolicy(mergePolicyProperties,
- runtimeContextProvider.getDatasetLifecycleManager()),
- opTrackerProvider.getOperationTracker(appCtx), runtimeContextProvider.getLSMIOScheduler(),
+ appCtx.getDatasetLifecycleManager()),
+ opTrackerProvider.getOperationTracker(serviceCtx), appCtx.getLSMIOScheduler(),
ioOpCallbackFactory.createIoOpCallback(), linearizeCmpFactory, btreeFields, -1, true, isPointMBR,
metadataPageManagerFactory);
} catch (TreeIndexException e) {
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/LSMBTreeLocalResourceMetadata.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/LSMBTreeLocalResourceMetadata.java
index f38711c..0776567 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/LSMBTreeLocalResourceMetadata.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/LSMBTreeLocalResourceMetadata.java
@@ -23,7 +23,7 @@
import org.apache.asterix.common.api.IAppRuntimeContext;
import org.apache.asterix.common.api.IDatasetLifecycleManager;
import org.apache.asterix.common.transactions.Resource;
-import org.apache.hyracks.api.application.INCApplicationContext;
+import org.apache.hyracks.api.application.INCServiceContext;
import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import org.apache.hyracks.api.dataflow.value.ITypeTraits;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -75,19 +75,19 @@
}
@Override
- public ILSMIndex createIndexInstance(INCApplicationContext appCtx, LocalResource resource)
+ public ILSMIndex createIndexInstance(INCServiceContext serviceCtx, LocalResource resource)
throws HyracksDataException {
- IAppRuntimeContext appRuntimeCtx = (IAppRuntimeContext) appCtx.getApplicationObject();
- IIOManager ioManager = appRuntimeCtx.getIOManager();
+ IAppRuntimeContext appCtx = (IAppRuntimeContext) serviceCtx.getApplicationContext();
+ IIOManager ioManager = appCtx.getIOManager();
FileReference file = ioManager.resolve(resource.getPath());
int ioDeviceNum = Resource.getIoDeviceNum(ioManager, file.getDeviceHandle());
- final IDatasetLifecycleManager datasetLifecycleManager = appRuntimeCtx.getDatasetLifecycleManager();
+ final IDatasetLifecycleManager datasetLifecycleManager = appCtx.getDatasetLifecycleManager();
return LSMBTreeUtil.createLSMTree(ioManager,
datasetLifecycleManager.getVirtualBufferCaches(datasetId(), ioDeviceNum), file,
- appRuntimeCtx.getBufferCache(), appRuntimeCtx.getFileMapManager(), typeTraits, cmpFactories,
- bloomFilterKeyFields, appRuntimeCtx.getBloomFilterFalsePositiveRate(),
+ appCtx.getBufferCache(), appCtx.getFileMapManager(), typeTraits, cmpFactories,
+ bloomFilterKeyFields, appCtx.getBloomFilterFalsePositiveRate(),
mergePolicyFactory.createMergePolicy(mergePolicyProperties, datasetLifecycleManager),
- opTrackerProvider.getOperationTracker(appCtx), appRuntimeCtx.getLSMIOScheduler(),
+ opTrackerProvider.getOperationTracker(serviceCtx), appCtx.getLSMIOScheduler(),
ioOpCallbackFactory.createIoOpCallback(), isPrimary, filterTypeTraits, filterCmpFactories, btreeFields,
filterFields, true, metadataPageManagerFactory);
}
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/LSMInvertedIndexLocalResourceMetadata.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/LSMInvertedIndexLocalResourceMetadata.java
index acb087a..a682f5a 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/LSMInvertedIndexLocalResourceMetadata.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/LSMInvertedIndexLocalResourceMetadata.java
@@ -23,7 +23,7 @@
import org.apache.asterix.common.api.IAppRuntimeContext;
import org.apache.asterix.common.transactions.Resource;
-import org.apache.hyracks.api.application.INCApplicationContext;
+import org.apache.hyracks.api.application.INCServiceContext;
import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import org.apache.hyracks.api.dataflow.value.ITypeTraits;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -81,34 +81,34 @@
}
@Override
- public ILSMIndex createIndexInstance(INCApplicationContext appCtx, LocalResource resource)
+ public ILSMIndex createIndexInstance(INCServiceContext serviceCtx, LocalResource resource)
throws HyracksDataException {
- IAppRuntimeContext runtimeContextProvider = (IAppRuntimeContext) appCtx.getApplicationObject();
- IIOManager ioManager = runtimeContextProvider.getIOManager();
+ IAppRuntimeContext appCtx = (IAppRuntimeContext) serviceCtx.getApplicationContext();
+ IIOManager ioManager = appCtx.getIOManager();
FileReference file = ioManager.resolve(resource.getPath());
int ioDeviceNum = Resource.getIoDeviceNum(ioManager, file.getDeviceHandle());
List<IVirtualBufferCache> virtualBufferCaches =
- runtimeContextProvider.getDatasetLifecycleManager().getVirtualBufferCaches(datasetId(), ioDeviceNum);
+ appCtx.getDatasetLifecycleManager().getVirtualBufferCaches(datasetId(), ioDeviceNum);
try {
if (isPartitioned) {
return InvertedIndexUtils.createPartitionedLSMInvertedIndex(ioManager, virtualBufferCaches,
- runtimeContextProvider.getFileMapManager(), invListTypeTraits, invListCmpFactories,
- tokenTypeTraits, tokenCmpFactories, tokenizerFactory, runtimeContextProvider.getBufferCache(),
- file.getAbsolutePath(), runtimeContextProvider.getBloomFilterFalsePositiveRate(),
+ appCtx.getFileMapManager(), invListTypeTraits, invListCmpFactories,
+ tokenTypeTraits, tokenCmpFactories, tokenizerFactory, appCtx.getBufferCache(),
+ file.getAbsolutePath(), appCtx.getBloomFilterFalsePositiveRate(),
mergePolicyFactory.createMergePolicy(mergePolicyProperties,
- runtimeContextProvider.getDatasetLifecycleManager()),
- opTrackerProvider.getOperationTracker(appCtx), runtimeContextProvider.getLSMIOScheduler(),
+ appCtx.getDatasetLifecycleManager()),
+ opTrackerProvider.getOperationTracker(serviceCtx), appCtx.getLSMIOScheduler(),
ioOpCallbackFactory.createIoOpCallback(), invertedIndexFields, filterTypeTraits,
filterCmpFactories, filterFields, filterFieldsForNonBulkLoadOps,
invertedIndexFieldsForNonBulkLoadOps, true, metadataPageManagerFactory);
} else {
return InvertedIndexUtils.createLSMInvertedIndex(ioManager, virtualBufferCaches,
- runtimeContextProvider.getFileMapManager(), invListTypeTraits, invListCmpFactories,
- tokenTypeTraits, tokenCmpFactories, tokenizerFactory, runtimeContextProvider.getBufferCache(),
- file.getAbsolutePath(), runtimeContextProvider.getBloomFilterFalsePositiveRate(),
+ appCtx.getFileMapManager(), invListTypeTraits, invListCmpFactories,
+ tokenTypeTraits, tokenCmpFactories, tokenizerFactory, appCtx.getBufferCache(),
+ file.getAbsolutePath(), appCtx.getBloomFilterFalsePositiveRate(),
mergePolicyFactory.createMergePolicy(mergePolicyProperties,
- runtimeContextProvider.getDatasetLifecycleManager()),
- opTrackerProvider.getOperationTracker(appCtx), runtimeContextProvider.getLSMIOScheduler(),
+ appCtx.getDatasetLifecycleManager()),
+ opTrackerProvider.getOperationTracker(serviceCtx), appCtx.getLSMIOScheduler(),
ioOpCallbackFactory.createIoOpCallback(), invertedIndexFields, filterTypeTraits,
filterCmpFactories, filterFields, filterFieldsForNonBulkLoadOps,
invertedIndexFieldsForNonBulkLoadOps, true, metadataPageManagerFactory);
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/LSMRTreeLocalResourceMetadata.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/LSMRTreeLocalResourceMetadata.java
index 500ca30..c3b7348 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/LSMRTreeLocalResourceMetadata.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/LSMRTreeLocalResourceMetadata.java
@@ -23,7 +23,7 @@
import org.apache.asterix.common.api.IAppRuntimeContext;
import org.apache.asterix.common.transactions.Resource;
-import org.apache.hyracks.api.application.INCApplicationContext;
+import org.apache.hyracks.api.application.INCServiceContext;
import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import org.apache.hyracks.api.dataflow.value.ILinearizeComparatorFactory;
import org.apache.hyracks.api.dataflow.value.ITypeTraits;
@@ -82,21 +82,21 @@
}
@Override
- public ILSMIndex createIndexInstance(INCApplicationContext appCtx, LocalResource resource)
+ public ILSMIndex createIndexInstance(INCServiceContext serviceCtx, LocalResource resource)
throws HyracksDataException {
- IAppRuntimeContext runtimeContextProvider = (IAppRuntimeContext) appCtx.getApplicationObject();
- IIOManager ioManager = runtimeContextProvider.getIOManager();
+ IAppRuntimeContext appCtx = (IAppRuntimeContext) serviceCtx.getApplicationContext();
+ IIOManager ioManager = appCtx.getIOManager();
FileReference file = ioManager.resolve(resource.getPath());
int ioDeviceNum = Resource.getIoDeviceNum(ioManager, file.getDeviceHandle());
List<IVirtualBufferCache> virtualBufferCaches =
- runtimeContextProvider.getDatasetLifecycleManager().getVirtualBufferCaches(datasetId(), ioDeviceNum);
+ appCtx.getDatasetLifecycleManager().getVirtualBufferCaches(datasetId(), ioDeviceNum);
try {
return LSMRTreeUtils.createLSMTreeWithAntiMatterTuples(ioManager, virtualBufferCaches, file,
- runtimeContextProvider.getBufferCache(), runtimeContextProvider.getFileMapManager(), typeTraits,
+ appCtx.getBufferCache(), appCtx.getFileMapManager(), typeTraits,
rtreeCmpFactories, btreeCmpFactories, valueProviderFactories, rtreePolicyType,
mergePolicyFactory.createMergePolicy(mergePolicyProperties,
- runtimeContextProvider.getDatasetLifecycleManager()),
- opTrackerProvider.getOperationTracker(appCtx), runtimeContextProvider.getLSMIOScheduler(),
+ appCtx.getDatasetLifecycleManager()),
+ opTrackerProvider.getOperationTracker(serviceCtx), appCtx.getLSMIOScheduler(),
ioOpCallbackFactory.createIoOpCallback(), linearizeCmpFactory, rtreeFields, filterTypeTraits,
filterCmpFactories, filterFields, true, isPointMBR, metadataPageManagerFactory);
} catch (TreeIndexException e) {
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntime.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntime.java
index 90f6bbf..b114527 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntime.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntime.java
@@ -65,10 +65,10 @@
public CommitRuntime(IHyracksTaskContext ctx, JobId jobId, int datasetId, int[] primaryKeyFields,
boolean isTemporaryDatasetWriteJob, boolean isWriteTransaction, int resourcePartition, boolean isSink) {
this.ctx = ctx;
- IAppRuntimeContext runtimeCtx =
- (IAppRuntimeContext) ctx.getJobletContext().getApplicationContext().getApplicationObject();
- this.transactionManager = runtimeCtx.getTransactionSubsystem().getTransactionManager();
- this.logMgr = runtimeCtx.getTransactionSubsystem().getLogManager();
+ IAppRuntimeContext appCtx =
+ (IAppRuntimeContext) ctx.getJobletContext().getServiceContext().getApplicationContext();
+ this.transactionManager = appCtx.getTransactionSubsystem().getTransactionManager();
+ this.logMgr = appCtx.getTransactionSubsystem().getLogManager();
this.jobId = jobId;
this.datasetId = datasetId;
this.primaryKeyFields = primaryKeyFields;
diff --git a/asterixdb/asterix-yarn/pom.xml b/asterixdb/asterix-yarn/pom.xml
index e8c1895..66e3a96 100644
--- a/asterixdb/asterix-yarn/pom.xml
+++ b/asterixdb/asterix-yarn/pom.xml
@@ -191,7 +191,6 @@
<usedDependency>commons-lang:commons-lang</usedDependency>
<usedDependency>commons-logging:commons-logging-api</usedDependency>
<usedDependency>commons-net:commons-net</usedDependency>
- <usedDependency>org.apache.asterix:asterix-app</usedDependency>
<usedDependency>org.apache.asterix:asterix-events</usedDependency>
<usedDependency>org.apache.asterix:asterix-runtime</usedDependency>
<usedDependency>org.apache.asterix:asterix-server</usedDependency>
diff --git a/asterixdb/asterix-yarn/src/main/java/org/apache/asterix/aoya/AsterixApplicationMaster.java b/asterixdb/asterix-yarn/src/main/java/org/apache/asterix/aoya/AsterixApplicationMaster.java
index b4aeeb1..47cf3b3 100644
--- a/asterixdb/asterix-yarn/src/main/java/org/apache/asterix/aoya/AsterixApplicationMaster.java
+++ b/asterixdb/asterix-yarn/src/main/java/org/apache/asterix/aoya/AsterixApplicationMaster.java
@@ -51,6 +51,8 @@
import org.apache.asterix.event.schema.yarnCluster.Cluster;
import org.apache.asterix.event.schema.yarnCluster.MasterNode;
import org.apache.asterix.event.schema.yarnCluster.Node;
+import org.apache.asterix.hyracks.bootstrap.CCApplication;
+import org.apache.asterix.hyracks.bootstrap.NCApplication;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.HelpFormatter;
@@ -152,7 +154,7 @@
//Tells us whether the Cluster Controller is up so we can safely start some Node Controllers
private AtomicBoolean ccUp = new AtomicBoolean();
private AtomicBoolean ccStarted = new AtomicBoolean();
- private Queue<Node> pendingNCs = new ArrayDeque<Node>();
+ private Queue<Node> pendingNCs = new ArrayDeque<>();
//HDFS path to AsterixDB distributable zip
private String asterixZipPath = "";
@@ -176,7 +178,7 @@
private int numTotalContainers = 0;
// Set the local resources
- private Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
+ private Map<String, LocalResource> localResources = new HashMap<>();
private Cluster clusterDesc = null;
private MasterNode cC = null;
@@ -196,7 +198,7 @@
private boolean initial = false;
// Launch threads
- private List<Thread> launchThreads = new CopyOnWriteArrayList<Thread>();
+ private List<Thread> launchThreads = new CopyOnWriteArrayList<>();
public static void main(String[] args) {
@@ -548,8 +550,9 @@
for (Node node : cl.getNode()) {
InetAddress nodeIp = InetAddress.getByName(node.getClusterIp());
LOG.info(nodeIp + "?=" + containerIp);
- if (nodeIp.equals(containerIp))
+ if (nodeIp.equals(containerIp)) {
return node;
+ }
}
//if we find nothing, this is bad...
throw new java.net.UnknownHostException("Could not resolve container" + containerHost + " to node");
@@ -626,12 +629,14 @@
if (fs.exists(p)) {
FileStatus[] dataverses = fs.listStatus(p);
for (FileStatus d : dataverses) {
- if (!d.isDirectory())
+ if (!d.isDirectory()) {
throw new IOException("Library configuration directory structure is incorrect");
+ }
FileStatus[] libraries = fs.listStatus(d.getPath());
for (FileStatus l : libraries) {
- if (l.isDirectory())
+ if (l.isDirectory()) {
throw new IOException("Library configuration directory structure is incorrect");
+ }
LocalResource lr = Records.newRecord(LocalResource.class);
lr.setResource(ConverterUtils.getYarnUrlFromURI(l.getPath().toUri()));
lr.setSize(l.getLen());
@@ -767,6 +772,7 @@
* is running.
*/
private class RMCallbackHandler implements AMRMClientAsync.CallbackHandler {
+ @Override
public void onContainersCompleted(List<ContainerStatus> completedContainers) {
LOG.info("Got response from RM for container ask, completedCnt=" + completedContainers.size());
for (ContainerStatus containerStatus : completedContainers) {
@@ -794,10 +800,12 @@
}
//stop infinite looping of run()
if (numCompletedContainers.get() + numFailedContainers.get() == numAllocatedContainers.get()
- && doneAllocating)
+ && doneAllocating) {
done = true;
+ }
}
+ @Override
public void onContainersAllocated(List<Container> allocatedContainers) {
LOG.info("Got response from RM for container ask, allocatedCnt=" + allocatedContainers.size());
numAllocatedContainers.addAndGet(allocatedContainers.size());
@@ -850,15 +858,18 @@
/**
* Ask the processes on the container to gracefully exit.
*/
+ @Override
public void onShutdownRequest() {
LOG.info("AM shutting down per request");
done = true;
}
+ @Override
public void onNodesUpdated(List<NodeReport> updatedNodes) {
//TODO: This will become important when we deal with what happens if an NC dies
}
+ @Override
public float getProgress() {
//return half way because progress is basically meaningless for us
if (!doneAllocating) {
@@ -867,6 +878,7 @@
return (float) 0.5;
}
+ @Override
public void onError(Throwable arg0) {
LOG.error("Fatal Error recieved by AM: " + arg0);
done = true;
@@ -875,12 +887,13 @@
private class NMCallbackHandler implements NMClientAsync.CallbackHandler {
- private ConcurrentMap<ContainerId, Container> containers = new ConcurrentHashMap<ContainerId, Container>();
+ private ConcurrentMap<ContainerId, Container> containers = new ConcurrentHashMap<>();
public void addContainer(ContainerId containerId, Container container) {
containers.putIfAbsent(containerId, container);
}
+ @Override
public void onContainerStopped(ContainerId containerId) {
if (LOG.isDebugEnabled()) {
LOG.debug("Succeeded to stop Container " + containerId);
@@ -888,12 +901,14 @@
containers.remove(containerId);
}
+ @Override
public void onContainerStatusReceived(ContainerId containerId, ContainerStatus containerStatus) {
if (LOG.isDebugEnabled()) {
LOG.debug("Container Status: id=" + containerId + ", status=" + containerStatus);
}
}
+ @Override
public void onContainerStarted(ContainerId containerId, Map<String, ByteBuffer> allServiceResponse) {
if (LOG.isDebugEnabled()) {
LOG.debug("Succeeded to start Container " + containerId);
@@ -904,15 +919,18 @@
}
}
+ @Override
public void onStartContainerError(ContainerId containerId, Throwable t) {
LOG.error("Failed to start Container " + containerId);
containers.remove(containerId);
}
+ @Override
public void onGetContainerStatusError(ContainerId containerId, Throwable t) {
LOG.error("Failed to query the status of Container " + containerId);
}
+ @Override
public void onStopContainerError(ContainerId containerId, Throwable t) {
LOG.error("Failed to stop Container " + containerId);
containers.remove(containerId);
@@ -946,6 +964,7 @@
* for shell command and eventually dispatches the container
* start request to the CM.
*/
+ @Override
public void run() {
LOG.info("Setting up container launch container for containerid=" + container.getId());
ContainerLaunchContext ctx = Records.newRecord(ContainerLaunchContext.class);
@@ -954,7 +973,7 @@
//Set the env variables to be setup in the env where the application master will be run
LOG.info("Set the environment for the node");
- Map<String, String> env = new HashMap<String, String>();
+ Map<String, String> env = new HashMap<>();
// Add AppMaster.jar location to classpath
// At some point we should not be required to add
@@ -1019,9 +1038,9 @@
* @return A list of the commands that should be executed
*/
private List<String> produceStartCmd(Container container) {
- List<String> commands = new ArrayList<String>();
+ List<String> commands = new ArrayList<>();
// Set the necessary command to execute on the allocated container
- List<CharSequence> vargs = new ArrayList<CharSequence>(5);
+ List<CharSequence> vargs = new ArrayList<>(5);
vargs.add(JAVA_HOME + File.separator + "bin" + File.separator + "java");
vargs.add("-classpath " + '\'' + ASTERIX_ZIP_NAME + File.separator + "repo" + File.separator + "*\'");
@@ -1032,7 +1051,7 @@
//get our java opts
vargs.add(ccJavaOpts);
vargs.add(CC_CLASSNAME);
- vargs.add("-app-class org.apache.asterix.hyracks.bootstrap.CCApplicationEntryPoint");
+ vargs.add("-app-class " + CCApplication.class.getName());
vargs.add("-address " + cC.getClusterIp());
vargs.add("-client-listen-address " + cC.getClientIp());
//pass CC optional parameters
@@ -1074,7 +1093,7 @@
}
vargs.add(ncJavaOpts);
vargs.add(NC_CLASSNAME);
- vargs.add("-app-class org.apache.asterix.hyracks.bootstrap.NCApplicationEntryPoint");
+ vargs.add("-app-class " + NCApplication.class.getName());
vargs.add("-node-id " + local.getId());
vargs.add("-cluster-address " + cC.getClusterIp());
vargs.add("-iodevices " + iodevice);
@@ -1126,8 +1145,8 @@
StringBuilder classPathEnv = new StringBuilder("").append("*");
classPathEnv.append(File.pathSeparatorChar).append("log4j.properties");
- List<String> commands = new ArrayList<String>();
- Vector<CharSequence> vargs = new Vector<CharSequence>(5);
+ List<String> commands = new ArrayList<>();
+ Vector<CharSequence> vargs = new Vector<>(5);
vargs.add(JAVA_HOME + File.separator + "bin" + File.separator + "java");
vargs.add("-cp " + classPathEnv.toString());
vargs.add(OBLITERATOR_CLASSNAME);
@@ -1177,8 +1196,8 @@
}
classPathEnv.append(File.pathSeparatorChar).append("." + File.separator + "log4j.properties");
- List<String> commands = new ArrayList<String>();
- Vector<CharSequence> vargs = new Vector<CharSequence>(5);
+ List<String> commands = new ArrayList<>();
+ Vector<CharSequence> vargs = new Vector<>(5);
vargs.add(JAVA_HOME + File.separator + "bin" + File.separator + "java");
vargs.add("-cp " + classPathEnv.toString());
vargs.add(HDFS_BACKUP_CLASSNAME);
@@ -1230,7 +1249,7 @@
private List<String> produceRestoreCommand(Container container) {
if (containerIsCC(container)) {
- List<String> blank = new ArrayList<String>();
+ List<String> blank = new ArrayList<>();
blank.add("");
return blank;
}
@@ -1260,8 +1279,8 @@
}
classPathEnv.append(File.pathSeparatorChar).append("." + File.separator + "log4j.properties");
- List<String> commands = new ArrayList<String>();
- Vector<CharSequence> vargs = new Vector<CharSequence>(5);
+ List<String> commands = new ArrayList<>();
+ Vector<CharSequence> vargs = new Vector<>(5);
vargs.add(JAVA_HOME + File.separator + "bin" + File.separator + "java");
vargs.add("-cp " + classPathEnv.toString());
vargs.add(HDFS_BACKUP_CLASSNAME);
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/ICCApplicationEntryPoint.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IApplication.java
similarity index 77%
rename from hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/ICCApplicationEntryPoint.java
rename to hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IApplication.java
index 4f6f450..3ce314f 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/ICCApplicationEntryPoint.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IApplication.java
@@ -19,20 +19,19 @@
package org.apache.hyracks.api.application;
import org.apache.hyracks.api.config.IConfigManager;
-import org.apache.hyracks.api.job.resource.IJobCapacityController;
import org.kohsuke.args4j.OptionHandlerFilter;
-public interface ICCApplicationEntryPoint {
- void start(ICCApplicationContext ccAppCtx, String[] args) throws Exception;
+public interface IApplication {
+ void start(IServiceContext ctx, String[] args) throws Exception; //NOSONAR
- void stop() throws Exception;
+ void startupCompleted() throws Exception; //NOSONAR
- void startupCompleted() throws Exception;
-
- IJobCapacityController getJobCapacityController();
+ void stop() throws Exception; //NOSONAR
void registerConfig(IConfigManager configManager);
+ Object getApplicationContext();
+
default OptionHandlerFilter getUsageFilter() {
return OptionHandlerFilter.PUBLIC;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/ICCApplicationEntryPoint.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/ICCApplication.java
similarity index 67%
copy from hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/ICCApplicationEntryPoint.java
copy to hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/ICCApplication.java
index 4f6f450..b105d24 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/ICCApplicationEntryPoint.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/ICCApplication.java
@@ -18,22 +18,13 @@
*/
package org.apache.hyracks.api.application;
-import org.apache.hyracks.api.config.IConfigManager;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.job.resource.IJobCapacityController;
-import org.kohsuke.args4j.OptionHandlerFilter;
-public interface ICCApplicationEntryPoint {
- void start(ICCApplicationContext ccAppCtx, String[] args) throws Exception;
-
- void stop() throws Exception;
-
- void startupCompleted() throws Exception;
+public interface ICCApplication extends IApplication {
IJobCapacityController getJobCapacityController();
- void registerConfig(IConfigManager configManager);
+ IHyracksClientConnection getHcc() throws Exception; //NOSONAR
- default OptionHandlerFilter getUsageFilter() {
- return OptionHandlerFilter.PUBLIC;
- }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/ICCApplicationContext.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/ICCServiceContext.java
similarity index 76%
rename from hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/ICCApplicationContext.java
rename to hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/ICCServiceContext.java
index 55cc5fa..c0e9834 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/ICCApplicationContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/ICCServiceContext.java
@@ -24,16 +24,14 @@
import org.apache.hyracks.api.job.IJobLifecycleListener;
/**
- * Application Context at the Cluster Controller for an application.
- *
- * @author vinayakb
+ * Service Context at the Cluster Controller for an application.
*/
-public interface ICCApplicationContext extends IApplicationContext {
+public interface ICCServiceContext extends IServiceContext {
/**
- * Sets the state that must be distributed by the infrastructure to all the
- * NC application contexts. Any state set by calling this method in the {@link ICCApplicationEntryPoint#start(ICCApplicationContext, String[])} call is made available to all the {@link INCApplicationContext} objects
- * at each Node Controller. The state is then available to be inspected by
- * the application at the NC during or after the {@link INCBootstrap#start()} call.
+ * Sets the state that must be distributed by the infrastructure
+ * to all the {@link org.apache.hyracks.api.application.INCServiceContext}.
+ * Any state set by calling this method in the {@link ICCApplication#start(ICCServiceContext, String[])} call
+ * is made available to all the {@link INCServiceContext} objects at each Node Controller.
*
* @param state
* The distributed state
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/INCApplicationEntryPoint.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/INCApplication.java
similarity index 66%
rename from hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/INCApplicationEntryPoint.java
rename to hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/INCApplication.java
index a92cd4a..44b1eb5 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/INCApplicationEntryPoint.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/INCApplication.java
@@ -18,22 +18,9 @@
*/
package org.apache.hyracks.api.application;
-import org.apache.hyracks.api.config.IConfigManager;
import org.apache.hyracks.api.job.resource.NodeCapacity;
-import org.kohsuke.args4j.OptionHandlerFilter;
-public interface INCApplicationEntryPoint {
- void start(INCApplicationContext ncAppCtx, String[] args) throws Exception;
-
- void notifyStartupComplete() throws Exception;
-
- void stop() throws Exception;
+public interface INCApplication extends IApplication {
NodeCapacity getCapacity();
-
- void registerConfigOptions(IConfigManager configManager);
-
- default OptionHandlerFilter getUsageFilter() {
- return OptionHandlerFilter.PUBLIC;
- }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/INCApplicationContext.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/INCServiceContext.java
similarity index 64%
rename from hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/INCApplicationContext.java
rename to hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/INCServiceContext.java
index ae25f88..d72cc01 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/INCApplicationContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/INCServiceContext.java
@@ -24,65 +24,48 @@
import org.apache.hyracks.api.resources.memory.IMemoryManager;
/**
- * Application Context at the Node Controller for an application.
- *
- * @author vinayakb
+ * Service Context at the Node Controller for an application.
*/
-public interface INCApplicationContext extends IApplicationContext {
+public interface INCServiceContext extends IServiceContext {
/**
* Gets the life cycle component manager of the Node Controller.
*
* @return
*/
- public ILifeCycleComponentManager getLifeCycleComponentManager();
+ ILifeCycleComponentManager getLifeCycleComponentManager();
/**
* Gets the node Id of the Node Controller.
*
* @return the Node Id.
*/
- public String getNodeId();
+ String getNodeId();
/**
* @return the IO Manager
*/
- public IIOManager getIoManager();
-
- /**
- * Set an object that can be later retrieved by the {@link #getApplicationObject()} call.
- *
- * @param object
- * Application Object
- */
- public void setApplicationObject(Object object);
-
- /**
- * Get the application object previously set by the {@link #setApplicationObject(Object)} call.
- *
- * @return Application Object
- */
- public Object getApplicationObject();
+ IIOManager getIoManager();
/**
* Get the memory manager at the node.
*
* @return Memory Manager
*/
- public IMemoryManager getMemoryManager();
+ IMemoryManager getMemoryManager();
/**
* Set the handler for state dumps.
*
* @param handler
*/
- public void setStateDumpHandler(IStateDumpHandler handler);
+ void setStateDumpHandler(IStateDumpHandler handler);
/**
* Set the application MessagingChannelInterfaceFactory
*
* @param interfaceFactory
*/
- public void setMessagingChannelInterfaceFactory(IChannelInterfaceFactory interfaceFactory);
+ void setMessagingChannelInterfaceFactory(IChannelInterfaceFactory interfaceFactory);
/**
* Get the application MessagingChannelInterfaceFactory previously set by
@@ -90,5 +73,5 @@
*
* @return
*/
- public IChannelInterfaceFactory getMessagingChannelInterfaceFactory();
+ IChannelInterfaceFactory getMessagingChannelInterfaceFactory();
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IApplicationContext.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IServiceContext.java
similarity index 63%
rename from hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IApplicationContext.java
rename to hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IServiceContext.java
index c933d9d..bc3d7a1 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IApplicationContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IServiceContext.java
@@ -27,34 +27,32 @@
import org.apache.hyracks.api.service.IControllerService;
/**
- * Base class of the {@link ICCApplicationContext} and the {@link INCApplicationContext}.
- *
- * @author vinayakb
+ * Base interface of the {@link ICCServiceContext} and the {@link INCServiceContext}.
*/
-public interface IApplicationContext {
+public interface IServiceContext {
/**
- * Gets the distributed state that is made available to all the Application
- * Contexts of this application in the cluster.
- *
- * @return
+ * @return the distributed state that is made available to all the Application
+ * Contexts of this application in the cluster.
*/
- public Serializable getDistributedState();
+ Serializable getDistributedState();
- public void setMessageBroker(IMessageBroker messageBroker);
+ void setMessageBroker(IMessageBroker messageBroker);
- public IMessageBroker getMessageBroker();
+ IMessageBroker getMessageBroker();
- public IJobSerializerDeserializerContainer getJobSerializerDeserializerContainer();
+ IJobSerializerDeserializerContainer getJobSerializerDeserializerContainer();
- public ThreadFactory getThreadFactory();
+ ThreadFactory getThreadFactory();
- public void setThreadFactory(ThreadFactory threadFactory);
+ void setThreadFactory(ThreadFactory threadFactory);
- public IApplicationConfig getAppConfig();
+ IApplicationConfig getAppConfig();
/**
* @return The controller service which the application context belongs to.
*/
- public IControllerService getControllerService();
+ IControllerService getControllerService();
+
+ Object getApplicationContext();
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/impl/JobActivityGraphBuilder.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/impl/JobActivityGraphBuilder.java
index da344aa..1c2b9b5 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/impl/JobActivityGraphBuilder.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/impl/JobActivityGraphBuilder.java
@@ -19,7 +19,6 @@
package org.apache.hyracks.api.client.impl;
import java.util.ArrayList;
-import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -29,7 +28,6 @@
import java.util.logging.Logger;
import org.apache.commons.lang3.tuple.Pair;
-
import org.apache.hyracks.api.dataflow.ActivityId;
import org.apache.hyracks.api.dataflow.ConnectorDescriptorId;
import org.apache.hyracks.api.dataflow.IActivity;
@@ -53,12 +51,12 @@
private final Map<ConnectorDescriptorId, Pair<IActivity, Integer>> connectorConsumerMap;
- public JobActivityGraphBuilder(JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) {
- activityOperatorMap = new HashMap<ActivityId, IOperatorDescriptor>();
+ public JobActivityGraphBuilder(JobSpecification jobSpec, Set<JobFlag> jobFlags) {
+ activityOperatorMap = new HashMap<>();
jag = new JobActivityGraph();
this.jobSpec = jobSpec;
- connectorProducerMap = new HashMap<ConnectorDescriptorId, Pair<IActivity, Integer>>();
- connectorConsumerMap = new HashMap<ConnectorDescriptorId, Pair<IActivity, Integer>>();
+ connectorProducerMap = new HashMap<>();
+ connectorConsumerMap = new HashMap<>();
}
public void addConnector(IConnectorDescriptor conn) {
@@ -116,7 +114,7 @@
private <K, V> void addToValueSet(Map<K, Set<V>> map, K n1, V n2) {
Set<V> targets = map.get(n1);
if (targets == null) {
- targets = new HashSet<V>();
+ targets = new HashSet<>();
map.put(n1, targets);
}
targets.add(n2);
@@ -132,7 +130,7 @@
private <K, V> void insertIntoIndexedMap(Map<K, List<V>> map, K key, int index, V value) {
List<V> vList = map.get(key);
if (vList == null) {
- vList = new ArrayList<V>();
+ vList = new ArrayList<>();
map.put(key, vList);
}
extend(vList, index);
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/impl/JobSpecificationActivityClusterGraphGeneratorFactory.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/impl/JobSpecificationActivityClusterGraphGeneratorFactory.java
index 9a908ab..61b4b4e 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/impl/JobSpecificationActivityClusterGraphGeneratorFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/impl/JobSpecificationActivityClusterGraphGeneratorFactory.java
@@ -18,11 +18,10 @@
*/
package org.apache.hyracks.api.client.impl;
-import java.util.EnumSet;
import java.util.HashSet;
import java.util.Set;
-import org.apache.hyracks.api.application.ICCApplicationContext;
+import org.apache.hyracks.api.application.ICCServiceContext;
import org.apache.hyracks.api.constraints.Constraint;
import org.apache.hyracks.api.constraints.IConstraintAcceptor;
import org.apache.hyracks.api.dataflow.IConnectorDescriptor;
@@ -53,7 +52,7 @@
@Override
public IActivityClusterGraphGenerator createActivityClusterGraphGenerator(JobId jobId,
- final ICCApplicationContext ccAppCtx, EnumSet<JobFlag> jobFlags) throws HyracksException {
+ final ICCServiceContext ccServiceCtx, Set<JobFlag> jobFlags) throws HyracksException {
final JobActivityGraphBuilder builder = new JobActivityGraphBuilder(spec, jobFlags);
PlanUtils.visit(spec, new IConnectorDescriptorVisitor() {
@Override
@@ -77,7 +76,7 @@
acg.setJobletEventListenerFactory(spec.getJobletEventListenerFactory());
acg.setConnectorPolicyAssignmentPolicy(spec.getConnectorPolicyAssignmentPolicy());
acg.setUseConnectorPolicyForScheduling(spec.isUseConnectorPolicyForScheduling());
- final Set<Constraint> constraints = new HashSet<Constraint>();
+ final Set<Constraint> constraints = new HashSet<>();
final IConstraintAcceptor acceptor = new IConstraintAcceptor() {
@Override
public void addConstraint(Constraint constraint) {
@@ -87,14 +86,14 @@
PlanUtils.visit(spec, new IOperatorDescriptorVisitor() {
@Override
public void visit(IOperatorDescriptor op) {
- op.contributeSchedulingConstraints(acceptor, ccAppCtx);
+ op.contributeSchedulingConstraints(acceptor, ccServiceCtx);
}
});
PlanUtils.visit(spec, new IConnectorDescriptorVisitor() {
@Override
public void visit(IConnectorDescriptor conn) {
conn.contributeSchedulingConstraints(acceptor, acg.getConnectorMap().get(conn.getConnectorId()),
- ccAppCtx);
+ ccServiceCtx);
}
});
constraints.addAll(spec.getUserConstraints());
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksJobletContext.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksJobletContext.java
index 2ea657a..6eb0adb 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksJobletContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksJobletContext.java
@@ -18,7 +18,7 @@
*/
package org.apache.hyracks.api.context;
-import org.apache.hyracks.api.application.INCApplicationContext;
+import org.apache.hyracks.api.application.INCServiceContext;
import org.apache.hyracks.api.exceptions.HyracksException;
import org.apache.hyracks.api.io.IWorkspaceFileFactory;
import org.apache.hyracks.api.job.JobId;
@@ -26,15 +26,15 @@
import org.apache.hyracks.api.resources.IDeallocatableRegistry;
public interface IHyracksJobletContext extends IWorkspaceFileFactory, IDeallocatableRegistry {
- public INCApplicationContext getApplicationContext();
+ INCServiceContext getServiceContext();
- public JobId getJobId();
+ JobId getJobId();
- public ICounterContext getCounterContext();
+ ICounterContext getCounterContext();
- public Object getGlobalJobData();
+ Object getGlobalJobData();
- public Class<?> loadClass(String className) throws HyracksException;
+ Class<?> loadClass(String className) throws HyracksException;
- public ClassLoader getClassLoader() throws HyracksException;
+ ClassLoader getClassLoader() throws HyracksException;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IConnectorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IConnectorDescriptor.java
index abff2f7..36cd6a5 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IConnectorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IConnectorDescriptor.java
@@ -22,7 +22,7 @@
import java.util.BitSet;
import com.fasterxml.jackson.databind.JsonNode;
-import org.apache.hyracks.api.application.ICCApplicationContext;
+import org.apache.hyracks.api.application.ICCServiceContext;
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.comm.IPartitionCollector;
import org.apache.hyracks.api.comm.IPartitionWriterFactory;
@@ -98,7 +98,7 @@
* - Activity Cluster
*/
public void contributeSchedulingConstraints(IConstraintAcceptor constraintAcceptor, ActivityCluster ac,
- ICCApplicationContext appCtx);
+ ICCServiceContext ccServiceCtx);
/**
* Indicate which consumer partitions may receive data from the given
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IOperatorDescriptor.java
index b02c4f2..9148d6b 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IOperatorDescriptor.java
@@ -22,7 +22,7 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.hyracks.api.application.ICCApplicationContext;
+import org.apache.hyracks.api.application.ICCServiceContext;
import org.apache.hyracks.api.constraints.IConstraintAcceptor;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
@@ -84,7 +84,7 @@
* @param plan
* - Job Plan
*/
- public void contributeSchedulingConstraints(IConstraintAcceptor constraintAcceptor, ICCApplicationContext appCtx);
+ public void contributeSchedulingConstraints(IConstraintAcceptor constraintAcceptor, ICCServiceContext ccServiceCtx);
/**
* Gets the display name.
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/IActivityClusterGraphGeneratorFactory.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/IActivityClusterGraphGeneratorFactory.java
index 7d02155..133e342 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/IActivityClusterGraphGeneratorFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/IActivityClusterGraphGeneratorFactory.java
@@ -19,14 +19,14 @@
package org.apache.hyracks.api.job;
import java.io.Serializable;
-import java.util.EnumSet;
+import java.util.Set;
-import org.apache.hyracks.api.application.ICCApplicationContext;
+import org.apache.hyracks.api.application.ICCServiceContext;
import org.apache.hyracks.api.exceptions.HyracksException;
public interface IActivityClusterGraphGeneratorFactory extends Serializable {
public IActivityClusterGraphGenerator createActivityClusterGraphGenerator(JobId jobId,
- ICCApplicationContext ccAppCtx, EnumSet<JobFlag> jobFlags) throws HyracksException;
+ ICCServiceContext ccServiceCtx, Set<JobFlag> jobFlags) throws HyracksException;
public JobSpecification getJobSpecification();
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/service/IControllerService.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/service/IControllerService.java
index 2e742ac..aecc643 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/service/IControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/service/IControllerService.java
@@ -18,8 +18,14 @@
*/
package org.apache.hyracks.api.service;
-public interface IControllerService {
- public void start() throws Exception;
+import org.apache.hyracks.api.application.IServiceContext;
- public void stop() throws Exception;
+public interface IControllerService {
+ void start() throws Exception;
+
+ void stop() throws Exception;
+
+ IServiceContext getContext();
+
+ Object getApplicationContext();
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/CCApplicationEntryPoint.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/BaseCCApplication.java
similarity index 77%
rename from hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/CCApplicationEntryPoint.java
rename to hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/BaseCCApplication.java
index 07008df..ccdd493 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/CCApplicationEntryPoint.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/BaseCCApplication.java
@@ -20,8 +20,9 @@
import java.util.Arrays;
-import org.apache.hyracks.api.application.ICCApplicationContext;
-import org.apache.hyracks.api.application.ICCApplicationEntryPoint;
+import org.apache.hyracks.api.application.ICCApplication;
+import org.apache.hyracks.api.application.IServiceContext;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.config.IConfigManager;
import org.apache.hyracks.api.config.Section;
import org.apache.hyracks.api.job.resource.DefaultJobCapacityController;
@@ -30,14 +31,14 @@
import org.apache.hyracks.control.common.controllers.ControllerConfig;
import org.apache.hyracks.control.common.controllers.NCConfig;
-public class CCApplicationEntryPoint implements ICCApplicationEntryPoint {
- public static final ICCApplicationEntryPoint INSTANCE = new CCApplicationEntryPoint();
+public class BaseCCApplication implements ICCApplication {
+ public static final ICCApplication INSTANCE = new BaseCCApplication();
- protected CCApplicationEntryPoint() {
+ protected BaseCCApplication() {
}
@Override
- public void start(ICCApplicationContext ccAppCtx, String[] args) throws Exception {
+ public void start(IServiceContext serviceCtx, String[] args) throws Exception {
if (args.length > 0) {
throw new IllegalArgumentException("Unrecognized argument(s): " + Arrays.toString(args));
}
@@ -65,4 +66,14 @@
configManager.setUsageFilter(getUsageFilter());
configManager.register(ControllerConfig.Option.class, CCConfig.Option.class, NCConfig.Option.class);
}
+
+ @Override
+ public Object getApplicationContext() {
+ return null;
+ }
+
+ @Override
+ public IHyracksClientConnection getHcc() throws Exception {
+ return null;
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/CCDriver.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/CCDriver.java
index 754deac..a78f6bb 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/CCDriver.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/CCDriver.java
@@ -25,7 +25,7 @@
import java.util.logging.Level;
import java.util.logging.Logger;
-import org.apache.hyracks.api.application.ICCApplicationEntryPoint;
+import org.apache.hyracks.api.application.ICCApplication;
import org.apache.hyracks.control.common.config.ConfigManager;
import org.apache.hyracks.control.common.config.ConfigUtils;
import org.apache.hyracks.control.common.controllers.CCConfig;
@@ -40,10 +40,10 @@
public static void main(String[] args) throws Exception {
try {
final ConfigManager configManager = new ConfigManager(args);
- ICCApplicationEntryPoint appEntryPoint = getAppEntryPoint(args);
- appEntryPoint.registerConfig(configManager);
+ ICCApplication application = getApplication(args);
+ application.registerConfig(configManager);
CCConfig ccConfig = new CCConfig(configManager);
- ClusterControllerService ccService = new ClusterControllerService(ccConfig, appEntryPoint);
+ ClusterControllerService ccService = new ClusterControllerService(ccConfig, application);
ccService.start();
while (true) {
Thread.sleep(100000);
@@ -52,16 +52,16 @@
LOGGER.log(Level.FINE, "Exception parsing command line: " + Arrays.toString(args), e);
System.exit(2);
} catch (Exception e) {
- LOGGER.log(Level.SEVERE, "Exiting NCDriver due to exception", e);
+ LOGGER.log(Level.SEVERE, "Exiting CCDriver due to exception", e);
System.exit(1);
}
}
- private static ICCApplicationEntryPoint getAppEntryPoint(String[] args)
+ private static ICCApplication getApplication(String[] args)
throws ClassNotFoundException, InstantiationException, IllegalAccessException, IOException {
// determine app class so that we can use the correct implementation of the configuration...
String appClassName = ConfigUtils.getOptionValue(args, APP_CLASS);
- return appClassName != null ? (ICCApplicationEntryPoint) (Class.forName(appClassName)).newInstance()
- : CCApplicationEntryPoint.INSTANCE;
+ return appClassName != null ? (ICCApplication) (Class.forName(appClassName)).newInstance()
+ : BaseCCApplication.INSTANCE;
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
index 77404b5..5abc2e0 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
@@ -41,7 +41,7 @@
import java.util.logging.Logger;
import org.apache.commons.lang3.tuple.Pair;
-import org.apache.hyracks.api.application.ICCApplicationEntryPoint;
+import org.apache.hyracks.api.application.ICCApplication;
import org.apache.hyracks.api.client.ClusterControllerInfo;
import org.apache.hyracks.api.comm.NetworkAddress;
import org.apache.hyracks.api.config.IApplicationConfig;
@@ -53,7 +53,7 @@
import org.apache.hyracks.api.service.IControllerService;
import org.apache.hyracks.api.topology.ClusterTopology;
import org.apache.hyracks.api.topology.TopologyDefinitionParser;
-import org.apache.hyracks.control.cc.application.CCApplicationContext;
+import org.apache.hyracks.control.cc.application.CCServiceContext;
import org.apache.hyracks.control.cc.cluster.INodeManager;
import org.apache.hyracks.control.cc.cluster.NodeManager;
import org.apache.hyracks.control.cc.dataset.DatasetDirectoryService;
@@ -102,7 +102,7 @@
private ClusterControllerInfo info;
- private CCApplicationContext appCtx;
+ private CCServiceContext serviceCtx;
private final PreDistributedJobStore preDistributedJobStore = new PreDistributedJobStore();
@@ -132,20 +132,19 @@
private ShutdownRun shutdownCallback;
- private final ICCApplicationEntryPoint aep;
+ private final ICCApplication application;
public ClusterControllerService(final CCConfig config) throws Exception {
- this(config, getApplicationEntryPoint(config));
+ this(config, getApplication(config));
}
- public ClusterControllerService(final CCConfig config,
- final ICCApplicationEntryPoint aep) throws Exception {
+ public ClusterControllerService(final CCConfig config, final ICCApplication application) throws Exception {
this.ccConfig = config;
this.configManager = ccConfig.getConfigManager();
- if (aep == null) {
- throw new IllegalArgumentException("ICCApplicationEntryPoint cannot be null");
+ if (application == null) {
+ throw new IllegalArgumentException("ICCApplication cannot be null");
}
- this.aep = aep;
+ this.application = application;
configManager.processConfig();
File jobLogFolder = new File(ccConfig.getRootDir(), "logs/jobs");
jobLog = new LogFile(jobLogFolder);
@@ -209,11 +208,11 @@
}
private void startApplication() throws Exception {
- appCtx = new CCApplicationContext(this, serverCtx, ccContext, ccConfig.getAppConfig());
- appCtx.addJobLifecycleListener(datasetDirectoryService);
- executor = Executors.newCachedThreadPool(appCtx.getThreadFactory());
- aep.start(appCtx, ccConfig.getAppArgsArray());
- IJobCapacityController jobCapacityController = aep.getJobCapacityController();
+ serviceCtx = new CCServiceContext(this, serverCtx, ccContext, ccConfig.getAppConfig());
+ serviceCtx.addJobLifecycleListener(datasetDirectoryService);
+ executor = Executors.newCachedThreadPool(serviceCtx.getThreadFactory());
+ application.start(serviceCtx, ccConfig.getAppArgsArray());
+ IJobCapacityController jobCapacityController = application.getJobCapacityController();
// Job manager is in charge of job lifecycle management.
try {
@@ -265,7 +264,7 @@
}
private void notifyApplication() throws Exception {
- aep.startupCompleted();
+ application.startupCompleted();
}
public void stop(boolean terminateNCService) throws Exception {
@@ -290,7 +289,7 @@
}
private void stopApplication() throws Exception {
- aep.stop();
+ application.stop();
}
public ServerContext getServerContext() {
@@ -337,8 +336,9 @@
return ccConfig;
}
- public CCApplicationContext getApplicationContext() {
- return appCtx;
+ @Override
+ public CCServiceContext getContext() {
+ return serviceCtx;
}
public ClusterControllerInfo getClusterControllerInfo() {
@@ -454,17 +454,22 @@
return threadDumpRunMap.remove(requestKey);
}
- private static ICCApplicationEntryPoint getApplicationEntryPoint(CCConfig ccConfig)
+ private static ICCApplication getApplication(CCConfig ccConfig)
throws ClassNotFoundException, IllegalAccessException, InstantiationException {
if (ccConfig.getAppClass() != null) {
Class<?> c = Class.forName(ccConfig.getAppClass());
- return (ICCApplicationEntryPoint) c.newInstance();
+ return (ICCApplication) c.newInstance();
} else {
- return CCApplicationEntryPoint.INSTANCE;
+ return BaseCCApplication.INSTANCE;
}
}
- public ICCApplicationEntryPoint getApplication() {
- return aep;
+ public ICCApplication getApplication() {
+ return application;
+ }
+
+ @Override
+ public Object getApplicationContext() {
+ return application.getApplicationContext();
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCApplicationContext.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCServiceContext.java
similarity index 91%
rename from hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCApplicationContext.java
rename to hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCServiceContext.java
index a8b03bc..5075081 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCApplicationContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCServiceContext.java
@@ -27,7 +27,7 @@
import java.util.Map;
import java.util.Set;
-import org.apache.hyracks.api.application.ICCApplicationContext;
+import org.apache.hyracks.api.application.ICCServiceContext;
import org.apache.hyracks.api.application.IClusterLifecycleListener;
import org.apache.hyracks.api.config.IApplicationConfig;
import org.apache.hyracks.api.config.IOption;
@@ -38,12 +38,12 @@
import org.apache.hyracks.api.job.JobSpecification;
import org.apache.hyracks.api.service.IControllerService;
import org.apache.hyracks.control.cc.ClusterControllerService;
-import org.apache.hyracks.control.common.application.ApplicationContext;
+import org.apache.hyracks.control.common.application.ServiceContext;
import org.apache.hyracks.control.common.context.ServerContext;
import org.apache.hyracks.control.common.utils.HyracksThreadFactory;
import org.apache.hyracks.control.common.work.IResultCallback;
-public class CCApplicationContext extends ApplicationContext implements ICCApplicationContext {
+public class CCServiceContext extends ServiceContext implements ICCServiceContext {
private final ICCContext ccContext;
protected final Set<String> initPendingNodeIds;
@@ -56,7 +56,7 @@
private List<IClusterLifecycleListener> clusterLifecycleListeners;
private final ClusterControllerService ccs;
- public CCApplicationContext(ClusterControllerService ccs, ServerContext serverCtx, ICCContext ccContext,
+ public CCServiceContext(ClusterControllerService ccs, ServerContext serverCtx, ICCContext ccContext,
IApplicationConfig appConfig) throws IOException {
super(serverCtx, appConfig, new HyracksThreadFactory("ClusterController"));
this.ccContext = ccContext;
@@ -122,4 +122,9 @@
public IControllerService getControllerService() {
return ccs;
}
+
+ @Override
+ public Object getApplicationContext() {
+ return ccs.getApplicationContext();
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
index 084bd1b..2150bdd 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
@@ -92,8 +92,8 @@
this.jobRun = jobRun;
this.predistributed = predistributed;
solver = new PartitionConstraintSolver();
- partitionProducingTaskClusterMap = new HashMap<PartitionId, TaskCluster>();
- inProgressTaskClusters = new HashSet<TaskCluster>();
+ partitionProducingTaskClusterMap = new HashMap<>();
+ inProgressTaskClusters = new HashSet<>();
solver.addConstraints(constraints);
random = new Random();
}
@@ -112,7 +112,7 @@
public void startJob() throws HyracksException {
startRunnableActivityClusters();
- ccs.getApplicationContext().notifyJobStart(jobRun.getJobId());
+ ccs.getContext().notifyJobStart(jobRun.getJobId());
}
public void cancelJob() throws HyracksException {
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
index 649487b..45c7711 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
@@ -41,7 +41,7 @@
import org.apache.hyracks.api.job.resource.IJobCapacityController;
import org.apache.hyracks.control.cc.ClusterControllerService;
import org.apache.hyracks.control.cc.NodeControllerState;
-import org.apache.hyracks.control.cc.application.CCApplicationContext;
+import org.apache.hyracks.control.cc.application.CCServiceContext;
import org.apache.hyracks.control.cc.cluster.INodeManager;
import org.apache.hyracks.control.cc.scheduler.FIFOJobQueue;
import org.apache.hyracks.control.cc.scheduler.IJobQueue;
@@ -205,10 +205,10 @@
checkJob(run);
JobId jobId = run.getJobId();
HyracksException caughtException = null;
- CCApplicationContext appCtx = ccs.getApplicationContext();
- if (appCtx != null) {
+ CCServiceContext serviceCtx = ccs.getContext();
+ if (serviceCtx != null) {
try {
- appCtx.notifyJobFinish(jobId);
+ serviceCtx.notifyJobFinish(jobId);
} catch (HyracksException e) {
LOGGER.log(Level.SEVERE, e.getMessage(), e);
caughtException = e;
@@ -301,10 +301,10 @@
JobId jobId = run.getJobId();
activeRunMap.put(jobId, run);
- CCApplicationContext appCtx = ccs.getApplicationContext();
+ CCServiceContext serviceCtx = ccs.getContext();
JobSpecification spec = run.getJobSpecification();
if (!run.getExecutor().isPredistributed()) {
- appCtx.notifyJobCreation(jobId, spec);
+ serviceCtx.notifyJobCreation(jobId, spec);
}
run.setStatus(JobStatus.RUNNING, null);
executeJobInternal(run);
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ApplicationMessageWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ApplicationMessageWork.java
index 40a83a2..341834c 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ApplicationMessageWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ApplicationMessageWork.java
@@ -21,7 +21,7 @@
import java.util.logging.Level;
import java.util.logging.Logger;
-import org.apache.hyracks.api.application.ICCApplicationContext;
+import org.apache.hyracks.api.application.ICCServiceContext;
import org.apache.hyracks.api.deployment.DeploymentId;
import org.apache.hyracks.api.messages.IMessage;
import org.apache.hyracks.control.cc.ClusterControllerService;
@@ -49,7 +49,7 @@
@Override
public void runWork() {
- final ICCApplicationContext ctx = ccs.getApplicationContext();
+ final ICCServiceContext ctx = ccs.getContext();
try {
final IMessage data = (IMessage) DeploymentUtils.deserialize(message, deploymentId, ctx);
ccs.getExecutor().execute(new Runnable() {
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/CliDeployBinaryWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/CliDeployBinaryWork.java
index 6480674..c0ecffb 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/CliDeployBinaryWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/CliDeployBinaryWork.java
@@ -65,7 +65,7 @@
/**
* Deploy for the cluster controller
*/
- DeploymentUtils.deploy(deploymentId, binaryURLs, ccs.getApplicationContext()
+ DeploymentUtils.deploy(deploymentId, binaryURLs, ccs.getContext()
.getJobSerializerDeserializerContainer(), ccs.getServerContext(), false);
/**
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/CliUnDeployBinaryWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/CliUnDeployBinaryWork.java
index de28c32..638c27d 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/CliUnDeployBinaryWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/CliUnDeployBinaryWork.java
@@ -20,7 +20,6 @@
package org.apache.hyracks.control.cc.work;
import java.util.Collection;
-import java.util.Set;
import java.util.UUID;
import org.apache.hyracks.api.deployment.DeploymentId;
@@ -62,7 +61,7 @@
/**
* Deploy for the cluster controller
*/
- DeploymentUtils.undeploy(deploymentId, ccs.getApplicationContext().getJobSerializerDeserializerContainer(),
+ DeploymentUtils.undeploy(deploymentId, ccs.getContext().getJobSerializerDeserializerContainer(),
ccs.getServerContext());
/**
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DistributeJobWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DistributeJobWork.java
index f0c3303..e5fd66a 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DistributeJobWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DistributeJobWork.java
@@ -28,7 +28,7 @@
import org.apache.hyracks.api.util.JavaSerializationUtils;
import org.apache.hyracks.control.cc.ClusterControllerService;
import org.apache.hyracks.control.cc.NodeControllerState;
-import org.apache.hyracks.control.cc.application.CCApplicationContext;
+import org.apache.hyracks.control.cc.application.CCServiceContext;
import org.apache.hyracks.control.cc.cluster.INodeManager;
import org.apache.hyracks.control.common.deployment.DeploymentUtils;
import org.apache.hyracks.control.common.work.IResultCallback;
@@ -51,17 +51,17 @@
@Override
protected void doRun() throws Exception {
try {
- final CCApplicationContext appCtx = ccs.getApplicationContext();
+ final CCServiceContext ccServiceCtx = ccs.getContext();
ccs.getPreDistributedJobStore().checkForExistingDistributedJobDescriptor(jobId);
IActivityClusterGraphGeneratorFactory acggf =
- (IActivityClusterGraphGeneratorFactory) DeploymentUtils.deserialize(acggfBytes, null, appCtx);
+ (IActivityClusterGraphGeneratorFactory) DeploymentUtils.deserialize(acggfBytes, null, ccServiceCtx);
IActivityClusterGraphGenerator acgg =
- acggf.createActivityClusterGraphGenerator(jobId, appCtx, EnumSet.noneOf(JobFlag.class));
+ acggf.createActivityClusterGraphGenerator(jobId, ccServiceCtx, EnumSet.noneOf(JobFlag.class));
ActivityClusterGraph acg = acgg.initialize();
ccs.getPreDistributedJobStore().addDistributedJobDescriptor(jobId, acg, acggf.getJobSpecification(),
acgg.getConstraints());
- appCtx.notifyJobCreation(jobId, acggf.getJobSpecification());
+ ccServiceCtx.notifyJobCreation(jobId, acggf.getJobSpecification());
byte[] acgBytes = JavaSerializationUtils.serialize(acg);
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java
index 1253cf7..2dbb631 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java
@@ -26,7 +26,7 @@
import org.apache.hyracks.api.job.JobFlag;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.control.cc.ClusterControllerService;
-import org.apache.hyracks.control.cc.application.CCApplicationContext;
+import org.apache.hyracks.control.cc.application.CCServiceContext;
import org.apache.hyracks.control.cc.job.IJobManager;
import org.apache.hyracks.control.cc.job.JobRun;
import org.apache.hyracks.control.common.deployment.DeploymentUtils;
@@ -57,14 +57,14 @@
protected void doRun() throws Exception {
IJobManager jobManager = ccs.getJobManager();
try {
- final CCApplicationContext appCtx = ccs.getApplicationContext();
+ final CCServiceContext ccServiceCtx = ccs.getContext();
JobRun run;
if (!predestributed) {
//Need to create the ActivityClusterGraph
IActivityClusterGraphGeneratorFactory acggf = (IActivityClusterGraphGeneratorFactory) DeploymentUtils
- .deserialize(acggfBytes, deploymentId, appCtx);
+ .deserialize(acggfBytes, deploymentId, ccServiceCtx);
IActivityClusterGraphGenerator acgg =
- acggf.createActivityClusterGraphGenerator(jobId, appCtx, jobFlags);
+ acggf.createActivityClusterGraphGenerator(jobId, ccServiceCtx, jobFlags);
run = new JobRun(ccs, deploymentId, jobId, acggf, acgg, jobFlags);
} else {
//ActivityClusterGraph has already been distributed
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
index e97950e..79033d8 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
@@ -65,7 +65,7 @@
LOGGER.log(Level.INFO, "Registered INodeController: id = " + id);
NodeParameters params = new NodeParameters();
params.setClusterControllerInfo(ccs.getClusterControllerInfo());
- params.setDistributedState(ccs.getApplicationContext().getDistributedState());
+ params.setDistributedState(ccs.getContext().getDistributedState());
params.setHeartbeatPeriod(ccs.getCCConfig().getHeartbeatPeriod());
params.setProfileDumpPeriod(ccs.getCCConfig().getProfileDumpPeriod());
result = new CCNCFunctions.NodeRegistrationResult(params, null);
@@ -73,6 +73,6 @@
result = new CCNCFunctions.NodeRegistrationResult(null, e);
}
ncIPCHandle.send(-1, result, null);
- ccs.getApplicationContext().notifyNodeJoin(id, ncConfiguration);
+ ccs.getContext().notifyNodeJoin(id, ncConfiguration);
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RemoveDeadNodesWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RemoveDeadNodesWork.java
index 410b75f..a162708 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RemoveDeadNodesWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RemoveDeadNodesWork.java
@@ -61,7 +61,7 @@
}
}
if (!deadNodes.isEmpty()) {
- ccs.getApplicationContext().notifyNodeFailure(deadNodes);
+ ccs.getContext().notifyNodeFailure(deadNodes);
}
} catch (HyracksException e) {
LOGGER.log(Level.WARNING, "Uncaught exception on notifyNodeFailure", e);
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java
index 1628248..ed2a740 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java
@@ -42,7 +42,7 @@
import org.apache.hyracks.api.job.resource.IJobCapacityController;
import org.apache.hyracks.control.cc.ClusterControllerService;
import org.apache.hyracks.control.cc.NodeControllerState;
-import org.apache.hyracks.control.cc.application.CCApplicationContext;
+import org.apache.hyracks.control.cc.application.CCServiceContext;
import org.apache.hyracks.control.cc.cluster.INodeManager;
import org.apache.hyracks.control.cc.cluster.NodeManager;
import org.apache.hyracks.control.common.base.INodeController;
@@ -279,10 +279,10 @@
private ClusterControllerService mockClusterControllerService() {
ClusterControllerService ccs = mock(ClusterControllerService.class);
- CCApplicationContext appCtx = mock(CCApplicationContext.class);
+ CCServiceContext ccServiceCtx = mock(CCServiceContext.class);
LogFile logFile = mock(LogFile.class);
INodeManager nodeManager = mockNodeManager();
- when(ccs.getApplicationContext()).thenReturn(appCtx);
+ when(ccs.getContext()).thenReturn(ccServiceCtx);
when(ccs.getJobLogFile()).thenReturn(logFile);
when(ccs.getNodeManager()).thenReturn(nodeManager);
when(ccs.getCCConfig()).thenReturn(ccConfig);
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/application/ApplicationContext.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/application/ServiceContext.java
similarity index 90%
rename from hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/application/ApplicationContext.java
rename to hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/application/ServiceContext.java
index 42bc636..1ee2315 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/application/ApplicationContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/application/ServiceContext.java
@@ -22,13 +22,13 @@
import java.util.concurrent.ThreadFactory;
import org.apache.hyracks.api.config.IApplicationConfig;
-import org.apache.hyracks.api.application.IApplicationContext;
+import org.apache.hyracks.api.application.IServiceContext;
import org.apache.hyracks.api.job.IJobSerializerDeserializerContainer;
import org.apache.hyracks.api.job.JobSerializerDeserializerContainer;
import org.apache.hyracks.api.messages.IMessageBroker;
import org.apache.hyracks.control.common.context.ServerContext;
-public abstract class ApplicationContext implements IApplicationContext {
+public abstract class ServiceContext implements IServiceContext {
protected final ServerContext serverCtx;
protected final IApplicationConfig appConfig;
protected ThreadFactory threadFactory;
@@ -36,7 +36,7 @@
protected IMessageBroker messageBroker;
protected IJobSerializerDeserializerContainer jobSerDeContainer = new JobSerializerDeserializerContainer();
- public ApplicationContext(ServerContext serverCtx, IApplicationConfig appConfig, ThreadFactory threadFactory) {
+ public ServiceContext(ServerContext serverCtx, IApplicationConfig appConfig, ThreadFactory threadFactory) {
this.serverCtx = serverCtx;
this.appConfig = appConfig;
this.threadFactory = threadFactory;
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/deployment/DeploymentUtils.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/deployment/DeploymentUtils.java
index e033ff9..bb65f7f 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/deployment/DeploymentUtils.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/deployment/DeploymentUtils.java
@@ -34,7 +34,7 @@
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.HttpClientBuilder;
-import org.apache.hyracks.api.application.IApplicationContext;
+import org.apache.hyracks.api.application.IServiceContext;
import org.apache.hyracks.api.deployment.DeploymentId;
import org.apache.hyracks.api.exceptions.HyracksException;
import org.apache.hyracks.api.job.IJobSerializerDeserializer;
@@ -111,18 +111,17 @@
* the bytes to be deserialized
* @param deploymentId
* the deployment id
- * @param appCtx
+ * @param serviceCtx
* @return the deserialized object
* @throws HyracksException
*/
- public static Object deserialize(byte[] bytes, DeploymentId deploymentId, IApplicationContext appCtx)
+ public static Object deserialize(byte[] bytes, DeploymentId deploymentId, IServiceContext serviceCtx)
throws HyracksException {
try {
- IJobSerializerDeserializerContainer jobSerDeContainer = appCtx.getJobSerializerDeserializerContainer();
+ IJobSerializerDeserializerContainer jobSerDeContainer = serviceCtx.getJobSerializerDeserializerContainer();
IJobSerializerDeserializer jobSerDe = deploymentId == null ? null
: jobSerDeContainer.getJobSerializerDeserializer(deploymentId);
- Object obj = jobSerDe == null ? JavaSerializationUtils.deserialize(bytes) : jobSerDe.deserialize(bytes);
- return obj;
+ return jobSerDe == null ? JavaSerializationUtils.deserialize(bytes) : jobSerDe.deserialize(bytes);
} catch (Exception e) {
throw new HyracksException(e);
}
@@ -133,14 +132,14 @@
*
* @param className
* @param deploymentId
- * @param appCtx
+ * @param serviceCtx
* @return the loaded class
* @throws HyracksException
*/
- public static Class<?> loadClass(String className, DeploymentId deploymentId, IApplicationContext appCtx)
+ public static Class<?> loadClass(String className, DeploymentId deploymentId, IServiceContext serviceCtx)
throws HyracksException {
try {
- IJobSerializerDeserializerContainer jobSerDeContainer = appCtx.getJobSerializerDeserializerContainer();
+ IJobSerializerDeserializerContainer jobSerDeContainer = serviceCtx.getJobSerializerDeserializerContainer();
IJobSerializerDeserializer jobSerDe = deploymentId == null ? null
: jobSerDeContainer.getJobSerializerDeserializer(deploymentId);
return jobSerDe == null ? JavaSerializationUtils.loadClass(className) : jobSerDe.loadClass(className);
@@ -157,7 +156,7 @@
* @return
* @throws HyracksException
*/
- public static ClassLoader getClassLoader(DeploymentId deploymentId, IApplicationContext appCtx)
+ public static ClassLoader getClassLoader(DeploymentId deploymentId, IServiceContext appCtx)
throws HyracksException {
IJobSerializerDeserializerContainer jobSerDeContainer = appCtx.getJobSerializerDeserializerContainer();
IJobSerializerDeserializer jobSerDe = deploymentId == null ? null
@@ -185,7 +184,7 @@
while (tried < retryCount) {
try {
tried++;
- List<URL> downloadedFileURLs = new ArrayList<URL>();
+ List<URL> downloadedFileURLs = new ArrayList<>();
File dir = new File(deploymentDir);
if (!dir.exists()) {
FileUtils.forceMkdir(dir);
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NCApplicationEntryPoint.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/BaseNCApplication.java
similarity index 79%
rename from hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NCApplicationEntryPoint.java
rename to hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/BaseNCApplication.java
index d4e67fd..869ab5b 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NCApplicationEntryPoint.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/BaseNCApplication.java
@@ -21,8 +21,8 @@
import java.lang.management.ManagementFactory;
import java.util.Arrays;
-import org.apache.hyracks.api.application.INCApplicationContext;
-import org.apache.hyracks.api.application.INCApplicationEntryPoint;
+import org.apache.hyracks.api.application.INCApplication;
+import org.apache.hyracks.api.application.IServiceContext;
import org.apache.hyracks.api.config.IConfigManager;
import org.apache.hyracks.api.config.Section;
import org.apache.hyracks.api.job.resource.NodeCapacity;
@@ -30,21 +30,21 @@
import org.apache.hyracks.control.common.controllers.ControllerConfig;
import org.apache.hyracks.control.common.controllers.NCConfig;
-public class NCApplicationEntryPoint implements INCApplicationEntryPoint {
- public static final NCApplicationEntryPoint INSTANCE = new NCApplicationEntryPoint();
+public class BaseNCApplication implements INCApplication {
+ public static final BaseNCApplication INSTANCE = new BaseNCApplication();
- protected NCApplicationEntryPoint() {
+ protected BaseNCApplication() {
}
@Override
- public void start(INCApplicationContext ncAppCtx, String[] args) throws Exception {
+ public void start(IServiceContext ncAppCtx, String[] args) throws Exception {
if (args.length > 0) {
throw new IllegalArgumentException("Unrecognized argument(s): " + Arrays.toString(args));
}
}
@Override
- public void notifyStartupComplete() throws Exception {
+ public void startupCompleted() throws Exception {
// no-op
}
@@ -60,10 +60,15 @@
}
@Override
- public void registerConfigOptions(IConfigManager configManager) {
+ public void registerConfig(IConfigManager configManager) {
configManager.addIniParamOptions(ControllerConfig.Option.CONFIG_FILE, ControllerConfig.Option.CONFIG_FILE_URL);
configManager.addCmdLineSections(Section.NC, Section.COMMON, Section.LOCALNC);
configManager.setUsageFilter(getUsageFilter());
configManager.register(ControllerConfig.Option.class, CCConfig.Option.class, NCConfig.Option.class);
}
+
+ @Override
+ public Object getApplicationContext() {
+ return null;
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java
index 25ab245..46aa992 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java
@@ -27,7 +27,7 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Logger;
-import org.apache.hyracks.api.application.INCApplicationContext;
+import org.apache.hyracks.api.application.INCServiceContext;
import org.apache.hyracks.api.comm.IPartitionCollector;
import org.apache.hyracks.api.comm.PartitionChannel;
import org.apache.hyracks.api.context.IHyracksJobletContext;
@@ -65,7 +65,7 @@
private final NodeControllerService nodeController;
- private final INCApplicationContext appCtx;
+ private final INCServiceContext serviceCtx;
private final DeploymentId deploymentId;
@@ -100,9 +100,9 @@
private boolean cleanupPending;
public Joblet(NodeControllerService nodeController, DeploymentId deploymentId, JobId jobId,
- INCApplicationContext appCtx, ActivityClusterGraph acg) {
+ INCServiceContext serviceCtx, ActivityClusterGraph acg) {
this.nodeController = nodeController;
- this.appCtx = appCtx;
+ this.serviceCtx = serviceCtx;
this.deploymentId = deploymentId;
this.jobId = jobId;
this.frameManager = new FrameManager(acg.getFrameSize());
@@ -114,7 +114,7 @@
taskMap = new HashMap<>();
counterMap = new HashMap<>();
deallocatableRegistry = new DefaultDeallocatableRegistry();
- fileFactory = new WorkspaceFileFactory(this, appCtx.getIoManager());
+ fileFactory = new WorkspaceFileFactory(this, serviceCtx.getIoManager());
cleanupPending = false;
IJobletEventListenerFactory jelf = acg.getJobletEventListenerFactory();
if (jelf != null) {
@@ -197,8 +197,8 @@
}
@Override
- public INCApplicationContext getApplicationContext() {
- return appCtx;
+ public INCServiceContext getServiceContext() {
+ return serviceCtx;
}
@Override
@@ -215,7 +215,7 @@
long stillAllocated = memoryAllocation.get();
if (stillAllocated > 0) {
LOGGER.warning("Freeing leaked " + stillAllocated + " bytes");
- appCtx.getMemoryManager().deallocate(stillAllocated);
+ serviceCtx.getMemoryManager().deallocate(stillAllocated);
}
nodeController.getExecutorService().execute(new Runnable() {
@Override
@@ -230,7 +230,7 @@
}
ByteBuffer allocateFrame(int bytes) throws HyracksDataException {
- if (appCtx.getMemoryManager().allocate(bytes)) {
+ if (serviceCtx.getMemoryManager().allocate(bytes)) {
memoryAllocation.addAndGet(bytes);
return frameManager.allocateFrame(bytes);
}
@@ -244,7 +244,7 @@
void deallocateFrames(int bytes) {
memoryAllocation.addAndGet(bytes);
- appCtx.getMemoryManager().deallocate(bytes);
+ serviceCtx.getMemoryManager().deallocate(bytes);
frameManager.deallocateFrames(bytes);
}
@@ -253,7 +253,7 @@
}
public IIOManager getIOManager() {
- return appCtx.getIoManager();
+ return serviceCtx.getIoManager();
}
@Override
@@ -326,11 +326,11 @@
@Override
public Class<?> loadClass(String className) throws HyracksException {
- return DeploymentUtils.loadClass(className, deploymentId, appCtx);
+ return DeploymentUtils.loadClass(className, deploymentId, serviceCtx);
}
@Override
public ClassLoader getClassLoader() throws HyracksException {
- return DeploymentUtils.getClassLoader(deploymentId, appCtx);
+ return DeploymentUtils.getClassLoader(deploymentId, serviceCtx);
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NCDriver.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NCDriver.java
index b52064e..d36586f 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NCDriver.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NCDriver.java
@@ -23,7 +23,7 @@
import java.util.logging.Level;
import java.util.logging.Logger;
-import org.apache.hyracks.api.application.INCApplicationEntryPoint;
+import org.apache.hyracks.api.application.INCApplication;
import org.apache.hyracks.control.common.config.ConfigManager;
import org.apache.hyracks.control.common.config.ConfigUtils;
import org.apache.hyracks.control.common.controllers.NCConfig;
@@ -40,10 +40,10 @@
try {
final String nodeId = ConfigUtils.getOptionValue(args, NCConfig.Option.NODE_ID);
final ConfigManager configManager = new ConfigManager(args);
- INCApplicationEntryPoint appEntryPoint = getAppEntryPoint(args);
- appEntryPoint.registerConfigOptions(configManager);
+ INCApplication application = getApplication(args);
+ application.registerConfig(configManager);
NCConfig ncConfig = new NCConfig(nodeId, configManager);
- final NodeControllerService ncService = new NodeControllerService(ncConfig, appEntryPoint);
+ final NodeControllerService ncService = new NodeControllerService(ncConfig, application);
if (LOGGER.isLoggable(Level.SEVERE)) {
LOGGER.severe("Setting uncaught exception handler " + ncService.getLifeCycleComponentManager());
}
@@ -62,11 +62,11 @@
}
}
- private static INCApplicationEntryPoint getAppEntryPoint(String[] args)
+ private static INCApplication getApplication(String[] args)
throws ClassNotFoundException, InstantiationException, IllegalAccessException, IOException {
// determine app class so that we can use the correct implementation of the configuration...
String appClassName = ConfigUtils.getOptionValue(args, NCConfig.Option.APP_CLASS);
- return appClassName != null ? (INCApplicationEntryPoint) (Class.forName(appClassName)).newInstance()
- : NCApplicationEntryPoint.INSTANCE;
+ return appClassName != null ? (INCApplication) (Class.forName(appClassName)).newInstance()
+ : BaseNCApplication.INSTANCE;
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index 2ee9161..b893d26 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -42,7 +42,7 @@
import org.apache.commons.lang3.mutable.Mutable;
import org.apache.commons.lang3.mutable.MutableObject;
-import org.apache.hyracks.api.application.INCApplicationEntryPoint;
+import org.apache.hyracks.api.application.INCApplication;
import org.apache.hyracks.api.client.NodeControllerInfo;
import org.apache.hyracks.api.comm.NetworkAddress;
import org.apache.hyracks.api.dataset.IDatasetPartitionManager;
@@ -69,7 +69,7 @@
import org.apache.hyracks.control.common.utils.PidHelper;
import org.apache.hyracks.control.common.work.FutureValue;
import org.apache.hyracks.control.common.work.WorkQueue;
-import org.apache.hyracks.control.nc.application.NCApplicationContext;
+import org.apache.hyracks.control.nc.application.NCServiceContext;
import org.apache.hyracks.control.nc.dataset.DatasetPartitionManager;
import org.apache.hyracks.control.nc.io.IOManager;
import org.apache.hyracks.control.nc.io.profiling.IIOCounter;
@@ -130,9 +130,9 @@
private final ServerContext serverCtx;
- private NCApplicationContext appCtx;
+ private NCServiceContext serviceCtx;
- private final INCApplicationEntryPoint ncAppEntryPoint;
+ private final INCApplication application;
private final ILifeCycleComponentManager lccm;
@@ -159,17 +159,17 @@
private final ConfigManager configManager;
public NodeControllerService(NCConfig config) throws Exception {
- this(config, getApplicationEntryPoint(config));
+ this(config, getApplication(config));
}
- public NodeControllerService(NCConfig config, INCApplicationEntryPoint aep) throws IOException, CmdLineException {
+ public NodeControllerService(NCConfig config, INCApplication application) throws IOException, CmdLineException {
this.ncConfig = config;
this.configManager = ncConfig.getConfigManager();
- if (aep == null) {
- throw new IllegalArgumentException("INCApplicationEntryPoint cannot be null");
+ if (application == null) {
+ throw new IllegalArgumentException("INCApplication cannot be null");
}
configManager.processConfig();
- this.ncAppEntryPoint = aep;
+ this.application = application;
id = ncConfig.getNodeId();
ioManager = new IOManager(IODeviceHandle.getDevices(ncConfig.getIODevices()));
@@ -199,8 +199,9 @@
return ioManager;
}
- public NCApplicationContext getApplicationContext() {
- return appCtx;
+ @Override
+ public NCServiceContext getContext() {
+ return serviceCtx;
}
public ILifeCycleComponentManager getLifeCycleComponentManager() {
@@ -244,11 +245,11 @@
ncConfig.getResultListenPort(), datasetPartitionManager, ncConfig.getNetThreadCount(),
ncConfig.getNetBufferCount(), ncConfig.getResultPublicAddress(), ncConfig.getResultPublicPort(),
FullFrameChannelInterfaceFactory.INSTANCE);
- if (ncConfig.getMessagingListenAddress() != null && appCtx.getMessagingChannelInterfaceFactory() != null) {
+ if (ncConfig.getMessagingListenAddress() != null && serviceCtx.getMessagingChannelInterfaceFactory() != null) {
messagingNetManager = new MessagingNetworkManager(this, ncConfig.getMessagingListenAddress(),
ncConfig.getMessagingListenPort(), ncConfig.getNetThreadCount(),
ncConfig.getMessagingPublicAddress(), ncConfig.getMessagingPublicPort(),
- appCtx.getMessagingChannelInterfaceFactory());
+ serviceCtx.getMessagingChannelInterfaceFactory());
}
}
@@ -292,7 +293,7 @@
runtimeMXBean.getVmName(), runtimeMXBean.getVmVersion(), runtimeMXBean.getVmVendor(),
runtimeMXBean.getClassPath(), runtimeMXBean.getLibraryPath(), runtimeMXBean.getBootClassPath(),
runtimeMXBean.getInputArguments(), runtimeMXBean.getSystemProperties(), hbSchema, meesagingPort,
- ncAppEntryPoint.getCapacity(), PidHelper.getPid()));
+ application.getCapacity(), PidHelper.getPid()));
synchronized (this) {
while (registrationPending) {
@@ -302,7 +303,7 @@
if (registrationException != null) {
throw registrationException;
}
- appCtx.setDistributedState(nodeParameters.getDistributedState());
+ serviceCtx.setDistributedState(nodeParameters.getDistributedState());
workQueue.start();
@@ -322,13 +323,13 @@
}
LOGGER.log(Level.INFO, "Started NodeControllerService");
- ncAppEntryPoint.notifyStartupComplete();
+ application.startupCompleted();
}
private void startApplication() throws Exception {
- appCtx = new NCApplicationContext(this, serverCtx, ioManager, id, memoryManager, lccm, ncConfig.getAppConfig());
- ncAppEntryPoint.start(appCtx, ncConfig.getAppArgsArray());
- executor = Executors.newCachedThreadPool(appCtx.getThreadFactory());
+ serviceCtx = new NCServiceContext(this, serverCtx, ioManager, id, memoryManager, lccm, ncConfig.getAppConfig());
+ application.start(serviceCtx, ncConfig.getAppArgsArray());
+ executor = Executors.newCachedThreadPool(serviceCtx.getThreadFactory());
}
@Override
@@ -347,7 +348,7 @@
messagingNetManager.stop();
}
workQueue.stop();
- ncAppEntryPoint.stop();
+ application.stop();
/*
* Stop heartbeat after NC has stopped to avoid false node failure detection
* on CC if an NC takes a long time to stop.
@@ -530,13 +531,18 @@
return messagingNetManager;
}
- private static INCApplicationEntryPoint getApplicationEntryPoint(NCConfig config)
+ private static INCApplication getApplication(NCConfig config)
throws ClassNotFoundException, IllegalAccessException, InstantiationException {
if (config.getAppClass() != null) {
Class<?> c = Class.forName(config.getAppClass());
- return (INCApplicationEntryPoint) c.newInstance();
+ return (INCApplication) c.newInstance();
} else {
- return NCApplicationEntryPoint.INSTANCE;
+ return BaseNCApplication.INSTANCE;
}
}
+
+ @Override
+ public Object getApplicationContext() {
+ return application.getApplicationContext();
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/application/NCApplicationContext.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/application/NCServiceContext.java
similarity index 86%
rename from hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/application/NCApplicationContext.java
rename to hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/application/NCServiceContext.java
index 6d549c2..dc0bf0c 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/application/NCApplicationContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/application/NCServiceContext.java
@@ -22,31 +22,30 @@
import java.io.OutputStream;
import java.io.Serializable;
-import org.apache.hyracks.api.application.INCApplicationContext;
+import org.apache.hyracks.api.application.INCServiceContext;
import org.apache.hyracks.api.application.IStateDumpHandler;
import org.apache.hyracks.api.comm.IChannelInterfaceFactory;
import org.apache.hyracks.api.config.IApplicationConfig;
import org.apache.hyracks.api.lifecycle.ILifeCycleComponentManager;
import org.apache.hyracks.api.resources.memory.IMemoryManager;
import org.apache.hyracks.api.service.IControllerService;
-import org.apache.hyracks.control.common.application.ApplicationContext;
+import org.apache.hyracks.control.common.application.ServiceContext;
import org.apache.hyracks.control.common.context.ServerContext;
import org.apache.hyracks.control.common.utils.HyracksThreadFactory;
import org.apache.hyracks.control.nc.NodeControllerService;
import org.apache.hyracks.control.nc.io.IOManager;
import org.apache.hyracks.control.nc.resources.memory.MemoryManager;
-public class NCApplicationContext extends ApplicationContext implements INCApplicationContext {
+public class NCServiceContext extends ServiceContext implements INCServiceContext {
private final ILifeCycleComponentManager lccm;
private final String nodeId;
private final IOManager ioManager;
private final MemoryManager memoryManager;
- private Object appObject;
private IStateDumpHandler sdh;
private final NodeControllerService ncs;
private IChannelInterfaceFactory messagingChannelInterfaceFactory;
- public NCApplicationContext(NodeControllerService ncs, ServerContext serverCtx, IOManager ioManager,
+ public NCServiceContext(NodeControllerService ncs, ServerContext serverCtx, IOManager ioManager,
String nodeId, MemoryManager memoryManager, ILifeCycleComponentManager lifeCyclecomponentManager,
IApplicationConfig appConfig) throws IOException {
super(serverCtx, appConfig, new HyracksThreadFactory(nodeId));
@@ -93,16 +92,6 @@
}
@Override
- public void setApplicationObject(Object object) {
- this.appObject = object;
- }
-
- @Override
- public Object getApplicationObject() {
- return appObject;
- }
-
- @Override
public IMemoryManager getMemoryManager() {
return memoryManager;
}
@@ -121,4 +110,9 @@
public void setMessagingChannelInterfaceFactory(IChannelInterfaceFactory interfaceFactory) {
this.messagingChannelInterfaceFactory = interfaceFactory;
}
+
+ @Override
+ public Object getApplicationContext() {
+ return ncs.getApplicationContext();
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/task/ThreadDumpTask.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/task/ThreadDumpTask.java
index 68d9223..5ebb99a 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/task/ThreadDumpTask.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/task/ThreadDumpTask.java
@@ -45,7 +45,7 @@
}
try {
ncs.getClusterController().notifyThreadDump(
- ncs.getApplicationContext().getNodeId(), requestId, result);
+ ncs.getContext().getNodeId(), requestId, result);
} catch (Exception e) {
LOGGER.log(Level.WARNING, "Exception sending thread dump to CC", e);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/ApplicationMessageWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/ApplicationMessageWork.java
index e589ece..7f5302a 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/ApplicationMessageWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/ApplicationMessageWork.java
@@ -26,7 +26,7 @@
import org.apache.hyracks.control.common.deployment.DeploymentUtils;
import org.apache.hyracks.control.common.work.AbstractWork;
import org.apache.hyracks.control.nc.NodeControllerService;
-import org.apache.hyracks.control.nc.application.NCApplicationContext;
+import org.apache.hyracks.control.nc.application.NCServiceContext;
/**
* @author rico
@@ -47,7 +47,7 @@
@Override
public void run() {
- NCApplicationContext ctx = ncs.getApplicationContext();
+ NCServiceContext ctx = ncs.getContext();
try {
IMessage data = (IMessage) DeploymentUtils.deserialize(message, deploymentId, ctx);;
if (ctx.getMessageBroker() != null) {
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DeployBinaryWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DeployBinaryWork.java
index 264a131..0fe55e6 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DeployBinaryWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DeployBinaryWork.java
@@ -51,7 +51,7 @@
public void run() {
DeploymentStatus status;
try {
- DeploymentUtils.deploy(deploymentId, binaryURLs, ncs.getApplicationContext()
+ DeploymentUtils.deploy(deploymentId, binaryURLs, ncs.getContext()
.getJobSerializerDeserializerContainer(), ncs.getServerContext(), true);
status = DeploymentStatus.SUCCEED;
} catch (Exception e) {
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DistributeJobWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DistributeJobWork.java
index 3a4f6ac..807142b 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DistributeJobWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DistributeJobWork.java
@@ -47,7 +47,7 @@
try {
ncs.checkForDuplicateDistributedJob(jobId);
ActivityClusterGraph acg =
- (ActivityClusterGraph) DeploymentUtils.deserialize(acgBytes, null, ncs.getApplicationContext());
+ (ActivityClusterGraph) DeploymentUtils.deserialize(acgBytes, null, ncs.getContext());
ncs.storeActivityClusterGraph(jobId, acg);
} catch (HyracksException e) {
try {
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
index 6cd9fa2..692eac6 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
@@ -28,7 +28,7 @@
import java.util.logging.Level;
import java.util.logging.Logger;
-import org.apache.hyracks.api.application.INCApplicationContext;
+import org.apache.hyracks.api.application.INCServiceContext;
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.comm.IPartitionCollector;
import org.apache.hyracks.api.comm.IPartitionWriterFactory;
@@ -62,7 +62,7 @@
import org.apache.hyracks.control.nc.Joblet;
import org.apache.hyracks.control.nc.NodeControllerService;
import org.apache.hyracks.control.nc.Task;
-import org.apache.hyracks.control.nc.application.NCApplicationContext;
+import org.apache.hyracks.control.nc.application.NCServiceContext;
import org.apache.hyracks.control.nc.partitions.MaterializedPartitionWriter;
import org.apache.hyracks.control.nc.partitions.MaterializingPipelinedPartition;
import org.apache.hyracks.control.nc.partitions.PipelinedPartition;
@@ -102,8 +102,8 @@
public void run() {
Task task = null;
try {
- NCApplicationContext appCtx = ncs.getApplicationContext();
- Joblet joblet = getOrCreateLocalJoblet(deploymentId, jobId, appCtx, acgBytes);
+ NCServiceContext serviceCtx = ncs.getContext();
+ Joblet joblet = getOrCreateLocalJoblet(deploymentId, jobId, serviceCtx, acgBytes);
final ActivityClusterGraph acg = joblet.getActivityClusterGraph();
IRecordDescriptorProvider rdp = new IRecordDescriptorProvider() {
@Override
@@ -182,7 +182,7 @@
}
}
- private Joblet getOrCreateLocalJoblet(DeploymentId deploymentId, JobId jobId, INCApplicationContext appCtx,
+ private Joblet getOrCreateLocalJoblet(DeploymentId deploymentId, JobId jobId, INCServiceContext appCtx,
byte[] acgBytes) throws HyracksException {
Map<JobId, Joblet> jobletMap = ncs.getJobletMap();
Joblet ji = jobletMap.get(jobId);
@@ -265,10 +265,10 @@
private List<List<PartitionChannel>> createInputChannels(TaskAttemptDescriptor td,
List<IConnectorDescriptor> inputs) throws UnknownHostException {
NetworkAddress[][] inputAddresses = td.getInputPartitionLocations();
- List<List<PartitionChannel>> channelsForInputConnectors = new ArrayList<List<PartitionChannel>>();
+ List<List<PartitionChannel>> channelsForInputConnectors = new ArrayList<>();
if (inputAddresses != null) {
for (int i = 0; i < inputAddresses.length; i++) {
- List<PartitionChannel> channels = new ArrayList<PartitionChannel>();
+ List<PartitionChannel> channels = new ArrayList<>();
if (inputAddresses[i] != null) {
for (int j = 0; j < inputAddresses[i].length; j++) {
NetworkAddress networkAddress = inputAddresses[i][j];
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StateDumpWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StateDumpWork.java
index e3f03f7..9dbc901 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StateDumpWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StateDumpWork.java
@@ -36,8 +36,7 @@
@Override
protected void doRun() throws Exception {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
- ncs.getApplicationContext().getStateDumpHandler().dumpState(baos);
- ncs.getClusterController().notifyStateDump(ncs.getApplicationContext().getNodeId(), stateDumpId,
- baos.toString("UTF-8"));
+ ncs.getContext().getStateDumpHandler().dumpState(baos);
+ ncs.getClusterController().notifyStateDump(ncs.getContext().getNodeId(), stateDumpId, baos.toString("UTF-8"));
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/UnDeployBinaryWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/UnDeployBinaryWork.java
index f564ff4..1c10589 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/UnDeployBinaryWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/UnDeployBinaryWork.java
@@ -45,7 +45,7 @@
public void run() {
DeploymentStatus status;
try {
- DeploymentUtils.undeploy(deploymentId, ncs.getApplicationContext().getJobSerializerDeserializerContainer(),
+ DeploymentUtils.undeploy(deploymentId, ncs.getContext().getJobSerializerDeserializerContainer(),
ncs.getServerContext());
status = DeploymentStatus.SUCCEED;
} catch (Exception e) {
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractConnectorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractConnectorDescriptor.java
index 6b8b38f..705244a 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractConnectorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractConnectorDescriptor.java
@@ -21,7 +21,7 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.hyracks.api.application.ICCApplicationContext;
+import org.apache.hyracks.api.application.ICCServiceContext;
import org.apache.hyracks.api.constraints.IConstraintAcceptor;
import org.apache.hyracks.api.dataflow.ConnectorDescriptorId;
import org.apache.hyracks.api.dataflow.IConnectorDescriptor;
@@ -68,7 +68,7 @@
@Override
public void contributeSchedulingConstraints(IConstraintAcceptor constraintAcceptor, ActivityCluster ac,
- ICCApplicationContext appCtx) {
+ ICCServiceContext serviceCtx) {
// do nothing
}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractOperatorDescriptor.java
index a18328e..155aa03 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractOperatorDescriptor.java
@@ -20,7 +20,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.hyracks.api.application.ICCApplicationContext;
+import org.apache.hyracks.api.application.ICCServiceContext;
import org.apache.hyracks.api.constraints.IConstraintAcceptor;
import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
import org.apache.hyracks.api.dataflow.OperatorDescriptorId;
@@ -86,7 +86,7 @@
}
@Override
- public void contributeSchedulingConstraints(IConstraintAcceptor constraintAcceptor, ICCApplicationContext appCtx) {
+ public void contributeSchedulingConstraints(IConstraintAcceptor constraintAcceptor, ICCServiceContext serviceCtx) {
// do nothing
}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/OneToOneConnectorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/OneToOneConnectorDescriptor.java
index f773918..eda353b 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/OneToOneConnectorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/OneToOneConnectorDescriptor.java
@@ -20,7 +20,7 @@
import java.util.BitSet;
-import org.apache.hyracks.api.application.ICCApplicationContext;
+import org.apache.hyracks.api.application.ICCServiceContext;
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.comm.IPartitionCollector;
import org.apache.hyracks.api.comm.IPartitionWriterFactory;
@@ -65,7 +65,7 @@
@Override
public void contributeSchedulingConstraints(IConstraintAcceptor constraintAcceptor, ActivityCluster ac,
- ICCApplicationContext appCtx) {
+ ICCServiceContext serviceCtx) {
OperatorDescriptorId consumer = ac.getConsumerActivity(getConnectorId()).getOperatorDescriptorId();
OperatorDescriptorId producer = ac.getProducerActivity(getConnectorId()).getOperatorDescriptorId();
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/RuntimeContext.java b/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/RuntimeContext.java
index f055c70..269516a 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/RuntimeContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/RuntimeContext.java
@@ -21,7 +21,7 @@
import java.util.concurrent.ThreadFactory;
-import org.apache.hyracks.api.application.INCApplicationContext;
+import org.apache.hyracks.api.application.INCServiceContext;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.storage.am.common.api.IResourceLifecycleManager;
@@ -55,7 +55,7 @@
}
};
- public RuntimeContext(INCApplicationContext appCtx) throws HyracksDataException {
+ public RuntimeContext(INCServiceContext appCtx) throws HyracksDataException {
fileMapManager = new TransientFileMapManager();
ICacheMemoryAllocator allocator = new HeapBufferAllocator();
IPageReplacementStrategy prs = new ClockPageReplacementStrategy(allocator, 32768, 50);
@@ -80,7 +80,7 @@
}
public static RuntimeContext get(IHyracksTaskContext ctx) {
- return (RuntimeContext) ctx.getJobletContext().getApplicationContext().getApplicationObject();
+ return (RuntimeContext) ctx.getJobletContext().getServiceContext().getApplicationContext();
}
public ILocalResourceRepository getLocalResourceRepository() {
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/NCApplicationEntryPoint.java b/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/TestNCApplication.java
similarity index 65%
rename from hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/NCApplicationEntryPoint.java
rename to hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/TestNCApplication.java
index eec28a2..e99e3b2 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/NCApplicationEntryPoint.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/TestNCApplication.java
@@ -18,27 +18,29 @@
*/
package org.apache.hyracks.examples.btree.helper;
-import org.apache.hyracks.api.application.INCApplicationContext;
-import org.apache.hyracks.api.application.INCApplicationEntryPoint;
+import org.apache.hyracks.api.application.INCApplication;
+import org.apache.hyracks.api.application.INCServiceContext;
+import org.apache.hyracks.api.application.IServiceContext;
import org.apache.hyracks.api.config.IConfigManager;
import org.apache.hyracks.api.job.resource.NodeCapacity;
-public class NCApplicationEntryPoint implements INCApplicationEntryPoint {
+public class TestNCApplication implements INCApplication {
+
+ private RuntimeContext rCtx;
@Override
- public void start(INCApplicationContext ncAppCtx, String[] args) throws Exception {
- RuntimeContext rCtx = new RuntimeContext(ncAppCtx);
- ncAppCtx.setApplicationObject(rCtx);
+ public void start(IServiceContext serviceCtx, String[] args) throws Exception {
+ rCtx = new RuntimeContext((INCServiceContext) serviceCtx);
}
@Override
- public void notifyStartupComplete() throws Exception {
-
+ public void startupCompleted() throws Exception {
+ // No-op
}
@Override
public void stop() throws Exception {
-
+ // No-op
}
@Override
@@ -47,8 +49,13 @@
}
@Override
- public void registerConfigOptions(IConfigManager configManager) {
+ public void registerConfig(IConfigManager configManager) {
// no-op
}
+ @Override
+ public RuntimeContext getApplicationContext() {
+ return rCtx;
+ }
+
}
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
index f7959d8..148d4f5 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
@@ -42,7 +42,7 @@
import org.apache.hyracks.api.job.JobSpecification;
import org.apache.hyracks.api.job.resource.IJobCapacityController;
import org.apache.hyracks.client.dataset.HyracksDataset;
-import org.apache.hyracks.control.cc.CCApplicationEntryPoint;
+import org.apache.hyracks.control.cc.BaseCCApplication;
import org.apache.hyracks.control.cc.ClusterControllerService;
import org.apache.hyracks.control.common.controllers.CCConfig;
import org.apache.hyracks.control.common.controllers.NCConfig;
@@ -91,7 +91,7 @@
ccRoot.delete();
ccRoot.mkdir();
ccConfig.setRootDir(ccRoot.getAbsolutePath());
- ccConfig.setAppClass(DummyApplicationEntryPoint.class.getName());
+ ccConfig.setAppClass(DummyApplication.class.getName());
cc = new ClusterControllerService(ccConfig);
cc.start();
@@ -216,7 +216,7 @@
return tempFile;
}
- public static class DummyApplicationEntryPoint extends CCApplicationEntryPoint {
+ public static class DummyApplication extends BaseCCApplication {
@Override
public IJobCapacityController getJobCapacityController() {
diff --git a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs/dataflow/HDFSReadOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs/dataflow/HDFSReadOperatorDescriptor.java
index 7b80157..78c7c6a 100644
--- a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs/dataflow/HDFSReadOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs/dataflow/HDFSReadOperatorDescriptor.java
@@ -94,7 +94,7 @@
final InputSplit[] inputSplits = splitsFactory.getSplits();
return new AbstractUnaryOutputSourceOperatorNodePushable() {
- private String nodeName = ctx.getJobletContext().getApplicationContext().getNodeId();
+ private String nodeName = ctx.getJobletContext().getServiceContext().getNodeId();
@SuppressWarnings("unchecked")
@Override
diff --git a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs2/dataflow/HDFSReadOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs2/dataflow/HDFSReadOperatorDescriptor.java
index 7424b7d..cd55098 100644
--- a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs2/dataflow/HDFSReadOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs2/dataflow/HDFSReadOperatorDescriptor.java
@@ -107,7 +107,7 @@
final List<FileSplit> inputSplits = splitsFactory.getSplits();
return new AbstractUnaryOutputSourceOperatorNodePushable() {
- private String nodeName = ctx.getJobletContext().getApplicationContext().getNodeId();
+ private String nodeName = ctx.getJobletContext().getServiceContext().getNodeId();
private ContextFactory ctxFactory = new ContextFactory();
@SuppressWarnings("unchecked")
diff --git a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs/dataflow/DataflowTest.java b/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs/dataflow/DataflowTest.java
index bf7f2a0..0d0cd3e 100644
--- a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs/dataflow/DataflowTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs/dataflow/DataflowTest.java
@@ -24,9 +24,6 @@
import java.io.FileOutputStream;
import java.io.IOException;
-import junit.framework.Assert;
-import junit.framework.TestCase;
-
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -37,7 +34,6 @@
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
-
import org.apache.hyracks.api.client.HyracksConnection;
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.constraints.PartitionConstraintHelper;
@@ -58,9 +54,12 @@
import org.apache.hyracks.hdfs.lib.TextTupleWriterFactory;
import org.apache.hyracks.hdfs.scheduler.Scheduler;
import org.apache.hyracks.hdfs.utils.HyracksUtils;
-import org.apache.hyracks.hdfs.utils.TestUtils;
+import org.apache.hyracks.test.support.TestUtils;
import org.apache.hyracks.util.file.FileUtil;
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
/**
* Test the org.apache.hyracks.hdfs.dataflow package,
* the operators for the Hadoop old API.
diff --git a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs/scheduler/SchedulerTest.java b/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs/scheduler/SchedulerTest.java
index 6e248b5..445a15c 100644
--- a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs/scheduler/SchedulerTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs/scheduler/SchedulerTest.java
@@ -31,7 +31,7 @@
import org.apache.hyracks.api.comm.NetworkAddress;
import org.apache.hyracks.api.topology.ClusterTopology;
import org.apache.hyracks.api.topology.TopologyDefinitionParser;
-import org.apache.hyracks.hdfs.utils.TestUtils;
+import org.apache.hyracks.test.support.TestUtils;
import org.xml.sax.InputSource;
import org.xml.sax.SAXException;
diff --git a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs/utils/TestUtils.java b/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs/utils/TestUtils.java
deleted file mode 100644
index d522b2b..0000000
--- a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs/utils/TestUtils.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.hyracks.hdfs.utils;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileReader;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.hyracks.api.client.NodeControllerInfo;
-import org.apache.hyracks.api.client.NodeStatus;
-import org.apache.hyracks.api.comm.NetworkAddress;
-
-public class TestUtils extends org.apache.hyracks.test.support.TestUtils {
-
- public static void compareWithResult(File expectedFile, File actualFile) throws Exception {
- BufferedReader readerExpected = new BufferedReader(new FileReader(expectedFile));
- BufferedReader readerActual = new BufferedReader(new FileReader(actualFile));
- String lineExpected, lineActual;
- int num = 1;
- try {
- while ((lineExpected = readerExpected.readLine()) != null) {
- lineActual = readerActual.readLine();
- // Assert.assertEquals(lineExpected, lineActual);
- if (lineActual == null) {
- throw new Exception("Actual result changed at line " + num + ":\n< " + lineExpected + "\n> ");
- }
- if (!equalStrings(lineExpected, lineActual)) {
- throw new Exception(
- "Result for changed at line " + num + ":\n< " + lineExpected + "\n> " + lineActual);
- }
- ++num;
- }
- lineActual = readerActual.readLine();
- if (lineActual != null) {
- throw new Exception("Actual result changed at line " + num + ":\n< \n> " + lineActual);
- }
- } finally {
- readerExpected.close();
- readerActual.close();
- }
- }
-
- private static boolean equalStrings(String s1, String s2) {
- String[] rowsOne = s1.split("\n");
- String[] rowsTwo = s2.split("\n");
-
- if (rowsOne.length != rowsTwo.length)
- return false;
-
- for (int i = 0; i < rowsOne.length; i++) {
- String row1 = rowsOne[i];
- String row2 = rowsTwo[i];
-
- if (row1.equals(row2))
- continue;
-
- String[] fields1 = row1.split(",");
- String[] fields2 = row2.split(",");
-
- for (int j = 0; j < fields1.length; j++) {
- if (fields1[j].equals(fields2[j])) {
- continue;
- } else if (fields1[j].indexOf('.') < 0) {
- return false;
- } else {
- fields1[j] = fields1[j].split("=")[1];
- fields2[j] = fields2[j].split("=")[1];
- Double double1 = Double.parseDouble(fields1[j]);
- Double double2 = Double.parseDouble(fields2[j]);
- float float1 = (float) double1.doubleValue();
- float float2 = (float) double2.doubleValue();
-
- if (Math.abs(float1 - float2) == 0)
- continue;
- else {
- return false;
- }
- }
- }
- }
- return true;
- }
-
- public static Map<String, NodeControllerInfo> generateNodeControllerInfo(int numberOfNodes, String ncNamePrefix,
- String addressPrefix, int netPort, int dataPort, int messagingPort) {
- Map<String, NodeControllerInfo> ncNameToNcInfos = new HashMap<String, NodeControllerInfo>();
- for (int i = 1; i <= numberOfNodes; i++) {
- String ncId = ncNamePrefix + i;
- String ncAddress = addressPrefix + i;
- ncNameToNcInfos.put(ncId,
- new NodeControllerInfo(ncId, NodeStatus.ALIVE, new NetworkAddress(ncAddress, netPort),
- new NetworkAddress(ncAddress, dataPort), new NetworkAddress(ncAddress, messagingPort), 2));
- }
- return ncNameToNcInfos;
- }
-}
diff --git a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs2/scheduler/SchedulerTest.java b/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs2/scheduler/SchedulerTest.java
index 793e029..4d970ba 100644
--- a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs2/scheduler/SchedulerTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs2/scheduler/SchedulerTest.java
@@ -27,7 +27,7 @@
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hyracks.api.client.NodeControllerInfo;
-import org.apache.hyracks.hdfs.utils.TestUtils;
+import org.apache.hyracks.test.support.TestUtils;
import junit.framework.Assert;
import junit.framework.TestCase;
@@ -46,7 +46,7 @@
Map<String, NodeControllerInfo> ncNameToNcInfos = TestUtils.generateNodeControllerInfo(6, "nc", "10.0.0.", 5099,
5098, 5097);
- List<InputSplit> fileSplits = new ArrayList<InputSplit>();
+ List<InputSplit> fileSplits = new ArrayList<>();
fileSplits.add(new FileSplit(new Path("part-1"), 0, 0, new String[] { "10.0.0.1", "10.0.0.2", "10.0.0.3" }));
fileSplits.add(new FileSplit(new Path("part-2"), 0, 0, new String[] { "10.0.0.3", "10.0.0.4", "10.0.0.5" }));
fileSplits.add(new FileSplit(new Path("part-3"), 0, 0, new String[] { "10.0.0.4", "10.0.0.5", "10.0.0.6" }));
@@ -73,7 +73,7 @@
Map<String, NodeControllerInfo> ncNameToNcInfos = TestUtils.generateNodeControllerInfo(6, "nc", "10.0.0.", 5099,
5098, 5097);
- List<InputSplit> fileSplits = new ArrayList<InputSplit>();
+ List<InputSplit> fileSplits = new ArrayList<>();
fileSplits.add(new FileSplit(new Path("part-1"), 0, 0, new String[] { "10.0.0.1", "10.0.0.2", "10.0.0.3" }));
fileSplits.add(new FileSplit(new Path("part-2"), 0, 0, new String[] { "10.0.0.3", "10.0.0.4", "10.0.0.5" }));
fileSplits.add(new FileSplit(new Path("part-3"), 0, 0, new String[] { "10.0.0.4", "10.0.0.5", "10.0.0.6" }));
@@ -107,7 +107,7 @@
Map<String, NodeControllerInfo> ncNameToNcInfos = TestUtils.generateNodeControllerInfo(6, "nc", "10.0.0.", 5099,
5098, 5097);
- List<InputSplit> fileSplits = new ArrayList<InputSplit>();
+ List<InputSplit> fileSplits = new ArrayList<>();
fileSplits.add(new FileSplit(new Path("part-1"), 0, 0, new String[] { "10.0.0.1", "10.0.0.2", "10.0.0.3" }));
fileSplits.add(new FileSplit(new Path("part-2"), 0, 0, new String[] { "10.0.0.3", "10.0.0.4", "10.0.0.5" }));
fileSplits.add(new FileSplit(new Path("part-3"), 0, 0, new String[] { "10.0.0.4", "10.0.0.5", "10.0.0.3" }));
@@ -141,7 +141,7 @@
Map<String, NodeControllerInfo> ncNameToNcInfos = TestUtils.generateNodeControllerInfo(6, "nc", "10.0.0.", 5099,
5098, 5097);
- List<InputSplit> fileSplits = new ArrayList<InputSplit>();
+ List<InputSplit> fileSplits = new ArrayList<>();
fileSplits.add(new FileSplit(new Path("part-1"), 0, 0, new String[] { "10.0.0.1", "10.0.0.2", "10.0.0.3" }));
fileSplits.add(new FileSplit(new Path("part-2"), 0, 0, new String[] { "10.0.0.3", "10.0.0.4", "10.0.0.5" }));
fileSplits.add(new FileSplit(new Path("part-3"), 0, 0, new String[] { "10.0.0.4", "10.0.0.5", "10.0.0.3" }));
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeDataflowHelper.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeDataflowHelper.java
index d471956..ab544e7 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeDataflowHelper.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeDataflowHelper.java
@@ -71,7 +71,7 @@
.getStorageManager().getFileMapProvider(ctx), treeOpDesc.getTreeIndexTypeTraits(), treeOpDesc
.getTreeIndexComparatorFactories(), treeOpDesc.getTreeIndexBloomFilterKeyFields(),
bloomFilterFalsePositiveRate, mergePolicy, opTrackerFactory.getOperationTracker(ctx.getJobletContext()
- .getApplicationContext()), ioScheduler,
+ .getServiceContext()), ioScheduler,
ioOpCallbackFactory.createIoOpCallback(), getVersion(), durable,
(IMetadataPageManagerFactory) opDesc.getPageManagerFactory());
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeWithBuddyDataflowHelper.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeWithBuddyDataflowHelper.java
index 1e60690..4e3c022 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeWithBuddyDataflowHelper.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeWithBuddyDataflowHelper.java
@@ -82,7 +82,7 @@
opDesc.getStorageManager().getBufferCache(ctx),
opDesc.getStorageManager().getFileMapProvider(ctx), treeOpDesc.getTreeIndexTypeTraits(),
treeOpDesc.getTreeIndexComparatorFactories(), bloomFilterFalsePositiveRate, mergePolicy,
- opTrackerFactory.getOperationTracker(ctx.getJobletContext().getApplicationContext()), ioScheduler,
+ opTrackerFactory.getOperationTracker(ctx.getJobletContext().getServiceContext()), ioScheduler,
ioOpCallbackFactory.createIoOpCallback(), buddyBtreeFields, version, durable,
(IMetadataPageManagerFactory) opDesc.getPageManagerFactory());
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeDataflowHelper.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeDataflowHelper.java
index 0346624..81d8457 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeDataflowHelper.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeDataflowHelper.java
@@ -77,7 +77,7 @@
opDesc.getStorageManager().getFileMapProvider(ctx), treeOpDesc.getTreeIndexTypeTraits(),
treeOpDesc.getTreeIndexComparatorFactories(), treeOpDesc.getTreeIndexBloomFilterKeyFields(),
bloomFilterFalsePositiveRate, mergePolicy, opTrackerFactory.getOperationTracker(ctx.getJobletContext()
- .getApplicationContext()), ioScheduler,
+ .getServiceContext()), ioScheduler,
ioOpCallbackFactory.createIoOpCallback(), needKeyDupCheck, filterTypeTraits, filterCmpFactories,
btreeFields, filterFields, durable, (IMetadataPageManagerFactory) opDesc.getPageManagerFactory());
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMOperationTrackerFactory.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMOperationTrackerFactory.java
index 133d450..217f794 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMOperationTrackerFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMOperationTrackerFactory.java
@@ -20,9 +20,9 @@
import java.io.Serializable;
-import org.apache.hyracks.api.application.INCApplicationContext;
+import org.apache.hyracks.api.application.INCServiceContext;
@FunctionalInterface
public interface ILSMOperationTrackerFactory extends Serializable {
- ILSMOperationTracker getOperationTracker(INCApplicationContext ctx);
+ ILSMOperationTracker getOperationTracker(INCServiceContext ctx);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoOpOperationTrackerFactory.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoOpOperationTrackerFactory.java
index 97e6978..c737ceb 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoOpOperationTrackerFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoOpOperationTrackerFactory.java
@@ -18,7 +18,7 @@
*/
package org.apache.hyracks.storage.am.lsm.common.impls;
-import org.apache.hyracks.api.application.INCApplicationContext;
+import org.apache.hyracks.api.application.INCServiceContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.storage.am.common.api.IModificationOperationCallback;
import org.apache.hyracks.storage.am.common.api.ISearchOperationCallback;
@@ -43,7 +43,7 @@
}
@Override
- public ILSMOperationTracker getOperationTracker(INCApplicationContext ctx) {
+ public ILSMOperationTracker getOperationTracker(INCServiceContext ctx) {
return tracker;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ThreadCountingOperationTrackerFactory.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ThreadCountingOperationTrackerFactory.java
index a9ccc42..d01e7ba 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ThreadCountingOperationTrackerFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ThreadCountingOperationTrackerFactory.java
@@ -18,7 +18,7 @@
*/
package org.apache.hyracks.storage.am.lsm.common.impls;
-import org.apache.hyracks.api.application.INCApplicationContext;
+import org.apache.hyracks.api.application.INCServiceContext;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerFactory;
@@ -32,7 +32,7 @@
}
@Override
- public ILSMOperationTracker getOperationTracker(INCApplicationContext ctx) {
+ public ILSMOperationTracker getOperationTracker(INCServiceContext ctx) {
return new ThreadCountingTracker();
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexDataflowHelper.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexDataflowHelper.java
index ac06d1c..fe5e94e 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexDataflowHelper.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexDataflowHelper.java
@@ -89,7 +89,7 @@
invIndexOpDesc.getInvListsComparatorFactories(), invIndexOpDesc.getTokenTypeTraits(),
invIndexOpDesc.getTokenComparatorFactories(), invIndexOpDesc.getTokenizerFactory(),
diskBufferCache, fileRef.getFile().getAbsolutePath(), bloomFilterFalsePositiveRate, mergePolicy,
- opTrackerFactory.getOperationTracker(ctx.getJobletContext().getApplicationContext()), ioScheduler,
+ opTrackerFactory.getOperationTracker(ctx.getJobletContext().getServiceContext()), ioScheduler,
ioOpCallbackFactory.createIoOpCallback(), invertedIndexFields, filterTypeTraits,
filterCmpFactories, filterFields, filterFieldsForNonBulkLoadOps,
invertedIndexFieldsForNonBulkLoadOps, durable, (IMetadataPageManagerFactory) opDesc
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/PartitionedLSMInvertedIndexDataflowHelper.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/PartitionedLSMInvertedIndexDataflowHelper.java
index 4ee3a21..34c5810 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/PartitionedLSMInvertedIndexDataflowHelper.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/PartitionedLSMInvertedIndexDataflowHelper.java
@@ -88,7 +88,7 @@
invIndexOpDesc.getInvListsComparatorFactories(), invIndexOpDesc.getTokenTypeTraits(),
invIndexOpDesc.getTokenComparatorFactories(), invIndexOpDesc.getTokenizerFactory(),
diskBufferCache, fileRef.getFile().getAbsolutePath(), bloomFilterFalsePositiveRate, mergePolicy,
- opTrackerFactory.getOperationTracker(ctx.getJobletContext().getApplicationContext()), ioScheduler,
+ opTrackerFactory.getOperationTracker(ctx.getJobletContext().getServiceContext()), ioScheduler,
ioOpCallbackFactory.createIoOpCallback(), invertedIndexFields, filterTypeTraits,
filterCmpFactories, filterFields, filterFieldsForNonBulkLoadOps,
invertedIndexFieldsForNonBulkLoadOps, durable, opDesc.getPageManagerFactory());
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/dataflow/AbstractLSMRTreeDataflowHelper.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/dataflow/AbstractLSMRTreeDataflowHelper.java
index 00a2e3f..e5c837e 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/dataflow/AbstractLSMRTreeDataflowHelper.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/dataflow/AbstractLSMRTreeDataflowHelper.java
@@ -93,7 +93,7 @@
return createLSMTree(virtualBufferCaches, fileRef, opDesc.getStorageManager().getBufferCache(ctx),
opDesc.getStorageManager().getFileMapProvider(ctx), treeOpDesc.getTreeIndexTypeTraits(),
treeOpDesc.getTreeIndexComparatorFactories(), btreeComparatorFactories,
- opTrackerFactory.getOperationTracker(ctx.getJobletContext().getApplicationContext()),
+ opTrackerFactory.getOperationTracker(ctx.getJobletContext().getServiceContext()),
valueProviderFactories, rtreePolicyType, linearizeCmpFactory,
rtreeFields, filterTypeTraits, filterCmpFactories, filterFields);
diff --git a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestJobletContext.java b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestJobletContext.java
index 7ef33bd..d3c34dd 100644
--- a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestJobletContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestJobletContext.java
@@ -20,7 +20,7 @@
import java.nio.ByteBuffer;
-import org.apache.hyracks.api.application.INCApplicationContext;
+import org.apache.hyracks.api.application.INCServiceContext;
import org.apache.hyracks.api.context.IHyracksJobletContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.HyracksException;
@@ -35,14 +35,14 @@
public class TestJobletContext implements IHyracksJobletContext {
private final int frameSize;
- private final INCApplicationContext appContext;
+ private final INCServiceContext serviceContext;
private final FrameManager frameManger;
private JobId jobId;
private WorkspaceFileFactory fileFactory;
- public TestJobletContext(int frameSize, INCApplicationContext appContext, JobId jobId) throws HyracksException {
+ public TestJobletContext(int frameSize, INCServiceContext serviceContext, JobId jobId) throws HyracksException {
this.frameSize = frameSize;
- this.appContext = appContext;
+ this.serviceContext = serviceContext;
this.jobId = jobId;
fileFactory = new WorkspaceFileFactory(this, (IIOManager) getIOManager());
this.frameManger = new FrameManager(frameSize);
@@ -69,7 +69,7 @@
}
public IIOManager getIOManager() {
- return appContext.getIoManager();
+ return serviceContext.getIoManager();
}
@Override
@@ -98,8 +98,8 @@
}
@Override
- public INCApplicationContext getApplicationContext() {
- return appContext;
+ public INCServiceContext getServiceContext() {
+ return serviceContext;
}
@Override
diff --git a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestNCApplicationContext.java b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestNCServiceContext.java
similarity index 89%
rename from hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestNCApplicationContext.java
rename to hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestNCServiceContext.java
index 81ee47b..ee74b75 100644
--- a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestNCApplicationContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestNCServiceContext.java
@@ -21,7 +21,7 @@
import java.io.Serializable;
import java.util.concurrent.ThreadFactory;
-import org.apache.hyracks.api.application.INCApplicationContext;
+import org.apache.hyracks.api.application.INCServiceContext;
import org.apache.hyracks.api.application.IStateDumpHandler;
import org.apache.hyracks.api.comm.IChannelInterfaceFactory;
import org.apache.hyracks.api.config.IApplicationConfig;
@@ -33,17 +33,17 @@
import org.apache.hyracks.api.resources.memory.IMemoryManager;
import org.apache.hyracks.api.service.IControllerService;
-public class TestNCApplicationContext implements INCApplicationContext {
+public class TestNCServiceContext implements INCServiceContext {
private final ILifeCycleComponentManager lccm;
private final IIOManager ioManager;
private final String nodeId;
private Serializable distributedState;
- private Object appObject;
+ private Object appCtx;
private final IMemoryManager mm;
- public TestNCApplicationContext(IIOManager ioManager, String nodeId) {
+ public TestNCServiceContext(IIOManager ioManager, String nodeId) {
this.lccm = new LifeCycleComponentManager();
this.ioManager = ioManager;
this.nodeId = nodeId;
@@ -86,16 +86,6 @@
}
@Override
- public void setApplicationObject(Object object) {
- this.appObject = object;
- }
-
- @Override
- public Object getApplicationObject() {
- return appObject;
- }
-
- @Override
public void setMessageBroker(IMessageBroker staticticsConnector) {
}
@@ -151,4 +141,9 @@
public void setMessagingChannelInterfaceFactory(IChannelInterfaceFactory interfaceFactory) {
// do nothing
}
+
+ @Override
+ public Object getApplicationContext() {
+ return appCtx;
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestUtils.java b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestUtils.java
index ab87f93..406d0e0 100644
--- a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestUtils.java
+++ b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestUtils.java
@@ -18,12 +18,19 @@
*/
package org.apache.hyracks.test.support;
+import java.io.BufferedReader;
import java.io.File;
+import java.io.FileReader;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.Executors;
-import org.apache.hyracks.api.application.INCApplicationContext;
+import org.apache.hyracks.api.application.INCServiceContext;
+import org.apache.hyracks.api.client.NodeControllerInfo;
+import org.apache.hyracks.api.client.NodeStatus;
+import org.apache.hyracks.api.comm.NetworkAddress;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.ActivityId;
import org.apache.hyracks.api.dataflow.OperatorDescriptorId;
@@ -38,7 +45,7 @@
public static IHyracksTaskContext create(int frameSize) {
try {
IOManager ioManager = createIoManager();
- INCApplicationContext appCtx = new TestNCApplicationContext(ioManager, null);
+ INCServiceContext appCtx = new TestNCServiceContext(ioManager, null);
TestJobletContext jobletCtx = new TestJobletContext(frameSize, appCtx, new JobId(0));
TaskAttemptId tid = new TaskAttemptId(new TaskId(new ActivityId(new OperatorDescriptorId(0), 0), 0), 0);
IHyracksTaskContext taskCtx = new TestTaskContext(jobletCtx, tid);
@@ -53,4 +60,83 @@
devices.add(new IODeviceHandle(new File(System.getProperty("java.io.tmpdir")), "."));
return new IOManager(devices, Executors.newCachedThreadPool());
}
+
+ public static void compareWithResult(File expectedFile, File actualFile) throws Exception {
+ String lineExpected, lineActual;
+ int num = 1;
+ try (BufferedReader readerExpected = new BufferedReader(new FileReader(expectedFile));
+ BufferedReader readerActual = new BufferedReader(new FileReader(actualFile))) {
+ while ((lineExpected = readerExpected.readLine()) != null) {
+ lineActual = readerActual.readLine();
+ if (lineActual == null) {
+ throw new Exception("Actual result changed at line " + num + ":\n< " + lineExpected + "\n> ");
+ }
+ if (!equalStrings(lineExpected, lineActual)) {
+ throw new Exception(
+ "Result for changed at line " + num + ":\n< " + lineExpected + "\n> " + lineActual);
+ }
+ ++num;
+ }
+ lineActual = readerActual.readLine();
+ if (lineActual != null) {
+ throw new Exception("Actual result changed at line " + num + ":\n< \n> " + lineActual);
+ }
+ }
+ }
+
+ private static boolean equalStrings(String s1, String s2) {
+ String[] rowsOne = s1.split("\n");
+ String[] rowsTwo = s2.split("\n");
+
+ if (rowsOne.length != rowsTwo.length) {
+ return false;
+ }
+
+ for (int i = 0; i < rowsOne.length; i++) {
+ String row1 = rowsOne[i];
+ String row2 = rowsTwo[i];
+
+ if (row1.equals(row2)) {
+ continue;
+ }
+
+ String[] fields1 = row1.split(",");
+ String[] fields2 = row2.split(",");
+
+ for (int j = 0; j < fields1.length; j++) {
+ if (fields1[j].equals(fields2[j])) {
+ continue;
+ } else if (fields1[j].indexOf('.') < 0) {
+ return false;
+ } else {
+ fields1[j] = fields1[j].split("=")[1];
+ fields2[j] = fields2[j].split("=")[1];
+ Double double1 = Double.parseDouble(fields1[j]);
+ Double double2 = Double.parseDouble(fields2[j]);
+ float float1 = (float) double1.doubleValue();
+ float float2 = (float) double2.doubleValue();
+
+ if (Math.abs(float1 - float2) == 0) {
+ continue;
+ } else {
+ return false;
+ }
+ }
+ }
+ }
+ return true;
+ }
+
+ public static Map<String, NodeControllerInfo> generateNodeControllerInfo(int numberOfNodes, String ncNamePrefix,
+ String addressPrefix, int netPort, int dataPort, int messagingPort) {
+ Map<String, NodeControllerInfo> ncNameToNcInfos = new HashMap<>();
+ for (int i = 1; i <= numberOfNodes; i++) {
+ String ncId = ncNamePrefix + i;
+ String ncAddress = addressPrefix + i;
+ ncNameToNcInfos.put(ncId,
+ new NodeControllerInfo(ncId, NodeStatus.ALIVE, new NetworkAddress(ncAddress, netPort),
+ new NetworkAddress(ncAddress, dataPort), new NetworkAddress(ncAddress, messagingPort), 2));
+ }
+ return ncNameToNcInfos;
+ }
}