Made communication between Cluster Controller and Node Controller asynchronous
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_dev_next@1025 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-algebricks/hyracks-algebricks-tests/src/test/java/edu/uci/ics/hyracks/algebricks/tests/util/AlgebricksHyracksIntegrationUtil.java b/hyracks-algebricks/hyracks-algebricks-tests/src/test/java/edu/uci/ics/hyracks/algebricks/tests/util/AlgebricksHyracksIntegrationUtil.java
index 06b76ef..922b0fd 100644
--- a/hyracks-algebricks/hyracks-algebricks-tests/src/test/java/edu/uci/ics/hyracks/algebricks/tests/util/AlgebricksHyracksIntegrationUtil.java
+++ b/hyracks-algebricks/hyracks-algebricks-tests/src/test/java/edu/uci/ics/hyracks/algebricks/tests/util/AlgebricksHyracksIntegrationUtil.java
@@ -81,9 +81,9 @@
public static void runJob(JobSpecification spec) throws Exception {
JobId jobId = hcc.createJob(AlgebricksConfig.HYRACKS_APP_NAME, spec, EnumSet.of(JobFlag.PROFILE_RUNTIME));
AlgebricksConfig.ALGEBRICKS_LOGGER.info(spec.toJSON().toString());
- cc.startJob(jobId);
+ hcc.start(jobId);
AlgebricksConfig.ALGEBRICKS_LOGGER.info(jobId.toString());
- cc.waitForCompletion(jobId);
+ hcc.waitForCompletion(jobId);
}
}
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
index f74d06e..602288c 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
@@ -21,82 +21,76 @@
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.job.JobStatus;
import edu.uci.ics.hyracks.ipc.api.IIPCHandle;
-import edu.uci.ics.hyracks.ipc.api.SyncRMI;
+import edu.uci.ics.hyracks.ipc.api.RPCInterface;
public class HyracksClientInterfaceRemoteProxy implements IHyracksClientInterface {
private final IIPCHandle ipcHandle;
- public HyracksClientInterfaceRemoteProxy(IIPCHandle ipcHandle) {
+ private final RPCInterface rpci;
+
+ public HyracksClientInterfaceRemoteProxy(IIPCHandle ipcHandle, RPCInterface rpci) {
this.ipcHandle = ipcHandle;
+ this.rpci = rpci;
}
@Override
public ClusterControllerInfo getClusterControllerInfo() throws Exception {
- SyncRMI sync = new SyncRMI();
HyracksClientInterfaceFunctions.GetClusterControllerInfoFunction gccif = new HyracksClientInterfaceFunctions.GetClusterControllerInfoFunction();
- return (ClusterControllerInfo) sync.call(ipcHandle, gccif);
+ return (ClusterControllerInfo) rpci.call(ipcHandle, gccif);
}
@Override
public void createApplication(String appName) throws Exception {
- SyncRMI sync = new SyncRMI();
HyracksClientInterfaceFunctions.CreateApplicationFunction caf = new HyracksClientInterfaceFunctions.CreateApplicationFunction(
appName);
- sync.call(ipcHandle, caf);
+ rpci.call(ipcHandle, caf);
}
@Override
public void startApplication(String appName) throws Exception {
- SyncRMI sync = new SyncRMI();
HyracksClientInterfaceFunctions.StartApplicationFunction saf = new HyracksClientInterfaceFunctions.StartApplicationFunction(
appName);
- sync.call(ipcHandle, saf);
+ rpci.call(ipcHandle, saf);
}
@Override
public void destroyApplication(String appName) throws Exception {
- SyncRMI sync = new SyncRMI();
HyracksClientInterfaceFunctions.DestroyApplicationFunction daf = new HyracksClientInterfaceFunctions.DestroyApplicationFunction(
appName);
- sync.call(ipcHandle, daf);
+ rpci.call(ipcHandle, daf);
}
@Override
public JobId createJob(String appName, byte[] jobSpec, EnumSet<JobFlag> jobFlags) throws Exception {
- SyncRMI sync = new SyncRMI();
HyracksClientInterfaceFunctions.CreateJobFunction cjf = new HyracksClientInterfaceFunctions.CreateJobFunction(
appName, jobSpec, jobFlags);
- return (JobId) sync.call(ipcHandle, cjf);
+ return (JobId) rpci.call(ipcHandle, cjf);
}
@Override
public JobStatus getJobStatus(JobId jobId) throws Exception {
- SyncRMI sync = new SyncRMI();
HyracksClientInterfaceFunctions.GetJobStatusFunction gjsf = new HyracksClientInterfaceFunctions.GetJobStatusFunction(
jobId);
- return (JobStatus) sync.call(ipcHandle, gjsf);
+ return (JobStatus) rpci.call(ipcHandle, gjsf);
}
@Override
public void startJob(JobId jobId) throws Exception {
- SyncRMI sync = new SyncRMI();
HyracksClientInterfaceFunctions.StartJobFunction sjf = new HyracksClientInterfaceFunctions.StartJobFunction(
jobId);
- sync.call(ipcHandle, sjf);
+ rpci.call(ipcHandle, sjf);
}
@Override
public void waitForCompletion(JobId jobId) throws Exception {
- SyncRMI sync = new SyncRMI();
HyracksClientInterfaceFunctions.WaitForCompletionFunction wfcf = new HyracksClientInterfaceFunctions.WaitForCompletionFunction(
jobId);
- sync.call(ipcHandle, wfcf);
+ rpci.call(ipcHandle, wfcf);
}
@Override
public Map<String, NodeControllerInfo> getNodeControllersInfo() throws Exception {
- SyncRMI sync = new SyncRMI();
HyracksClientInterfaceFunctions.GetNodeControllersInfoFunction gncif = new HyracksClientInterfaceFunctions.GetNodeControllersInfoFunction();
- return (Map<String, NodeControllerInfo>) sync.call(ipcHandle, gncif);
+ return (Map<String, NodeControllerInfo>) rpci.call(ipcHandle, gncif);
}
}
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksConnection.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksConnection.java
index 1b38cb9..b474ee1 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksConnection.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksConnection.java
@@ -32,6 +32,7 @@
import edu.uci.ics.hyracks.api.job.JobStatus;
import edu.uci.ics.hyracks.api.util.JavaSerializationUtils;
import edu.uci.ics.hyracks.ipc.api.IIPCHandle;
+import edu.uci.ics.hyracks.ipc.api.RPCInterface;
import edu.uci.ics.hyracks.ipc.impl.IPCSystem;
/**
@@ -63,10 +64,11 @@
*/
public HyracksConnection(String ccHost, int ccPort) throws Exception {
this.ccHost = ccHost;
- ipc = new IPCSystem(new InetSocketAddress(0));
+ RPCInterface rpci = new RPCInterface();
+ ipc = new IPCSystem(new InetSocketAddress(0), rpci);
ipc.start();
IIPCHandle ccIpchandle = ipc.getHandle(new InetSocketAddress(ccHost, ccPort));
- this.hci = new HyracksClientInterfaceRemoteProxy(ccIpchandle);
+ this.hci = new HyracksClientInterfaceRemoteProxy(ccIpchandle, rpci);
ccInfo = hci.getClusterControllerInfo();
}
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
index fd401b4..2578b88 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
@@ -16,11 +16,9 @@
import java.io.File;
import java.net.InetSocketAddress;
-import java.util.EnumSet;
import java.util.HashMap;
import java.util.Hashtable;
import java.util.LinkedHashMap;
-import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Timer;
@@ -31,23 +29,19 @@
import java.util.logging.Logger;
import edu.uci.ics.hyracks.api.client.ClusterControllerInfo;
-import edu.uci.ics.hyracks.api.client.IHyracksClientInterface;
+import edu.uci.ics.hyracks.api.client.HyracksClientInterfaceFunctions;
import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
import edu.uci.ics.hyracks.api.context.ICCContext;
-import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
-import edu.uci.ics.hyracks.api.job.JobFlag;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.job.JobStatus;
import edu.uci.ics.hyracks.control.cc.application.CCApplicationContext;
-import edu.uci.ics.hyracks.control.cc.ipc.HyracksClientInterfaceDelegateIPCI;
-import edu.uci.ics.hyracks.control.cc.job.IJobStatusConditionVariable;
import edu.uci.ics.hyracks.control.cc.job.JobRun;
import edu.uci.ics.hyracks.control.cc.web.WebServer;
import edu.uci.ics.hyracks.control.cc.work.ApplicationCreateWork;
import edu.uci.ics.hyracks.control.cc.work.ApplicationDestroyWork;
import edu.uci.ics.hyracks.control.cc.work.ApplicationStartWork;
+import edu.uci.ics.hyracks.control.cc.work.ApplicationStateChangeWork;
import edu.uci.ics.hyracks.control.cc.work.GetIpAddressNodeNameMapWork;
-import edu.uci.ics.hyracks.control.cc.work.GetJobStatusConditionVariableWork;
import edu.uci.ics.hyracks.control.cc.work.GetJobStatusWork;
import edu.uci.ics.hyracks.control.cc.work.GetNodeControllersInfoWork;
import edu.uci.ics.hyracks.control.cc.work.JobCreateWork;
@@ -62,29 +56,21 @@
import edu.uci.ics.hyracks.control.cc.work.TaskCompleteWork;
import edu.uci.ics.hyracks.control.cc.work.TaskFailureWork;
import edu.uci.ics.hyracks.control.cc.work.UnregisterNodeWork;
+import edu.uci.ics.hyracks.control.cc.work.WaitForJobCompletionWork;
import edu.uci.ics.hyracks.control.common.AbstractRemoteService;
-import edu.uci.ics.hyracks.control.common.base.IClusterController;
-import edu.uci.ics.hyracks.control.common.base.INodeController;
import edu.uci.ics.hyracks.control.common.context.ServerContext;
import edu.uci.ics.hyracks.control.common.controllers.CCConfig;
-import edu.uci.ics.hyracks.control.common.controllers.NodeParameters;
-import edu.uci.ics.hyracks.control.common.controllers.NodeRegistration;
-import edu.uci.ics.hyracks.control.common.heartbeat.HeartbeatData;
-import edu.uci.ics.hyracks.control.common.ipc.ClusterControllerDelegateIPCI;
-import edu.uci.ics.hyracks.control.common.ipc.NodeControllerRemoteProxy;
-import edu.uci.ics.hyracks.control.common.job.PartitionDescriptor;
-import edu.uci.ics.hyracks.control.common.job.PartitionRequest;
-import edu.uci.ics.hyracks.control.common.job.profiling.om.JobProfile;
-import edu.uci.ics.hyracks.control.common.job.profiling.om.TaskProfile;
+import edu.uci.ics.hyracks.control.common.ipc.ClusterControllerFunctions;
+import edu.uci.ics.hyracks.control.common.ipc.ClusterControllerFunctions.Function;
import edu.uci.ics.hyracks.control.common.logs.LogFile;
-import edu.uci.ics.hyracks.control.common.work.FutureValue;
+import edu.uci.ics.hyracks.control.common.work.IPCResponder;
import edu.uci.ics.hyracks.control.common.work.WorkQueue;
import edu.uci.ics.hyracks.ipc.api.IIPCHandle;
import edu.uci.ics.hyracks.ipc.api.IIPCI;
+import edu.uci.ics.hyracks.ipc.exceptions.IPCException;
import edu.uci.ics.hyracks.ipc.impl.IPCSystem;
-public class ClusterControllerService extends AbstractRemoteService implements IClusterController,
- IHyracksClientInterface {
+public class ClusterControllerService extends AbstractRemoteService {
private static Logger LOGGER = Logger.getLogger(ClusterControllerService.class.getName());
private final CCConfig ccConfig;
@@ -132,11 +118,10 @@
applications = new Hashtable<String, CCApplicationContext>();
serverCtx = new ServerContext(ServerContext.ServerType.CLUSTER_CONTROLLER, new File(ccConfig.ccRoot));
executor = Executors.newCachedThreadPool();
- IIPCI ccIPCI = new ClusterControllerDelegateIPCI(this);
- clusterIPC = new IPCSystem(new InetSocketAddress(ccConfig.clusterNetPort), ccIPCI, executor);
- IIPCI ciIPCI = new HyracksClientInterfaceDelegateIPCI(this);
- clientIPC = new IPCSystem(new InetSocketAddress(ccConfig.clientNetIpAddress, ccConfig.clientNetPort), ciIPCI,
- executor);
+ IIPCI ccIPCI = new ClusterControllerIPCI();
+ clusterIPC = new IPCSystem(new InetSocketAddress(ccConfig.clusterNetPort), ccIPCI);
+ IIPCI ciIPCI = new HyracksClientInterfaceIPCI();
+ clientIPC = new IPCSystem(new InetSocketAddress(ccConfig.clientNetIpAddress, ccConfig.clientNetPort), ciIPCI);
webServer = new WebServer(this);
activeRunMap = new HashMap<JobId, JobRun>();
runMapArchive = new LinkedHashMap<JobId, JobRun>() {
@@ -203,7 +188,7 @@
public Map<JobId, JobRun> getRunMapArchive() {
return runMapArchive;
}
-
+
public Map<String, Set<String>> getIpAddressNodeNameMap() {
return ipAddressNodeNameMap;
}
@@ -232,130 +217,16 @@
return new JobId(jobCounter++);
}
- @Override
- public JobId createJob(String appName, byte[] jobSpec, EnumSet<JobFlag> jobFlags) throws Exception {
- JobId jobId = createJobId();
- JobCreateWork jce = new JobCreateWork(this, jobId, appName, jobSpec, jobFlags);
- workQueue.schedule(jce);
- jce.sync();
- return jobId;
- }
-
- @Override
- public NodeParameters registerNode(NodeRegistration reg) throws Exception {
- String id = reg.getNodeId();
-
- IIPCHandle ncIPCHandle = clusterIPC.getHandle(reg.getNodeControllerAddress());
- INodeController nodeController = new NodeControllerRemoteProxy(ncIPCHandle);
-
- NodeControllerState state = new NodeControllerState(nodeController, reg);
- workQueue.scheduleAndSync(new RegisterNodeWork(this, id, state));
- LOGGER.log(Level.INFO, "Registered INodeController: id = " + id);
- NodeParameters params = new NodeParameters();
- params.setClusterControllerInfo(info);
- params.setHeartbeatPeriod(ccConfig.heartbeatPeriod);
- params.setProfileDumpPeriod(ccConfig.profileDumpPeriod);
- return params;
- }
-
- @Override
- public void unregisterNode(String nodeId) throws Exception {
- workQueue.schedule(new UnregisterNodeWork(this, nodeId));
- }
-
- @Override
- public void notifyTaskComplete(JobId jobId, TaskAttemptId taskId, String nodeId, TaskProfile statistics)
- throws Exception {
- TaskCompleteWork sce = new TaskCompleteWork(this, jobId, taskId, nodeId, statistics);
- workQueue.schedule(sce);
- }
-
- @Override
- public void notifyTaskFailure(JobId jobId, TaskAttemptId taskId, String nodeId, String details) throws Exception {
- TaskFailureWork tfe = new TaskFailureWork(this, jobId, taskId, nodeId, details);
- workQueue.schedule(tfe);
- }
-
- @Override
- public void notifyJobletCleanup(JobId jobId, String nodeId) throws Exception {
- JobletCleanupNotificationWork jcnw = new JobletCleanupNotificationWork(this, jobId, nodeId);
- workQueue.schedule(jcnw);
- }
-
- @Override
- public JobStatus getJobStatus(JobId jobId) throws Exception {
- GetJobStatusWork gse = new GetJobStatusWork(this, jobId);
- workQueue.scheduleAndSync(gse);
- return gse.getStatus();
- }
-
- @Override
- public void startJob(JobId jobId) throws Exception {
- JobStartWork jse = new JobStartWork(this, jobId);
- workQueue.schedule(jse);
- }
-
- @Override
- public void waitForCompletion(JobId jobId) throws Exception {
- GetJobStatusConditionVariableWork e = new GetJobStatusConditionVariableWork(this, jobId);
- workQueue.scheduleAndSync(e);
- IJobStatusConditionVariable var = e.getConditionVariable();
- if (var != null) {
- var.waitForCompletion();
- }
- }
-
- @Override
- public void reportProfile(String id, List<JobProfile> profiles) throws Exception {
- workQueue.schedule(new ReportProfilesWork(this, profiles));
- }
-
- @Override
- public synchronized void nodeHeartbeat(String id, HeartbeatData hbData) throws Exception {
- workQueue.schedule(new NodeHeartbeatWork(this, id, hbData));
- }
-
- @Override
- public void createApplication(String appName) throws Exception {
- FutureValue<Object> fv = new FutureValue<Object>();
- workQueue.schedule(new ApplicationCreateWork(this, appName, fv));
- fv.get();
- }
-
- @Override
- public void destroyApplication(String appName) throws Exception {
- FutureValue<Object> fv = new FutureValue<Object>();
- workQueue.schedule(new ApplicationDestroyWork(this, appName, fv));
- fv.get();
- }
-
- @Override
- public void startApplication(final String appName) throws Exception {
- FutureValue<Object> fv = new FutureValue<Object>();
- workQueue.schedule(new ApplicationStartWork(this, appName, fv));
- fv.get();
- }
-
- @Override
- public ClusterControllerInfo getClusterControllerInfo() throws Exception {
+ public ClusterControllerInfo getClusterControllerInfo() {
return info;
}
- @Override
- public Map<String, NodeControllerInfo> getNodeControllersInfo() throws Exception {
- FutureValue<Map<String, NodeControllerInfo>> fv = new FutureValue<Map<String, NodeControllerInfo>>();
- workQueue.schedule(new GetNodeControllersInfoWork(this, fv));
- return fv.get();
+ public CCConfig getCCConfig() {
+ return ccConfig;
}
- @Override
- public void registerPartitionProvider(PartitionDescriptor partitionDescriptor) {
- workQueue.schedule(new RegisterPartitionAvailibilityWork(this, partitionDescriptor));
- }
-
- @Override
- public void registerPartitionRequest(PartitionRequest partitionRequest) {
- workQueue.schedule(new RegisterPartitionRequestWork(this, partitionRequest));
+ public IPCSystem getClusterIPC() {
+ return clusterIPC;
}
private class DeadNodeSweeper extends TimerTask {
@@ -364,4 +235,156 @@
workQueue.schedule(new RemoveDeadNodesWork(ClusterControllerService.this));
}
}
+
+ private class HyracksClientInterfaceIPCI implements IIPCI {
+ @Override
+ public void deliverIncomingMessage(IIPCHandle handle, long mid, long rmid, Object payload, Exception exception) {
+ HyracksClientInterfaceFunctions.Function fn = (HyracksClientInterfaceFunctions.Function) payload;
+ switch (fn.getFunctionId()) {
+ case GET_CLUSTER_CONTROLLER_INFO: {
+ try {
+ handle.send(mid, info, null);
+ } catch (IPCException e) {
+ e.printStackTrace();
+ }
+ return;
+ }
+
+ case CREATE_APPLICATION: {
+ HyracksClientInterfaceFunctions.CreateApplicationFunction caf = (HyracksClientInterfaceFunctions.CreateApplicationFunction) fn;
+ workQueue.schedule(new ApplicationCreateWork(ClusterControllerService.this, caf.getAppName(),
+ new IPCResponder<Object>(handle, mid)));
+ return;
+ }
+
+ case START_APPLICATION: {
+ HyracksClientInterfaceFunctions.StartApplicationFunction saf = (HyracksClientInterfaceFunctions.StartApplicationFunction) fn;
+ workQueue.schedule(new ApplicationStartWork(ClusterControllerService.this, saf.getAppName(),
+ new IPCResponder<Object>(handle, mid)));
+ return;
+ }
+
+ case DESTROY_APPLICATION: {
+ HyracksClientInterfaceFunctions.DestroyApplicationFunction daf = (HyracksClientInterfaceFunctions.DestroyApplicationFunction) fn;
+ workQueue.schedule(new ApplicationDestroyWork(ClusterControllerService.this, daf.getAppName(),
+ new IPCResponder<Object>(handle, mid)));
+ return;
+ }
+
+ case CREATE_JOB: {
+ HyracksClientInterfaceFunctions.CreateJobFunction cjf = (HyracksClientInterfaceFunctions.CreateJobFunction) fn;
+ JobId jobId = createJobId();
+ workQueue.schedule(new JobCreateWork(ClusterControllerService.this, jobId, cjf.getAppName(), cjf
+ .getJobSpec(), cjf.getJobFlags(), new IPCResponder<JobId>(handle, mid)));
+ return;
+ }
+
+ case GET_JOB_STATUS: {
+ HyracksClientInterfaceFunctions.GetJobStatusFunction gjsf = (HyracksClientInterfaceFunctions.GetJobStatusFunction) fn;
+ workQueue.schedule(new GetJobStatusWork(ClusterControllerService.this, gjsf.getJobId(),
+ new IPCResponder<JobStatus>(handle, mid)));
+ return;
+ }
+
+ case START_JOB: {
+ HyracksClientInterfaceFunctions.StartJobFunction sjf = (HyracksClientInterfaceFunctions.StartJobFunction) fn;
+ workQueue.schedule(new JobStartWork(ClusterControllerService.this, sjf.getJobId(),
+ new IPCResponder<Object>(handle, mid)));
+ return;
+ }
+
+ case WAIT_FOR_COMPLETION: {
+ HyracksClientInterfaceFunctions.WaitForCompletionFunction wfcf = (HyracksClientInterfaceFunctions.WaitForCompletionFunction) fn;
+ workQueue.schedule(new WaitForJobCompletionWork(ClusterControllerService.this, wfcf.getJobId(),
+ new IPCResponder<Object>(handle, mid)));
+ return;
+ }
+
+ case GET_NODE_CONTROLLERS_INFO: {
+ workQueue.schedule(new GetNodeControllersInfoWork(ClusterControllerService.this,
+ new IPCResponder<Map<String, NodeControllerInfo>>(handle, mid)));
+ return;
+ }
+ }
+ try {
+ handle.send(mid, null, new IllegalArgumentException("Unknown function " + fn.getFunctionId()));
+ } catch (IPCException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ private class ClusterControllerIPCI implements IIPCI {
+ @Override
+ public void deliverIncomingMessage(IIPCHandle handle, long mid, long rmid, Object payload, Exception exception) {
+ ClusterControllerFunctions.Function fn = (Function) payload;
+ switch (fn.getFunctionId()) {
+ case REGISTER_NODE: {
+ ClusterControllerFunctions.RegisterNodeFunction rnf = (ClusterControllerFunctions.RegisterNodeFunction) fn;
+ workQueue.schedule(new RegisterNodeWork(ClusterControllerService.this, rnf.getNodeRegistration()));
+ return;
+ }
+
+ case UNREGISTER_NODE: {
+ ClusterControllerFunctions.UnregisterNodeFunction unf = (ClusterControllerFunctions.UnregisterNodeFunction) fn;
+ workQueue.schedule(new UnregisterNodeWork(ClusterControllerService.this, unf.getNodeId()));
+ return;
+ }
+
+ case NODE_HEARTBEAT: {
+ ClusterControllerFunctions.NodeHeartbeatFunction nhf = (ClusterControllerFunctions.NodeHeartbeatFunction) fn;
+ workQueue.schedule(new NodeHeartbeatWork(ClusterControllerService.this, nhf.getNodeId(), nhf
+ .getHeartbeatData()));
+ return;
+ }
+
+ case NOTIFY_JOBLET_CLEANUP: {
+ ClusterControllerFunctions.NotifyJobletCleanupFunction njcf = (ClusterControllerFunctions.NotifyJobletCleanupFunction) fn;
+ workQueue.schedule(new JobletCleanupNotificationWork(ClusterControllerService.this,
+ njcf.getJobId(), njcf.getNodeId()));
+ return;
+ }
+
+ case REPORT_PROFILE: {
+ ClusterControllerFunctions.ReportProfileFunction rpf = (ClusterControllerFunctions.ReportProfileFunction) fn;
+ workQueue.schedule(new ReportProfilesWork(ClusterControllerService.this, rpf.getProfiles()));
+ return;
+ }
+
+ case NOTIFY_TASK_COMPLETE: {
+ ClusterControllerFunctions.NotifyTaskCompleteFunction ntcf = (ClusterControllerFunctions.NotifyTaskCompleteFunction) fn;
+ workQueue.schedule(new TaskCompleteWork(ClusterControllerService.this, ntcf.getJobId(), ntcf
+ .getTaskId(), ntcf.getNodeId(), ntcf.getStatistics()));
+ return;
+ }
+ case NOTIFY_TASK_FAILURE: {
+ ClusterControllerFunctions.NotifyTaskFailureFunction ntff = (ClusterControllerFunctions.NotifyTaskFailureFunction) fn;
+ workQueue.schedule(new TaskFailureWork(ClusterControllerService.this, ntff.getJobId(), ntff
+ .getTaskId(), ntff.getDetails(), ntff.getDetails()));
+ return;
+ }
+
+ case REGISTER_PARTITION_PROVIDER: {
+ ClusterControllerFunctions.RegisterPartitionProviderFunction rppf = (ClusterControllerFunctions.RegisterPartitionProviderFunction) fn;
+ workQueue.schedule(new RegisterPartitionAvailibilityWork(ClusterControllerService.this, rppf
+ .getPartitionDescriptor()));
+ return;
+ }
+
+ case REGISTER_PARTITION_REQUEST: {
+ ClusterControllerFunctions.RegisterPartitionRequestFunction rprf = (ClusterControllerFunctions.RegisterPartitionRequestFunction) fn;
+ workQueue.schedule(new RegisterPartitionRequestWork(ClusterControllerService.this, rprf
+ .getPartitionRequest()));
+ return;
+ }
+
+ case APPLICATION_STATE_CHANGE_RESPONSE: {
+ ClusterControllerFunctions.ApplicationStateChangeResponseFunction astrf = (ClusterControllerFunctions.ApplicationStateChangeResponseFunction) fn;
+ workQueue.schedule(new ApplicationStateChangeWork(ClusterControllerService.this, astrf));
+ return;
+ }
+ }
+ LOGGER.warning("Unknown function: " + fn.getFunctionId());
+ }
+ }
}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/application/CCApplicationContext.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/application/CCApplicationContext.java
index c1c0161..28f7c59 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/application/CCApplicationContext.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/application/CCApplicationContext.java
@@ -17,7 +17,9 @@
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
import edu.uci.ics.hyracks.api.application.ICCApplicationContext;
import edu.uci.ics.hyracks.api.application.ICCBootstrap;
@@ -30,10 +32,17 @@
import edu.uci.ics.hyracks.control.cc.job.DeserializingJobSpecificationFactory;
import edu.uci.ics.hyracks.control.common.application.ApplicationContext;
import edu.uci.ics.hyracks.control.common.context.ServerContext;
+import edu.uci.ics.hyracks.control.common.work.IResultCallback;
public class CCApplicationContext extends ApplicationContext implements ICCApplicationContext {
private final ICCContext ccContext;
+ protected final Set<String> initPendingNodeIds;
+ protected final Set<String> deinitPendingNodeIds;
+
+ protected IResultCallback<Object> initializationCallback;
+ protected IResultCallback<Object> deinitializationCallback;
+
private IJobSpecificationFactory jobSpecFactory;
private List<IJobLifecycleListener> jobLifecycleListeners;
@@ -41,6 +50,8 @@
public CCApplicationContext(ServerContext serverCtx, ICCContext ccContext, String appName) throws IOException {
super(serverCtx, appName);
this.ccContext = ccContext;
+ initPendingNodeIds = new HashSet<String>();
+ deinitPendingNodeIds = new HashSet<String>();
jobSpecFactory = DeserializingJobSpecificationFactory.INSTANCE;
jobLifecycleListeners = new ArrayList<IJobLifecycleListener>();
}
@@ -98,4 +109,28 @@
l.notifyJobCreation(jobId, specification);
}
}
+
+ public Set<String> getInitializationPendingNodeIds() {
+ return initPendingNodeIds;
+ }
+
+ public Set<String> getDeinitializationPendingNodeIds() {
+ return deinitPendingNodeIds;
+ }
+
+ public IResultCallback<Object> getInitializationCallback() {
+ return initializationCallback;
+ }
+
+ public void setInitializationCallback(IResultCallback<Object> initializationCallback) {
+ this.initializationCallback = initializationCallback;
+ }
+
+ public IResultCallback<Object> getDeinitializationCallback() {
+ return deinitializationCallback;
+ }
+
+ public void setDeinitializationCallback(IResultCallback<Object> deinitializationCallback) {
+ this.deinitializationCallback = deinitializationCallback;
+ }
}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ipc/HyracksClientInterfaceDelegateIPCI.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ipc/HyracksClientInterfaceDelegateIPCI.java
deleted file mode 100644
index b3aa406..0000000
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ipc/HyracksClientInterfaceDelegateIPCI.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.control.cc.ipc;
-
-import edu.uci.ics.hyracks.api.client.HyracksClientInterfaceFunctions;
-import edu.uci.ics.hyracks.api.client.IHyracksClientInterface;
-import edu.uci.ics.hyracks.ipc.api.IIPCHandle;
-import edu.uci.ics.hyracks.ipc.api.IIPCI;
-
-public class HyracksClientInterfaceDelegateIPCI implements IIPCI {
- private final IHyracksClientInterface hci;
-
- public HyracksClientInterfaceDelegateIPCI(IHyracksClientInterface hci) {
- this.hci = hci;
- }
-
- @Override
- public Object call(IIPCHandle caller, Object req) throws Exception {
- HyracksClientInterfaceFunctions.Function fn = (HyracksClientInterfaceFunctions.Function) req;
- switch (fn.getFunctionId()) {
- case GET_CLUSTER_CONTROLLER_INFO: {
- return hci.getClusterControllerInfo();
- }
-
- case CREATE_APPLICATION: {
- HyracksClientInterfaceFunctions.CreateApplicationFunction caf = (HyracksClientInterfaceFunctions.CreateApplicationFunction) fn;
- hci.createApplication(caf.getAppName());
- return null;
- }
-
- case START_APPLICATION: {
- HyracksClientInterfaceFunctions.StartApplicationFunction saf = (HyracksClientInterfaceFunctions.StartApplicationFunction) fn;
- hci.startApplication(saf.getAppName());
- return null;
- }
-
- case DESTROY_APPLICATION: {
- HyracksClientInterfaceFunctions.DestroyApplicationFunction daf = (HyracksClientInterfaceFunctions.DestroyApplicationFunction) fn;
- hci.destroyApplication(daf.getAppName());
- return null;
- }
-
- case CREATE_JOB: {
- HyracksClientInterfaceFunctions.CreateJobFunction cjf = (HyracksClientInterfaceFunctions.CreateJobFunction) fn;
- return hci.createJob(cjf.getAppName(), cjf.getJobSpec(), cjf.getJobFlags());
- }
-
- case GET_JOB_STATUS: {
- HyracksClientInterfaceFunctions.GetJobStatusFunction gjsf = (HyracksClientInterfaceFunctions.GetJobStatusFunction) fn;
- return hci.getJobStatus(gjsf.getJobId());
- }
-
- case START_JOB: {
- HyracksClientInterfaceFunctions.StartJobFunction sjf = (HyracksClientInterfaceFunctions.StartJobFunction) fn;
- hci.startJob(sjf.getJobId());
- return null;
- }
-
- case WAIT_FOR_COMPLETION: {
- HyracksClientInterfaceFunctions.WaitForCompletionFunction wfcf = (HyracksClientInterfaceFunctions.WaitForCompletionFunction) fn;
- hci.waitForCompletion(wfcf.getJobId());
- return null;
- }
-
- case GET_NODE_CONTROLLERS_INFO: {
- return hci.getNodeControllersInfo();
- }
- }
- throw new IllegalArgumentException("Unknown function " + fn.getFunctionId());
- }
-}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/remote/RemoteOp.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/remote/RemoteOp.java
deleted file mode 100644
index ec15186..0000000
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/remote/RemoteOp.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.control.cc.remote;
-
-import edu.uci.ics.hyracks.control.common.base.INodeController;
-
-public interface RemoteOp<T> {
- public String getNodeId();
-
- public T execute(INodeController node) throws Exception;
-}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/remote/RemoteRunner.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/remote/RemoteRunner.java
deleted file mode 100644
index a37c786..0000000
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/remote/RemoteRunner.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.control.cc.remote;
-
-import java.util.List;
-import java.util.Vector;
-import java.util.concurrent.Semaphore;
-
-import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
-import edu.uci.ics.hyracks.control.cc.NodeControllerState;
-import edu.uci.ics.hyracks.control.common.base.INodeController;
-
-public class RemoteRunner {
- public static <T, R> R runRemote(ClusterControllerService ccs, final RemoteOp<T>[] remoteOps,
- final Accumulator<T, R> accumulator) throws Exception {
- final Semaphore installComplete = new Semaphore(remoteOps.length);
- final List<Exception> errors = new Vector<Exception>();
- for (final RemoteOp<T> remoteOp : remoteOps) {
- NodeControllerState nodeState = ccs.getNodeMap().get(remoteOp.getNodeId());
- final INodeController node = nodeState.getNodeController();
-
- installComplete.acquire();
- Runnable remoteRunner = new Runnable() {
- @Override
- public void run() {
- try {
- T t = remoteOp.execute(node);
- if (accumulator != null) {
- synchronized (accumulator) {
- accumulator.accumulate(t);
- }
- }
- } catch (Exception e) {
- errors.add(e);
- } finally {
- installComplete.release();
- }
- }
- };
-
- ccs.getExecutor().execute(remoteRunner);
- }
- installComplete.acquire(remoteOps.length);
- if (!errors.isEmpty()) {
- throw errors.get(0);
- }
- return accumulator == null ? null : accumulator.getResult();
- }
-}
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/remote/ops/ApplicationDestroyer.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/remote/ops/ApplicationDestroyer.java
deleted file mode 100644
index 52e726c..0000000
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/remote/ops/ApplicationDestroyer.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.control.cc.remote.ops;
-
-import edu.uci.ics.hyracks.control.cc.remote.RemoteOp;
-import edu.uci.ics.hyracks.control.common.base.INodeController;
-
-public class ApplicationDestroyer implements RemoteOp<Void> {
- private String nodeId;
- private String appName;
-
- public ApplicationDestroyer(String nodeId, String appName) {
- this.nodeId = nodeId;
- this.appName = appName;
- }
-
- @Override
- public String getNodeId() {
- return nodeId;
- }
-
- @Override
- public Void execute(INodeController node) throws Exception {
- node.destroyApplication(appName);
- return null;
- }
-
- @Override
- public String toString() {
- return "Destroyed application: " + appName;
- }
-}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/remote/ops/ApplicationStarter.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/remote/ops/ApplicationStarter.java
deleted file mode 100644
index d6a1d26..0000000
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/remote/ops/ApplicationStarter.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.control.cc.remote.ops;
-
-import edu.uci.ics.hyracks.control.cc.remote.RemoteOp;
-import edu.uci.ics.hyracks.control.common.base.INodeController;
-
-public class ApplicationStarter implements RemoteOp<Void> {
- private String nodeId;
- private String appName;
- private boolean deployHar;
- private byte[] distributedState;
-
- public ApplicationStarter(String nodeId, String appName, boolean deployHar, byte[] distributedState) {
- this.nodeId = nodeId;
- this.appName = appName;
- this.deployHar = deployHar;
- this.distributedState = distributedState;
- }
-
- @Override
- public String getNodeId() {
- return nodeId;
- }
-
- @Override
- public Void execute(INodeController node) throws Exception {
- node.createApplication(appName, deployHar, distributedState);
- return null;
- }
-
- @Override
- public String toString() {
- return "Started application: " + appName;
- }
-}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/remote/ops/JobCompleteNotifier.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/remote/ops/JobCompleteNotifier.java
deleted file mode 100644
index d0f9151..0000000
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/remote/ops/JobCompleteNotifier.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.control.cc.remote.ops;
-
-import edu.uci.ics.hyracks.api.job.JobId;
-import edu.uci.ics.hyracks.api.job.JobStatus;
-import edu.uci.ics.hyracks.control.cc.remote.RemoteOp;
-import edu.uci.ics.hyracks.control.common.base.INodeController;
-
-public class JobCompleteNotifier implements RemoteOp<Void> {
- private String nodeId;
- private JobId jobId;
- private JobStatus status;
-
- public JobCompleteNotifier(String nodeId, JobId jobId, JobStatus status) {
- this.nodeId = nodeId;
- this.jobId = jobId;
- this.status = status;
- }
-
- @Override
- public Void execute(INodeController node) throws Exception {
- node.cleanUpJoblet(jobId, status);
- return null;
- }
-
- @Override
- public String toString() {
- return jobId + " Cleaning Up";
- }
-
- @Override
- public String getNodeId() {
- return nodeId;
- }
-}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ApplicationCreateWork.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ApplicationCreateWork.java
index 3c8f7d0..15d6d1f 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ApplicationCreateWork.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ApplicationCreateWork.java
@@ -20,34 +20,42 @@
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
import edu.uci.ics.hyracks.control.cc.application.CCApplicationContext;
+import edu.uci.ics.hyracks.control.common.application.ApplicationStatus;
import edu.uci.ics.hyracks.control.common.work.AbstractWork;
-import edu.uci.ics.hyracks.control.common.work.FutureValue;
+import edu.uci.ics.hyracks.control.common.work.IResultCallback;
public class ApplicationCreateWork extends AbstractWork {
private final ClusterControllerService ccs;
private final String appName;
- private FutureValue<Object> fv;
+ private IResultCallback<Object> callback;
- public ApplicationCreateWork(ClusterControllerService ccs, String appName, FutureValue<Object> fv) {
+ public ApplicationCreateWork(ClusterControllerService ccs, String appName, IResultCallback<Object> callback) {
this.ccs = ccs;
this.appName = appName;
- this.fv = fv;
+ this.callback = callback;
}
@Override
public void run() {
- Map<String, CCApplicationContext> applications = ccs.getApplicationMap();
- if (applications.containsKey(appName)) {
- fv.setException(new HyracksException("Duplicate application with name: " + appName + " being created."));
- }
- CCApplicationContext appCtx;
try {
- appCtx = new CCApplicationContext(ccs.getServerContext(), ccs.getCCContext(), appName);
- } catch (IOException e) {
- fv.setException(e);
- return;
+ Map<String, CCApplicationContext> applications = ccs.getApplicationMap();
+ if (applications.containsKey(appName)) {
+ callback.setException(new HyracksException("Duplicate application with name: " + appName
+ + " being created."));
+ return;
+ }
+ CCApplicationContext appCtx;
+ try {
+ appCtx = new CCApplicationContext(ccs.getServerContext(), ccs.getCCContext(), appName);
+ } catch (IOException e) {
+ callback.setException(e);
+ return;
+ }
+ appCtx.setStatus(ApplicationStatus.CREATED);
+ applications.put(appName, appCtx);
+ callback.setValue(null);
+ } catch (Exception e) {
+ callback.setException(e);
}
- applications.put(appName, appCtx);
- fv.setValue(null);
}
}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ApplicationDestroyWork.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ApplicationDestroyWork.java
index c6af1b9..ac780c4 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ApplicationDestroyWork.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ApplicationDestroyWork.java
@@ -14,62 +14,51 @@
*/
package edu.uci.ics.hyracks.control.cc.work;
-import java.util.ArrayList;
-import java.util.List;
+import java.util.Map;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
-import edu.uci.ics.hyracks.control.cc.remote.RemoteOp;
-import edu.uci.ics.hyracks.control.cc.remote.RemoteRunner;
-import edu.uci.ics.hyracks.control.cc.remote.ops.ApplicationDestroyer;
-import edu.uci.ics.hyracks.control.common.application.ApplicationContext;
+import edu.uci.ics.hyracks.control.cc.NodeControllerState;
+import edu.uci.ics.hyracks.control.cc.application.CCApplicationContext;
+import edu.uci.ics.hyracks.control.common.application.ApplicationStatus;
+import edu.uci.ics.hyracks.control.common.base.INodeController;
import edu.uci.ics.hyracks.control.common.work.AbstractWork;
-import edu.uci.ics.hyracks.control.common.work.FutureValue;
+import edu.uci.ics.hyracks.control.common.work.IResultCallback;
public class ApplicationDestroyWork extends AbstractWork {
private final ClusterControllerService ccs;
private final String appName;
- private FutureValue<Object> fv;
+ private IResultCallback<Object> callback;
- public ApplicationDestroyWork(ClusterControllerService ccs, String appName, FutureValue<Object> fv) {
+ public ApplicationDestroyWork(ClusterControllerService ccs, String appName, IResultCallback<Object> callback) {
this.ccs = ccs;
this.appName = appName;
- this.fv = fv;
+ this.callback = callback;
}
@Override
public void run() {
- final ApplicationContext appCtx = ccs.getApplicationMap().remove(appName);
- if (appCtx == null) {
- fv.setException(new HyracksException("No application with name: " + appName));
- return;
- }
- List<RemoteOp<Void>> opList = new ArrayList<RemoteOp<Void>>();
- for (final String nodeId : ccs.getNodeMap().keySet()) {
- opList.add(new ApplicationDestroyer(nodeId, appName));
- }
- final RemoteOp[] ops = opList.toArray(new RemoteOp[opList.size()]);
- ccs.getExecutor().execute(new Runnable() {
- @Override
- public void run() {
- try {
- RemoteRunner.runRemote(ccs, ops, null);
- } catch (Exception e) {
- fv.setException(e);
- return;
- }
- ccs.getWorkQueue().schedule(new AbstractWork() {
- @Override
- public void run() {
- try {
- appCtx.deinitialize();
- } catch (Exception e) {
- fv.setException(e);
- }
- fv.setValue(null);
- }
- });
+ try {
+ final CCApplicationContext appCtx = ccs.getApplicationMap().remove(appName);
+ if (appCtx == null) {
+ callback.setException(new HyracksException("No application with name: " + appName));
+ return;
}
- });
+ if (appCtx.getStatus() == ApplicationStatus.IN_DEINITIALIZATION
+ || appCtx.getStatus() == ApplicationStatus.DEINITIALIZED) {
+ return;
+ }
+ Map<String, NodeControllerState> nodeMap = ccs.getNodeMap();
+ appCtx.getDeinitializationPendingNodeIds().addAll(nodeMap.keySet());
+ appCtx.setStatus(ApplicationStatus.IN_DEINITIALIZATION);
+ appCtx.setDeinitializationCallback(callback);
+ for (String nodeId : ccs.getNodeMap().keySet()) {
+ NodeControllerState nodeState = nodeMap.get(nodeId);
+ final INodeController node = nodeState.getNodeController();
+ node.destroyApplication(appName);
+ }
+ } catch (Exception e) {
+ callback.setException(e);
+ }
}
}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ApplicationStartWork.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ApplicationStartWork.java
index 637c55d..e4ad56c 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ApplicationStartWork.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ApplicationStartWork.java
@@ -14,56 +14,67 @@
*/
package edu.uci.ics.hyracks.control.cc.work;
-import java.util.ArrayList;
-import java.util.List;
+import java.util.HashMap;
+import java.util.Map;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.api.util.JavaSerializationUtils;
import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
-import edu.uci.ics.hyracks.control.cc.remote.RemoteOp;
-import edu.uci.ics.hyracks.control.cc.remote.RemoteRunner;
-import edu.uci.ics.hyracks.control.cc.remote.ops.ApplicationStarter;
-import edu.uci.ics.hyracks.control.common.application.ApplicationContext;
+import edu.uci.ics.hyracks.control.cc.NodeControllerState;
+import edu.uci.ics.hyracks.control.cc.application.CCApplicationContext;
+import edu.uci.ics.hyracks.control.common.application.ApplicationStatus;
+import edu.uci.ics.hyracks.control.common.base.INodeController;
import edu.uci.ics.hyracks.control.common.work.AbstractWork;
-import edu.uci.ics.hyracks.control.common.work.FutureValue;
+import edu.uci.ics.hyracks.control.common.work.IResultCallback;
public class ApplicationStartWork extends AbstractWork {
private final ClusterControllerService ccs;
private final String appName;
- private final FutureValue<Object> fv;
+ private final IResultCallback<Object> callback;
- public ApplicationStartWork(ClusterControllerService ccs, String appName, FutureValue<Object> fv) {
+ public ApplicationStartWork(ClusterControllerService ccs, String appName, IResultCallback<Object> callback) {
this.ccs = ccs;
this.appName = appName;
- this.fv = fv;
+ this.callback = callback;
}
@Override
public void run() {
- final ApplicationContext appCtx = ccs.getApplicationMap().get(appName);
- if (appCtx == null) {
- fv.setException(new HyracksException("No application with name: " + appName));
- return;
- }
- ccs.getExecutor().execute(new Runnable() {
- @Override
- public void run() {
- try {
- appCtx.initializeClassPath();
- appCtx.initialize();
- final byte[] distributedState = JavaSerializationUtils.serialize(appCtx.getDistributedState());
- final boolean deployHar = appCtx.containsHar();
- List<RemoteOp<Void>> opList = new ArrayList<RemoteOp<Void>>();
- for (final String nodeId : ccs.getNodeMap().keySet()) {
- opList.add(new ApplicationStarter(nodeId, appName, deployHar, distributedState));
- }
- final RemoteOp[] ops = opList.toArray(new RemoteOp[opList.size()]);
- RemoteRunner.runRemote(ccs, ops, null);
- fv.setValue(null);
- } catch (Exception e) {
- fv.setException(e);
- }
+ try {
+ final CCApplicationContext appCtx = ccs.getApplicationMap().get(appName);
+ if (appCtx == null) {
+ callback.setException(new HyracksException("No application with name: " + appName));
+ return;
}
- });
+ if (appCtx.getStatus() != ApplicationStatus.CREATED) {
+ callback.setException(new HyracksException("Application in incorrect state for starting: "
+ + appCtx.getStatus()));
+ }
+ final Map<String, NodeControllerState> nodeMapCopy = new HashMap<String, NodeControllerState>(
+ ccs.getNodeMap());
+ appCtx.getInitializationPendingNodeIds().addAll(nodeMapCopy.keySet());
+ appCtx.setStatus(ApplicationStatus.IN_INITIALIZATION);
+ appCtx.setInitializationCallback(callback);
+ ccs.getExecutor().execute(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ appCtx.initializeClassPath();
+ appCtx.initialize();
+ final byte[] distributedState = JavaSerializationUtils.serialize(appCtx.getDistributedState());
+ final boolean deployHar = appCtx.containsHar();
+ for (final String nodeId : nodeMapCopy.keySet()) {
+ NodeControllerState nodeState = nodeMapCopy.get(nodeId);
+ final INodeController node = nodeState.getNodeController();
+ node.createApplication(appName, deployHar, distributedState);
+ }
+ } catch (Exception e) {
+ callback.setException(e);
+ }
+ }
+ });
+ } catch (Exception e) {
+ callback.setException(e);
+ }
}
}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ApplicationStateChangeWork.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ApplicationStateChangeWork.java
new file mode 100644
index 0000000..2d8ae85
--- /dev/null
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ApplicationStateChangeWork.java
@@ -0,0 +1,85 @@
+package edu.uci.ics.hyracks.control.cc.work;
+
+import java.util.Set;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
+import edu.uci.ics.hyracks.control.cc.application.CCApplicationContext;
+import edu.uci.ics.hyracks.control.common.application.ApplicationStatus;
+import edu.uci.ics.hyracks.control.common.ipc.ClusterControllerFunctions;
+import edu.uci.ics.hyracks.control.common.work.AbstractWork;
+import edu.uci.ics.hyracks.control.common.work.IResultCallback;
+
+public class ApplicationStateChangeWork extends AbstractWork {
+ private static final Logger LOGGER = Logger.getLogger(ApplicationStateChangeWork.class.getName());
+
+ private final ClusterControllerService ccs;
+ private final ClusterControllerFunctions.ApplicationStateChangeResponseFunction ascrf;
+
+ public ApplicationStateChangeWork(ClusterControllerService ccs,
+ ClusterControllerFunctions.ApplicationStateChangeResponseFunction ascrf) {
+ this.ccs = ccs;
+ this.ascrf = ascrf;
+ }
+
+ @Override
+ public void run() {
+ final CCApplicationContext appCtx = ccs.getApplicationMap().get(ascrf.getApplicationName());
+ if (appCtx == null) {
+ LOGGER.warning("Got ApplicationStateChangeResponse for application " + ascrf.getApplicationName()
+ + " that does not exist");
+ return;
+ }
+ switch (ascrf.getStatus()) {
+ case INITIALIZED: {
+ Set<String> pendingNodeIds = appCtx.getInitializationPendingNodeIds();
+ boolean changed = pendingNodeIds.remove(ascrf.getNodeId());
+ if (!changed) {
+ LOGGER.warning("Got ApplicationStateChangeResponse for application " + ascrf.getApplicationName()
+ + " from unexpected node " + ascrf.getNodeId() + " to state " + ascrf.getStatus());
+ return;
+ }
+ if (pendingNodeIds.isEmpty()) {
+ appCtx.setStatus(ApplicationStatus.INITIALIZED);
+ IResultCallback<Object> callback = appCtx.getInitializationCallback();
+ appCtx.setInitializationCallback(null);
+ callback.setValue(null);
+ }
+ return;
+ }
+
+ case DEINITIALIZED: {
+ Set<String> pendingNodeIds = appCtx.getDeinitializationPendingNodeIds();
+ boolean changed = pendingNodeIds.remove(ascrf.getNodeId());
+ if (!changed) {
+ LOGGER.warning("Got ApplicationStateChangeResponse for application " + ascrf.getApplicationName()
+ + " from unexpected node " + ascrf.getNodeId() + " to state " + ascrf.getStatus());
+ return;
+ }
+ if (pendingNodeIds.isEmpty()) {
+ appCtx.setStatus(ApplicationStatus.DEINITIALIZED);
+ ccs.getExecutor().execute(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ appCtx.deinitialize();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ ccs.getWorkQueue().schedule(new AbstractWork() {
+ @Override
+ public void run() {
+ ccs.getApplicationMap().remove(ascrf.getApplicationName());
+ IResultCallback<Object> callback = appCtx.getDeinitializationCallback();
+ appCtx.setDeinitializationCallback(null);
+ callback.setValue(null);
+ }
+ });
+ }
+ });
+ }
+ return;
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetJobStatusConditionVariableWork.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetJobStatusConditionVariableWork.java
deleted file mode 100644
index ff67928..0000000
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetJobStatusConditionVariableWork.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.control.cc.work;
-
-import edu.uci.ics.hyracks.api.job.JobId;
-import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
-import edu.uci.ics.hyracks.control.cc.job.IJobStatusConditionVariable;
-import edu.uci.ics.hyracks.control.common.work.SynchronizableWork;
-
-public class GetJobStatusConditionVariableWork extends SynchronizableWork {
- private final ClusterControllerService ccs;
- private final JobId jobId;
- private IJobStatusConditionVariable cVar;
-
- public GetJobStatusConditionVariableWork(ClusterControllerService ccs, JobId jobId) {
- this.ccs = ccs;
- this.jobId = jobId;
- }
-
- @Override
- protected void doRun() throws Exception {
- cVar = ccs.getActiveRunMap().get(jobId);
- if (cVar == null) {
- cVar = ccs.getRunMapArchive().get(jobId);
- }
- }
-
- public IJobStatusConditionVariable getConditionVariable() {
- return cVar;
- }
-}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetJobStatusWork.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetJobStatusWork.java
index 7a8943a..c70e24f 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetJobStatusWork.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetJobStatusWork.java
@@ -18,28 +18,31 @@
import edu.uci.ics.hyracks.api.job.JobStatus;
import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
import edu.uci.ics.hyracks.control.cc.job.JobRun;
+import edu.uci.ics.hyracks.control.common.work.IResultCallback;
import edu.uci.ics.hyracks.control.common.work.SynchronizableWork;
public class GetJobStatusWork extends SynchronizableWork {
private final ClusterControllerService ccs;
private final JobId jobId;
- private JobStatus status;
+ private final IResultCallback<JobStatus> callback;
- public GetJobStatusWork(ClusterControllerService ccs, JobId jobId) {
+ public GetJobStatusWork(ClusterControllerService ccs, JobId jobId, IResultCallback<JobStatus> callback) {
this.ccs = ccs;
this.jobId = jobId;
+ this.callback = callback;
}
@Override
protected void doRun() throws Exception {
- JobRun run = ccs.getActiveRunMap().get(jobId);
- if (run == null) {
- run = ccs.getRunMapArchive().get(jobId);
+ try {
+ JobRun run = ccs.getActiveRunMap().get(jobId);
+ if (run == null) {
+ run = ccs.getRunMapArchive().get(jobId);
+ }
+ JobStatus status = run == null ? null : run.getStatus();
+ callback.setValue(status);
+ } catch (Exception e) {
+ callback.setException(e);
}
- status = run == null ? null : run.getStatus();
- }
-
- public JobStatus getStatus() {
- return status;
}
}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetNodeControllersInfoWork.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetNodeControllersInfoWork.java
index 48d9b84..49a3be9 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetNodeControllersInfoWork.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetNodeControllersInfoWork.java
@@ -22,15 +22,16 @@
import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
import edu.uci.ics.hyracks.control.cc.NodeControllerState;
import edu.uci.ics.hyracks.control.common.work.AbstractWork;
-import edu.uci.ics.hyracks.control.common.work.FutureValue;
+import edu.uci.ics.hyracks.control.common.work.IResultCallback;
public class GetNodeControllersInfoWork extends AbstractWork {
private final ClusterControllerService ccs;
- private FutureValue<Map<String, NodeControllerInfo>> fv;
+ private IResultCallback<Map<String, NodeControllerInfo>> callback;
- public GetNodeControllersInfoWork(ClusterControllerService ccs, FutureValue<Map<String, NodeControllerInfo>> fv) {
+ public GetNodeControllersInfoWork(ClusterControllerService ccs,
+ IResultCallback<Map<String, NodeControllerInfo>> callback) {
this.ccs = ccs;
- this.fv = fv;
+ this.callback = callback;
}
@Override
@@ -41,6 +42,6 @@
result.put(e.getKey(), new NodeControllerInfo(e.getKey(), e.getValue().getDataPort().getIpAddress(),
NodeStatus.ALIVE));
}
- fv.setValue(result);
+ callback.setValue(result);
}
}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobCleanupWork.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobCleanupWork.java
index e30a718..07088bb 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobCleanupWork.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobCleanupWork.java
@@ -20,9 +20,8 @@
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.job.JobStatus;
import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
+import edu.uci.ics.hyracks.control.cc.NodeControllerState;
import edu.uci.ics.hyracks.control.cc.job.JobRun;
-import edu.uci.ics.hyracks.control.cc.remote.RemoteRunner;
-import edu.uci.ics.hyracks.control.cc.remote.ops.JobCompleteNotifier;
import edu.uci.ics.hyracks.control.common.work.AbstractWork;
public class JobCleanupWork extends AbstractWork {
@@ -54,22 +53,13 @@
Set<String> targetNodes = run.getParticipatingNodeIds();
run.getCleanupPendingNodeIds().addAll(targetNodes);
run.setPendingStatus(status, exception);
- final JobCompleteNotifier[] jcns = new JobCompleteNotifier[targetNodes.size()];
- int i = 0;
for (String n : targetNodes) {
- jcns[i++] = new JobCompleteNotifier(n, jobId, status);
- }
- ccs.getExecutor().execute(new Runnable() {
- @Override
- public void run() {
- if (jcns.length > 0) {
- try {
- RemoteRunner.runRemote(ccs, jcns, null);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
+ NodeControllerState ncs = ccs.getNodeMap().get(n);
+ try {
+ ncs.getNodeController().cleanUpJoblet(jobId, status);
+ } catch (Exception e) {
+ e.printStackTrace();
}
- });
+ }
}
}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobCreateWork.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobCreateWork.java
index 14947cd..b7cf629 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobCreateWork.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobCreateWork.java
@@ -30,6 +30,7 @@
import edu.uci.ics.hyracks.control.cc.job.JobRun;
import edu.uci.ics.hyracks.control.cc.job.PlanUtils;
import edu.uci.ics.hyracks.control.cc.scheduler.JobScheduler;
+import edu.uci.ics.hyracks.control.common.work.IResultCallback;
import edu.uci.ics.hyracks.control.common.work.SynchronizableWork;
public class JobCreateWork extends SynchronizableWork {
@@ -38,42 +39,50 @@
private final EnumSet<JobFlag> jobFlags;
private final JobId jobId;
private final String appName;
+ private final IResultCallback<JobId> callback;
public JobCreateWork(ClusterControllerService ccs, JobId jobId, String appName, byte[] jobSpec,
- EnumSet<JobFlag> jobFlags) {
+ EnumSet<JobFlag> jobFlags, IResultCallback<JobId> callback) {
this.jobId = jobId;
this.ccs = ccs;
this.jobSpec = jobSpec;
this.jobFlags = jobFlags;
this.appName = appName;
+ this.callback = callback;
}
@Override
protected void doRun() throws Exception {
- CCApplicationContext appCtx = ccs.getApplicationMap().get(appName);
- if (appCtx == null) {
- throw new HyracksException("No application with id " + appName + " found");
- }
- JobSpecification spec = appCtx.createJobSpecification(jobSpec);
-
- final JobActivityGraphBuilder builder = new JobActivityGraphBuilder();
- builder.init(appName, spec, jobFlags);
- PlanUtils.visit(spec, new IOperatorDescriptorVisitor() {
- @Override
- public void visit(IOperatorDescriptor op) {
- op.contributeActivities(builder);
+ try {
+ CCApplicationContext appCtx = ccs.getApplicationMap().get(appName);
+ if (appCtx == null) {
+ throw new HyracksException("No application with id " + appName + " found");
}
- });
- final JobActivityGraph jag = builder.getActivityGraph();
+ JobSpecification spec = appCtx.createJobSpecification(jobSpec);
- JobRun run = new JobRun(jobId, jag);
+ final JobActivityGraphBuilder builder = new JobActivityGraphBuilder();
+ builder.init(appName, spec, jobFlags);
+ PlanUtils.visit(spec, new IOperatorDescriptorVisitor() {
+ @Override
+ public void visit(IOperatorDescriptor op) {
+ op.contributeActivities(builder);
+ }
+ });
+ final JobActivityGraph jag = builder.getActivityGraph();
- run.setStatus(JobStatus.INITIALIZED, null);
+ JobRun run = new JobRun(jobId, jag);
- ccs.getActiveRunMap().put(jobId, run);
- JobScheduler jrs = new JobScheduler(ccs, run);
- run.setScheduler(jrs);
- appCtx.notifyJobCreation(jobId, spec);
+ run.setStatus(JobStatus.INITIALIZED, null);
+
+ ccs.getActiveRunMap().put(jobId, run);
+ JobScheduler jrs = new JobScheduler(ccs, run);
+ run.setScheduler(jrs);
+ appCtx.notifyJobCreation(jobId, spec);
+ callback.setValue(jobId);
+ } catch (Exception e) {
+ callback.setException(e);
+ return;
+ }
}
public JobId getJobId() {
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobStartWork.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobStartWork.java
index 855e3e7..dd834ed 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobStartWork.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobStartWork.java
@@ -18,31 +18,39 @@
import edu.uci.ics.hyracks.api.job.JobStatus;
import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
import edu.uci.ics.hyracks.control.cc.job.JobRun;
+import edu.uci.ics.hyracks.control.common.work.IResultCallback;
import edu.uci.ics.hyracks.control.common.work.SynchronizableWork;
public class JobStartWork extends SynchronizableWork {
private final ClusterControllerService ccs;
private final JobId jobId;
+ private final IResultCallback<Object> callback;
- public JobStartWork(ClusterControllerService ccs, JobId jobId) {
+ public JobStartWork(ClusterControllerService ccs, JobId jobId, IResultCallback<Object> callback) {
this.ccs = ccs;
this.jobId = jobId;
+ this.callback = callback;
}
@Override
protected void doRun() throws Exception {
- JobRun run = ccs.getActiveRunMap().get(jobId);
- if (run == null) {
- throw new Exception("Unable to find job with id = " + jobId);
- }
- if (run.getStatus() != JobStatus.INITIALIZED) {
- throw new Exception("Job already started");
- }
- run.setStatus(JobStatus.RUNNING, null);
try {
- run.getScheduler().startJob();
+ JobRun run = ccs.getActiveRunMap().get(jobId);
+ if (run == null) {
+ throw new Exception("Unable to find job with id = " + jobId);
+ }
+ if (run.getStatus() != JobStatus.INITIALIZED) {
+ throw new Exception("Job already started");
+ }
+ run.setStatus(JobStatus.RUNNING, null);
+ try {
+ run.getScheduler().startJob();
+ } catch (Exception e) {
+ ccs.getWorkQueue().schedule(new JobCleanupWork(ccs, run.getJobId(), JobStatus.FAILURE, e));
+ }
+ callback.setValue(null);
} catch (Exception e) {
- ccs.getWorkQueue().schedule(new JobCleanupWork(ccs, run.getJobId(), JobStatus.FAILURE, e));
+ callback.setException(e);
}
}
}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterNodeWork.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterNodeWork.java
index e65b1de..cba4b4b5 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterNodeWork.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterNodeWork.java
@@ -17,36 +17,62 @@
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
import edu.uci.ics.hyracks.control.cc.NodeControllerState;
+import edu.uci.ics.hyracks.control.common.base.INodeController;
+import edu.uci.ics.hyracks.control.common.controllers.NodeParameters;
+import edu.uci.ics.hyracks.control.common.controllers.NodeRegistration;
+import edu.uci.ics.hyracks.control.common.ipc.NodeControllerFunctions;
+import edu.uci.ics.hyracks.control.common.ipc.NodeControllerRemoteProxy;
import edu.uci.ics.hyracks.control.common.work.SynchronizableWork;
+import edu.uci.ics.hyracks.ipc.api.IIPCHandle;
public class RegisterNodeWork extends SynchronizableWork {
- private final ClusterControllerService ccs;
- private final String nodeId;
- private final NodeControllerState state;
+ private static final Logger LOGGER = Logger.getLogger(RegisterNodeWork.class.getName());
- public RegisterNodeWork(ClusterControllerService ccs, String nodeId, NodeControllerState state) {
+ private final ClusterControllerService ccs;
+ private final NodeRegistration reg;
+
+ public RegisterNodeWork(ClusterControllerService ccs, NodeRegistration reg) {
this.ccs = ccs;
- this.nodeId = nodeId;
- this.state = state;
+ this.reg = reg;
}
@Override
protected void doRun() throws Exception {
- Map<String, NodeControllerState> nodeMap = ccs.getNodeMap();
- if (nodeMap.containsKey(nodeId)) {
- throw new Exception("Node with this name already registered.");
+ String id = reg.getNodeId();
+
+ IIPCHandle ncIPCHandle = ccs.getClusterIPC().getHandle(reg.getNodeControllerAddress());
+ NodeControllerFunctions.NodeRegistrationResult result = null;
+ try {
+ INodeController nodeController = new NodeControllerRemoteProxy(ncIPCHandle);
+
+ NodeControllerState state = new NodeControllerState(nodeController, reg);
+ Map<String, NodeControllerState> nodeMap = ccs.getNodeMap();
+ if (nodeMap.containsKey(id)) {
+ throw new Exception("Node with this name already registered.");
+ }
+ nodeMap.put(id, state);
+ Map<String, Set<String>> ipAddressNodeNameMap = ccs.getIpAddressNodeNameMap();
+ String ipAddress = state.getNCConfig().dataIPAddress;
+ Set<String> nodes = ipAddressNodeNameMap.get(ipAddress);
+ if (nodes == null) {
+ nodes = new HashSet<String>();
+ ipAddressNodeNameMap.put(ipAddress, nodes);
+ }
+ nodes.add(id);
+ LOGGER.log(Level.INFO, "Registered INodeController: id = " + id);
+ NodeParameters params = new NodeParameters();
+ params.setClusterControllerInfo(ccs.getClusterControllerInfo());
+ params.setHeartbeatPeriod(ccs.getCCConfig().heartbeatPeriod);
+ params.setProfileDumpPeriod(ccs.getCCConfig().profileDumpPeriod);
+ result = new NodeControllerFunctions.NodeRegistrationResult(params, null);
+ } catch (Exception e) {
+ result = new NodeControllerFunctions.NodeRegistrationResult(null, e);
}
- nodeMap.put(nodeId, state);
- Map<String, Set<String>> ipAddressNodeNameMap = ccs.getIpAddressNodeNameMap();
- String ipAddress = state.getNCConfig().dataIPAddress;
- Set<String> nodes = ipAddressNodeNameMap.get(ipAddress);
- if (nodes == null) {
- nodes = new HashSet<String>();
- ipAddressNodeNameMap.put(ipAddress, nodes);
- }
- nodes.add(nodeId);
+ ncIPCHandle.send(-1, result, null);
}
}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/WaitForJobCompletionWork.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/WaitForJobCompletionWork.java
new file mode 100644
index 0000000..6cfe025
--- /dev/null
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/WaitForJobCompletionWork.java
@@ -0,0 +1,64 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.cc.work;
+
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
+import edu.uci.ics.hyracks.control.cc.job.IJobStatusConditionVariable;
+import edu.uci.ics.hyracks.control.common.work.IResultCallback;
+import edu.uci.ics.hyracks.control.common.work.SynchronizableWork;
+
+public class WaitForJobCompletionWork extends SynchronizableWork {
+ private final ClusterControllerService ccs;
+ private final JobId jobId;
+ private final IResultCallback<Object> callback;
+
+ public WaitForJobCompletionWork(ClusterControllerService ccs, JobId jobId, IResultCallback<Object> callback) {
+ this.ccs = ccs;
+ this.jobId = jobId;
+ this.callback = callback;
+ }
+
+ @Override
+ protected void doRun() throws Exception {
+ final IJobStatusConditionVariable cRunningVar = ccs.getActiveRunMap().get(jobId);
+ if (cRunningVar != null) {
+ ccs.getExecutor().execute(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ cRunningVar.waitForCompletion();
+ callback.setValue(null);
+ } catch (Exception e) {
+ callback.setException(e);
+ }
+ }
+ });
+ } else {
+ final IJobStatusConditionVariable cArchivedVar = ccs.getRunMapArchive().get(jobId);
+ ccs.getExecutor().execute(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ cArchivedVar.waitForCompletion();
+ callback.setValue(null);
+ } catch (Exception e) {
+ callback.setException(e);
+ }
+ }
+ });
+ }
+ }
+}
\ No newline at end of file
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/AbstractRemoteService.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/AbstractRemoteService.java
index 42d0ecb..21e3e9d 100644
--- a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/AbstractRemoteService.java
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/AbstractRemoteService.java
@@ -19,4 +19,4 @@
public abstract class AbstractRemoteService implements IService {
public AbstractRemoteService() {
}
-}
+}
\ No newline at end of file
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/application/ApplicationContext.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/application/ApplicationContext.java
index d2a06be..9f29fbd 100644
--- a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/application/ApplicationContext.java
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/application/ApplicationContext.java
@@ -59,7 +59,6 @@
this.serverCtx = serverCtx;
this.appName = appName;
this.applicationRootDir = new File(new File(serverCtx.getBaseDir(), APPLICATION_ROOT), appName);
- status = ApplicationStatus.CREATED;
FileUtils.deleteDirectory(applicationRootDir);
applicationRootDir.mkdirs();
}
@@ -87,9 +86,6 @@
}
public void initialize() throws Exception {
- if (status != ApplicationStatus.CREATED) {
- throw new IllegalStateException();
- }
if (deploymentDescriptor != null) {
String bootstrapClass = null;
switch (serverCtx.getServerType()) {
@@ -107,7 +103,6 @@
start();
}
}
- status = ApplicationStatus.INITIALIZED;
}
protected abstract void start() throws Exception;
@@ -168,7 +163,6 @@
}
public void deinitialize() throws Exception {
- status = ApplicationStatus.DEINITIALIZED;
stop();
File expandedFolder = getExpandedFolder();
FileUtils.deleteDirectory(expandedFolder);
@@ -203,4 +197,12 @@
public ClassLoader getClassLoader() {
return classLoader;
}
-}
+
+ public void setStatus(ApplicationStatus status) {
+ this.status = status;
+ }
+
+ public ApplicationStatus getStatus() {
+ return status;
+ }
+}
\ No newline at end of file
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/application/ApplicationStatus.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/application/ApplicationStatus.java
index 8a97ad6..b45c4f1 100644
--- a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/application/ApplicationStatus.java
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/application/ApplicationStatus.java
@@ -16,6 +16,8 @@
public enum ApplicationStatus {
CREATED,
+ IN_INITIALIZATION,
INITIALIZED,
+ IN_DEINITIALIZATION,
DEINITIALIZED
}
\ No newline at end of file
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java
index 40e9347..a03f5c9 100644
--- a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java
@@ -18,7 +18,7 @@
import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
import edu.uci.ics.hyracks.api.job.JobId;
-import edu.uci.ics.hyracks.control.common.controllers.NodeParameters;
+import edu.uci.ics.hyracks.control.common.application.ApplicationStatus;
import edu.uci.ics.hyracks.control.common.controllers.NodeRegistration;
import edu.uci.ics.hyracks.control.common.heartbeat.HeartbeatData;
import edu.uci.ics.hyracks.control.common.job.PartitionDescriptor;
@@ -27,7 +27,7 @@
import edu.uci.ics.hyracks.control.common.job.profiling.om.TaskProfile;
public interface IClusterController {
- public NodeParameters registerNode(NodeRegistration reg) throws Exception;
+ public void registerNode(NodeRegistration reg) throws Exception;
public void unregisterNode(String nodeId) throws Exception;
@@ -45,4 +45,6 @@
public void registerPartitionProvider(PartitionDescriptor partitionDescriptor) throws Exception;
public void registerPartitionRequest(PartitionRequest partitionRequest) throws Exception;
+
+ public void notifyApplicationStateChange(String nodeId, String appName, ApplicationStatus status) throws Exception;
}
\ No newline at end of file
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerDelegateIPCI.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerDelegateIPCI.java
deleted file mode 100644
index e35f962..0000000
--- a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerDelegateIPCI.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.control.common.ipc;
-
-import edu.uci.ics.hyracks.control.common.base.IClusterController;
-import edu.uci.ics.hyracks.control.common.ipc.ClusterControllerFunctions.Function;
-import edu.uci.ics.hyracks.ipc.api.IIPCHandle;
-import edu.uci.ics.hyracks.ipc.api.IIPCI;
-
-public class ClusterControllerDelegateIPCI implements IIPCI {
- private final IClusterController cc;
-
- public ClusterControllerDelegateIPCI(IClusterController cc) {
- this.cc = cc;
- }
-
- @Override
- public Object call(IIPCHandle caller, Object req) throws Exception {
- ClusterControllerFunctions.Function fn = (Function) req;
- switch (fn.getFunctionId()) {
- case REGISTER_NODE: {
- ClusterControllerFunctions.RegisterNodeFunction rnf = (ClusterControllerFunctions.RegisterNodeFunction) fn;
- return cc.registerNode(rnf.getNodeRegistration());
- }
-
- case UNREGISTER_NODE: {
- ClusterControllerFunctions.UnregisterNodeFunction unf = (ClusterControllerFunctions.UnregisterNodeFunction) fn;
- cc.unregisterNode(unf.getNodeId());
- return null;
- }
-
- case NODE_HEARTBEAT: {
- ClusterControllerFunctions.NodeHeartbeatFunction nhf = (ClusterControllerFunctions.NodeHeartbeatFunction) fn;
- cc.nodeHeartbeat(nhf.getNodeId(), nhf.getHeartbeatData());
- return null;
- }
-
- case NOTIFY_JOBLET_CLEANUP: {
- ClusterControllerFunctions.NotifyJobletCleanupFunction njcf = (ClusterControllerFunctions.NotifyJobletCleanupFunction) fn;
- cc.notifyJobletCleanup(njcf.getJobId(), njcf.getNodeId());
- return null;
- }
-
- case REPORT_PROFILE: {
- ClusterControllerFunctions.ReportProfileFunction rpf = (ClusterControllerFunctions.ReportProfileFunction) fn;
- cc.reportProfile(rpf.getNodeId(), rpf.getProfiles());
- return null;
- }
-
- case NOTIFY_TASK_COMPLETE: {
- ClusterControllerFunctions.NotifyTaskCompleteFunction ntcf = (ClusterControllerFunctions.NotifyTaskCompleteFunction) fn;
- cc.notifyTaskComplete(ntcf.getJobId(), ntcf.getTaskId(), ntcf.getNodeId(), ntcf.getStatistics());
- return null;
- }
- case NOTIFY_TASK_FAILURE: {
- ClusterControllerFunctions.NotifyTaskFailureFunction ntff = (ClusterControllerFunctions.NotifyTaskFailureFunction) fn;
- cc.notifyTaskFailure(ntff.getJobId(), ntff.getTaskId(), ntff.getDetails(), ntff.getDetails());
- return null;
- }
-
- case REGISTER_PARTITION_PROVIDER: {
- ClusterControllerFunctions.RegisterPartitionProviderFunction rppf = (ClusterControllerFunctions.RegisterPartitionProviderFunction) fn;
- cc.registerPartitionProvider(rppf.getPartitionDescriptor());
- return null;
- }
-
- case REGISTER_PARTITION_REQUEST: {
- ClusterControllerFunctions.RegisterPartitionRequestFunction rprf = (ClusterControllerFunctions.RegisterPartitionRequestFunction) fn;
- cc.registerPartitionRequest(rprf.getPartitionRequest());
- return null;
- }
- }
- throw new IllegalArgumentException("Unknown function: " + fn.getFunctionId());
- }
-}
\ No newline at end of file
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerFunctions.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerFunctions.java
index 4c76357..2b6792e 100644
--- a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerFunctions.java
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerFunctions.java
@@ -19,6 +19,7 @@
import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.control.common.application.ApplicationStatus;
import edu.uci.ics.hyracks.control.common.controllers.NodeRegistration;
import edu.uci.ics.hyracks.control.common.heartbeat.HeartbeatData;
import edu.uci.ics.hyracks.control.common.job.PartitionDescriptor;
@@ -37,6 +38,7 @@
REPORT_PROFILE,
REGISTER_PARTITION_PROVIDER,
REGISTER_PARTITION_REQUEST,
+ APPLICATION_STATE_CHANGE_RESPONSE,
}
public static abstract class Function implements Serializable {
@@ -269,4 +271,35 @@
return partitionRequest;
}
}
+
+ public static class ApplicationStateChangeResponseFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final String nodeId;
+ private final String appName;
+ private final ApplicationStatus status;
+
+ public ApplicationStateChangeResponseFunction(String nodeId, String appName, ApplicationStatus status) {
+ this.nodeId = nodeId;
+ this.appName = appName;
+ this.status = status;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.APPLICATION_STATE_CHANGE_RESPONSE;
+ }
+
+ public String getNodeId() {
+ return nodeId;
+ }
+
+ public String getApplicationName() {
+ return appName;
+ }
+
+ public ApplicationStatus getStatus() {
+ return status;
+ }
+ }
}
\ No newline at end of file
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
index 0aeab72..b7dd0af 100644
--- a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
@@ -18,8 +18,8 @@
import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.control.common.application.ApplicationStatus;
import edu.uci.ics.hyracks.control.common.base.IClusterController;
-import edu.uci.ics.hyracks.control.common.controllers.NodeParameters;
import edu.uci.ics.hyracks.control.common.controllers.NodeRegistration;
import edu.uci.ics.hyracks.control.common.heartbeat.HeartbeatData;
import edu.uci.ics.hyracks.control.common.job.PartitionDescriptor;
@@ -27,7 +27,6 @@
import edu.uci.ics.hyracks.control.common.job.profiling.om.JobProfile;
import edu.uci.ics.hyracks.control.common.job.profiling.om.TaskProfile;
import edu.uci.ics.hyracks.ipc.api.IIPCHandle;
-import edu.uci.ics.hyracks.ipc.api.SyncRMI;
public class ClusterControllerRemoteProxy implements IClusterController {
private final IIPCHandle ipcHandle;
@@ -37,75 +36,72 @@
}
@Override
- public NodeParameters registerNode(NodeRegistration reg) throws Exception {
- SyncRMI sync = new SyncRMI();
+ public void registerNode(NodeRegistration reg) throws Exception {
ClusterControllerFunctions.RegisterNodeFunction fn = new ClusterControllerFunctions.RegisterNodeFunction(reg);
- NodeParameters result = (NodeParameters) sync.call(ipcHandle, fn);
- return result;
+ ipcHandle.send(-1, fn, null);
}
@Override
public void unregisterNode(String nodeId) throws Exception {
- SyncRMI sync = new SyncRMI();
ClusterControllerFunctions.UnregisterNodeFunction fn = new ClusterControllerFunctions.UnregisterNodeFunction(
nodeId);
- sync.call(ipcHandle, fn);
+ ipcHandle.send(-1, fn, null);
}
@Override
public void notifyTaskComplete(JobId jobId, TaskAttemptId taskId, String nodeId, TaskProfile statistics)
throws Exception {
- SyncRMI sync = new SyncRMI();
ClusterControllerFunctions.NotifyTaskCompleteFunction fn = new ClusterControllerFunctions.NotifyTaskCompleteFunction(
jobId, taskId, nodeId, statistics);
- sync.call(ipcHandle, fn);
+ ipcHandle.send(-1, fn, null);
}
@Override
public void notifyTaskFailure(JobId jobId, TaskAttemptId taskId, String nodeId, String details) throws Exception {
- SyncRMI sync = new SyncRMI();
ClusterControllerFunctions.NotifyTaskFailureFunction fn = new ClusterControllerFunctions.NotifyTaskFailureFunction(
jobId, taskId, nodeId, details);
- sync.call(ipcHandle, fn);
+ ipcHandle.send(-1, fn, null);
}
@Override
public void notifyJobletCleanup(JobId jobId, String nodeId) throws Exception {
- SyncRMI sync = new SyncRMI();
ClusterControllerFunctions.NotifyJobletCleanupFunction fn = new ClusterControllerFunctions.NotifyJobletCleanupFunction(
jobId, nodeId);
- sync.call(ipcHandle, fn);
+ ipcHandle.send(-1, fn, null);
}
@Override
public void nodeHeartbeat(String id, HeartbeatData hbData) throws Exception {
- SyncRMI sync = new SyncRMI();
ClusterControllerFunctions.NodeHeartbeatFunction fn = new ClusterControllerFunctions.NodeHeartbeatFunction(id,
hbData);
- sync.call(ipcHandle, fn);
+ ipcHandle.send(-1, fn, null);
}
@Override
public void reportProfile(String id, List<JobProfile> profiles) throws Exception {
- SyncRMI sync = new SyncRMI();
ClusterControllerFunctions.ReportProfileFunction fn = new ClusterControllerFunctions.ReportProfileFunction(id,
profiles);
- sync.call(ipcHandle, fn);
+ ipcHandle.send(-1, fn, null);
}
@Override
public void registerPartitionProvider(PartitionDescriptor partitionDescriptor) throws Exception {
- SyncRMI sync = new SyncRMI();
ClusterControllerFunctions.RegisterPartitionProviderFunction fn = new ClusterControllerFunctions.RegisterPartitionProviderFunction(
partitionDescriptor);
- sync.call(ipcHandle, fn);
+ ipcHandle.send(-1, fn, null);
}
@Override
public void registerPartitionRequest(PartitionRequest partitionRequest) throws Exception {
- SyncRMI sync = new SyncRMI();
ClusterControllerFunctions.RegisterPartitionRequestFunction fn = new ClusterControllerFunctions.RegisterPartitionRequestFunction(
partitionRequest);
- sync.call(ipcHandle, fn);
+ ipcHandle.send(-1, fn, null);
+ }
+
+ @Override
+ public void notifyApplicationStateChange(String nodeId, String appName, ApplicationStatus status) throws Exception {
+ ClusterControllerFunctions.ApplicationStateChangeResponseFunction fn = new ClusterControllerFunctions.ApplicationStateChangeResponseFunction(
+ nodeId, appName, status);
+ ipcHandle.send(-1, fn, null);
}
}
\ No newline at end of file
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerDelegateIPCI.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerDelegateIPCI.java
deleted file mode 100644
index f3e51ec..0000000
--- a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerDelegateIPCI.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.control.common.ipc;
-
-import edu.uci.ics.hyracks.control.common.base.INodeController;
-import edu.uci.ics.hyracks.ipc.api.IIPCHandle;
-import edu.uci.ics.hyracks.ipc.api.IIPCI;
-
-public class NodeControllerDelegateIPCI implements IIPCI {
- private final INodeController nc;
-
- public NodeControllerDelegateIPCI(INodeController nc) {
- this.nc = nc;
- }
-
- @Override
- public Object call(IIPCHandle caller, Object req) throws Exception {
- NodeControllerFunctions.Function fn = (NodeControllerFunctions.Function) req;
- switch (fn.getFunctionId()) {
- case START_TASKS: {
- NodeControllerFunctions.StartTasksFunction stf = (NodeControllerFunctions.StartTasksFunction) fn;
- nc.startTasks(stf.getAppName(), stf.getJobId(), stf.getPlanBytes(), stf.getTaskDescriptors(),
- stf.getConnectorPolicies());
- return null;
- }
-
- case ABORT_TASKS: {
- NodeControllerFunctions.AbortTasksFunction atf = (NodeControllerFunctions.AbortTasksFunction) fn;
- nc.abortTasks(atf.getJobId(), atf.getTasks());
- return null;
- }
-
- case CLEANUP_JOBLET: {
- NodeControllerFunctions.CleanupJobletFunction cjf = (NodeControllerFunctions.CleanupJobletFunction) fn;
- nc.cleanUpJoblet(cjf.getJobId(), cjf.getStatus());
- return null;
- }
-
- case CREATE_APPLICATION: {
- NodeControllerFunctions.CreateApplicationFunction caf = (NodeControllerFunctions.CreateApplicationFunction) fn;
- nc.createApplication(caf.getAppName(), caf.isDeployHar(), caf.getSerializedDistributedState());
- return null;
- }
-
- case DESTROY_APPLICATION: {
- NodeControllerFunctions.DestroyApplicationFunction daf = (NodeControllerFunctions.DestroyApplicationFunction) fn;
- nc.destroyApplication(daf.getAppName());
- return null;
- }
-
- case REPORT_PARTITION_AVAILABILITY: {
- NodeControllerFunctions.ReportPartitionAvailabilityFunction rpaf = (NodeControllerFunctions.ReportPartitionAvailabilityFunction) fn;
- nc.reportPartitionAvailability(rpaf.getPartitionId(), rpaf.getNetworkAddress());
- return null;
- }
- }
- throw new IllegalArgumentException("Unknown function: " + fn.getFunctionId());
- }
-}
\ No newline at end of file
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerFunctions.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerFunctions.java
index 0d39c1b..b72d4e5 100644
--- a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerFunctions.java
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerFunctions.java
@@ -25,10 +25,12 @@
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.job.JobStatus;
import edu.uci.ics.hyracks.api.partitions.PartitionId;
+import edu.uci.ics.hyracks.control.common.controllers.NodeParameters;
import edu.uci.ics.hyracks.control.common.job.TaskAttemptDescriptor;
public class NodeControllerFunctions {
public enum FunctionId {
+ NODE_REGISTRATION_RESULT,
START_TASKS,
ABORT_TASKS,
CLEANUP_JOBLET,
@@ -43,6 +45,32 @@
public abstract FunctionId getFunctionId();
}
+ public static class NodeRegistrationResult extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final NodeParameters params;
+
+ private final Exception exception;
+
+ public NodeRegistrationResult(NodeParameters params, Exception exception) {
+ this.params = params;
+ this.exception = exception;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.NODE_REGISTRATION_RESULT;
+ }
+
+ public NodeParameters getNodeParameters() {
+ return params;
+ }
+
+ public Exception getException() {
+ return exception;
+ }
+ }
+
public static class StartTasksFunction extends Function {
private static final long serialVersionUID = 1L;
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerRemoteProxy.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
index ccd9468..474fb93 100644
--- a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
@@ -27,7 +27,6 @@
import edu.uci.ics.hyracks.control.common.base.INodeController;
import edu.uci.ics.hyracks.control.common.job.TaskAttemptDescriptor;
import edu.uci.ics.hyracks.ipc.api.IIPCHandle;
-import edu.uci.ics.hyracks.ipc.api.SyncRMI;
public class NodeControllerRemoteProxy implements INodeController {
private final IIPCHandle ipcHandle;
@@ -39,48 +38,43 @@
@Override
public void startTasks(String appName, JobId jobId, byte[] planBytes, List<TaskAttemptDescriptor> taskDescriptors,
Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies) throws Exception {
- SyncRMI sync = new SyncRMI();
NodeControllerFunctions.StartTasksFunction stf = new NodeControllerFunctions.StartTasksFunction(appName, jobId,
planBytes, taskDescriptors, connectorPolicies);
- sync.call(ipcHandle, stf);
+ ipcHandle.send(-1, stf, null);
}
@Override
public void abortTasks(JobId jobId, List<TaskAttemptId> tasks) throws Exception {
- SyncRMI sync = new SyncRMI();
NodeControllerFunctions.AbortTasksFunction atf = new NodeControllerFunctions.AbortTasksFunction(jobId, tasks);
- sync.call(ipcHandle, atf);
+ ipcHandle.send(-1, atf, null);
}
@Override
public void cleanUpJoblet(JobId jobId, JobStatus status) throws Exception {
- SyncRMI sync = new SyncRMI();
NodeControllerFunctions.CleanupJobletFunction cjf = new NodeControllerFunctions.CleanupJobletFunction(jobId,
status);
- sync.call(ipcHandle, cjf);
+ ipcHandle.send(-1, cjf, null);
}
@Override
public void createApplication(String appName, boolean deployHar, byte[] serializedDistributedState)
throws Exception {
- SyncRMI sync = new SyncRMI();
NodeControllerFunctions.CreateApplicationFunction caf = new NodeControllerFunctions.CreateApplicationFunction(
appName, deployHar, serializedDistributedState);
- sync.call(ipcHandle, caf);
+ ipcHandle.send(-1, caf, null);
}
@Override
public void destroyApplication(String appName) throws Exception {
- SyncRMI sync = new SyncRMI();
NodeControllerFunctions.DestroyApplicationFunction daf = new NodeControllerFunctions.DestroyApplicationFunction(
appName);
- sync.call(ipcHandle, daf);
+ ipcHandle.send(-1, daf, null);
}
@Override
public void reportPartitionAvailability(PartitionId pid, NetworkAddress networkAddress) throws Exception {
NodeControllerFunctions.ReportPartitionAvailabilityFunction rpaf = new NodeControllerFunctions.ReportPartitionAvailabilityFunction(
pid, networkAddress);
- ipcHandle.send(rpaf, null);
+ ipcHandle.send(-1, rpaf, null);
}
}
\ No newline at end of file
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/work/FutureValue.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/work/FutureValue.java
index 00565b6..7eb4ff6 100644
--- a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/work/FutureValue.java
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/work/FutureValue.java
@@ -14,7 +14,7 @@
*/
package edu.uci.ics.hyracks.control.common.work;
-public class FutureValue<T> {
+public class FutureValue<T> implements IResultCallback<T> {
private boolean done;
private T value;
@@ -27,6 +27,7 @@
e = null;
}
+ @Override
public synchronized void setValue(T value) {
done = true;
this.value = value;
@@ -34,6 +35,7 @@
notifyAll();
}
+ @Override
public synchronized void setException(Exception e) {
done = true;
this.e = e;
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/work/IPCResponder.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/work/IPCResponder.java
new file mode 100644
index 0000000..dcea864
--- /dev/null
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/work/IPCResponder.java
@@ -0,0 +1,33 @@
+package edu.uci.ics.hyracks.control.common.work;
+
+import edu.uci.ics.hyracks.ipc.api.IIPCHandle;
+import edu.uci.ics.hyracks.ipc.exceptions.IPCException;
+
+public class IPCResponder<T> implements IResultCallback<T> {
+ private final IIPCHandle handle;
+
+ private final long rmid;
+
+ public IPCResponder(IIPCHandle handle, long rmid) {
+ this.handle = handle;
+ this.rmid = rmid;
+ }
+
+ @Override
+ public void setValue(T result) {
+ try {
+ handle.send(rmid, result, null);
+ } catch (IPCException e) {
+ e.printStackTrace();
+ }
+ }
+
+ @Override
+ public void setException(Exception e) {
+ try {
+ handle.send(rmid, null, e);
+ } catch (IPCException e1) {
+ e1.printStackTrace();
+ }
+ }
+}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/remote/Accumulator.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/work/IResultCallback.java
similarity index 78%
rename from hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/remote/Accumulator.java
rename to hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/work/IResultCallback.java
index c728b0b..80c3d76 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/remote/Accumulator.java
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/work/IResultCallback.java
@@ -12,10 +12,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package edu.uci.ics.hyracks.control.cc.remote;
+package edu.uci.ics.hyracks.control.common.work;
-public interface Accumulator<T, R> {
- public void accumulate(T o);
+public interface IResultCallback<T> {
+ public void setValue(T result);
- public R getResult();
+ public void setException(Exception e);
}
\ No newline at end of file
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
index e211859..ba1d970 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
@@ -39,18 +39,11 @@
import java.util.regex.Matcher;
import java.util.regex.Pattern;
-import edu.uci.ics.hyracks.api.comm.NetworkAddress;
import edu.uci.ics.hyracks.api.context.IHyracksRootContext;
-import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
-import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
-import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
import edu.uci.ics.hyracks.api.io.IODeviceHandle;
import edu.uci.ics.hyracks.api.job.JobId;
-import edu.uci.ics.hyracks.api.job.JobStatus;
-import edu.uci.ics.hyracks.api.partitions.PartitionId;
import edu.uci.ics.hyracks.control.common.AbstractRemoteService;
import edu.uci.ics.hyracks.control.common.base.IClusterController;
-import edu.uci.ics.hyracks.control.common.base.INodeController;
import edu.uci.ics.hyracks.control.common.context.ServerContext;
import edu.uci.ics.hyracks.control.common.controllers.NCConfig;
import edu.uci.ics.hyracks.control.common.controllers.NodeParameters;
@@ -58,8 +51,7 @@
import edu.uci.ics.hyracks.control.common.heartbeat.HeartbeatData;
import edu.uci.ics.hyracks.control.common.heartbeat.HeartbeatSchema;
import edu.uci.ics.hyracks.control.common.ipc.ClusterControllerRemoteProxy;
-import edu.uci.ics.hyracks.control.common.ipc.NodeControllerDelegateIPCI;
-import edu.uci.ics.hyracks.control.common.job.TaskAttemptDescriptor;
+import edu.uci.ics.hyracks.control.common.ipc.NodeControllerFunctions;
import edu.uci.ics.hyracks.control.common.job.profiling.om.JobProfile;
import edu.uci.ics.hyracks.control.common.work.FutureValue;
import edu.uci.ics.hyracks.control.common.work.WorkQueue;
@@ -76,10 +68,11 @@
import edu.uci.ics.hyracks.control.nc.work.ReportPartitionAvailabilityWork;
import edu.uci.ics.hyracks.control.nc.work.StartTasksWork;
import edu.uci.ics.hyracks.ipc.api.IIPCHandle;
+import edu.uci.ics.hyracks.ipc.api.IIPCI;
import edu.uci.ics.hyracks.ipc.impl.IPCSystem;
import edu.uci.ics.hyracks.net.protocols.muxdemux.PerformanceCounters;
-public class NodeControllerService extends AbstractRemoteService implements INodeController {
+public class NodeControllerService extends AbstractRemoteService {
private static Logger LOGGER = Logger.getLogger(NodeControllerService.class.getName());
private NCConfig ncConfig;
@@ -98,6 +91,10 @@
private final Timer timer;
+ private boolean registrationPending;
+
+ private Exception registrationException;
+
private IClusterController ccs;
private final Map<JobId, Joblet> jobletMap;
@@ -126,8 +123,8 @@
this.ncConfig = ncConfig;
id = ncConfig.nodeId;
executor = Executors.newCachedThreadPool();
- NodeControllerDelegateIPCI ipci = new NodeControllerDelegateIPCI(this);
- ipc = new IPCSystem(new InetSocketAddress(ncConfig.clusterNetIPAddress, 0), ipci, executor);
+ NodeControllerIPCI ipci = new NodeControllerIPCI();
+ ipc = new IPCSystem(new InetSocketAddress(ncConfig.clusterNetIPAddress, 0), ipci);
this.ctx = new RootHyracksContext(ncConfig.frameSize, new IOManager(getDevices(ncConfig.ioDevices), executor));
if (id == null) {
throw new Exception("id not set");
@@ -146,6 +143,7 @@
threadMXBean = ManagementFactory.getThreadMXBean();
runtimeMXBean = ManagementFactory.getRuntimeMXBean();
osMXBean = ManagementFactory.getOperatingSystemMXBean();
+ registrationPending = true;
}
public IHyracksRootContext getRootContext() {
@@ -162,6 +160,13 @@
return devices;
}
+ private synchronized void setNodeRegistrationResult(NodeParameters parameters, Exception exception) {
+ this.nodeParameters = parameters;
+ this.registrationException = exception;
+ this.registrationPending = false;
+ notifyAll();
+ }
+
@Override
public void start() throws Exception {
LOGGER.log(Level.INFO, "Starting NodeControllerService");
@@ -174,9 +179,19 @@
gcInfos[i] = new HeartbeatSchema.GarbageCollectorInfo(gcMXBeans.get(i).getName());
}
HeartbeatSchema hbSchema = new HeartbeatSchema(gcInfos);
- this.nodeParameters = ccs.registerNode(new NodeRegistration(ipc.getSocketAddress(), id, ncConfig, netManager
- .getNetworkAddress(), osMXBean.getName(), osMXBean.getArch(), osMXBean.getVersion(), osMXBean
- .getAvailableProcessors(), hbSchema));
+ ccs.registerNode(new NodeRegistration(ipc.getSocketAddress(), id, ncConfig, netManager.getNetworkAddress(),
+ osMXBean.getName(), osMXBean.getArch(), osMXBean.getVersion(), osMXBean.getAvailableProcessors(),
+ hbSchema));
+
+ synchronized (this) {
+ while (registrationPending) {
+ wait();
+ }
+ }
+ if (registrationException != null) {
+ throw registrationException;
+ }
+
queue.start();
heartbeatTask = new HeartbeatTask(ccs);
@@ -238,53 +253,10 @@
return executor;
}
- @Override
- public void startTasks(String appName, final JobId jobId, byte[] jagBytes,
- List<TaskAttemptDescriptor> taskDescriptors,
- Map<ConnectorDescriptorId, IConnectorPolicy> connectorPoliciesMap) throws Exception {
- StartTasksWork stw = new StartTasksWork(this, appName, jobId, jagBytes, taskDescriptors, connectorPoliciesMap);
- queue.schedule(stw);
- }
-
- @Override
- public void cleanUpJoblet(JobId jobId, JobStatus status) throws Exception {
- CleanupJobletWork cjw = new CleanupJobletWork(this, jobId, status);
- queue.schedule(cjw);
- }
-
public NCConfig getConfiguration() throws Exception {
return ncConfig;
}
- @Override
- public synchronized void abortTasks(JobId jobId, List<TaskAttemptId> tasks) throws Exception {
- AbortTasksWork atw = new AbortTasksWork(this, jobId, tasks);
- queue.schedule(atw);
- }
-
- @Override
- public void createApplication(String appName, boolean deployHar, byte[] serializedDistributedState)
- throws Exception {
- FutureValue<Object> fv = new FutureValue<Object>();
- CreateApplicationWork caw = new CreateApplicationWork(this, appName, deployHar, serializedDistributedState, fv);
- queue.schedule(caw);
- fv.get();
- }
-
- @Override
- public void destroyApplication(String appName) throws Exception {
- FutureValue<Object> fv = new FutureValue<Object>();
- DestroyApplicationWork daw = new DestroyApplicationWork(this, appName, fv);
- queue.schedule(daw);
- fv.get();
- }
-
- @Override
- public void reportPartitionAvailability(PartitionId pid, NetworkAddress networkAddress) throws Exception {
- ReportPartitionAvailabilityWork rpaw = new ReportPartitionAvailabilityWork(this, pid, networkAddress);
- queue.scheduleAndSync(rpaw);
- }
-
private static InetAddress getIpAddress(NCConfig ncConfig) throws Exception {
String ipaddrStr = ncConfig.dataIPAddress;
ipaddrStr = ipaddrStr.trim();
@@ -371,4 +343,59 @@
}
}
}
+
+ private final class NodeControllerIPCI implements IIPCI {
+ @Override
+ public void deliverIncomingMessage(IIPCHandle handle, long mid, long rmid, Object payload, Exception exception) {
+ NodeControllerFunctions.Function fn = (NodeControllerFunctions.Function) payload;
+ switch (fn.getFunctionId()) {
+ case START_TASKS: {
+ NodeControllerFunctions.StartTasksFunction stf = (NodeControllerFunctions.StartTasksFunction) fn;
+ queue.schedule(new StartTasksWork(NodeControllerService.this, stf.getAppName(), stf.getJobId(), stf
+ .getPlanBytes(), stf.getTaskDescriptors(), stf.getConnectorPolicies()));
+ return;
+ }
+
+ case ABORT_TASKS: {
+ NodeControllerFunctions.AbortTasksFunction atf = (NodeControllerFunctions.AbortTasksFunction) fn;
+ queue.schedule(new AbortTasksWork(NodeControllerService.this, atf.getJobId(), atf.getTasks()));
+ return;
+ }
+
+ case CLEANUP_JOBLET: {
+ NodeControllerFunctions.CleanupJobletFunction cjf = (NodeControllerFunctions.CleanupJobletFunction) fn;
+ queue.schedule(new CleanupJobletWork(NodeControllerService.this, cjf.getJobId(), cjf.getStatus()));
+ return;
+ }
+
+ case CREATE_APPLICATION: {
+ NodeControllerFunctions.CreateApplicationFunction caf = (NodeControllerFunctions.CreateApplicationFunction) fn;
+ queue.schedule(new CreateApplicationWork(NodeControllerService.this, caf.getAppName(), caf
+ .isDeployHar(), caf.getSerializedDistributedState()));
+ return;
+ }
+
+ case DESTROY_APPLICATION: {
+ NodeControllerFunctions.DestroyApplicationFunction daf = (NodeControllerFunctions.DestroyApplicationFunction) fn;
+ queue.schedule(new DestroyApplicationWork(NodeControllerService.this, daf.getAppName()));
+ return;
+ }
+
+ case REPORT_PARTITION_AVAILABILITY: {
+ NodeControllerFunctions.ReportPartitionAvailabilityFunction rpaf = (NodeControllerFunctions.ReportPartitionAvailabilityFunction) fn;
+ queue.schedule(new ReportPartitionAvailabilityWork(NodeControllerService.this, rpaf
+ .getPartitionId(), rpaf.getNetworkAddress()));
+ return;
+ }
+
+ case NODE_REGISTRATION_RESULT: {
+ NodeControllerFunctions.NodeRegistrationResult nrrf = (NodeControllerFunctions.NodeRegistrationResult) fn;
+ setNodeRegistrationResult(nrrf.getNodeParameters(), nrrf.getException());
+ return;
+ }
+ }
+ throw new IllegalArgumentException("Unknown function: " + fn.getFunctionId());
+
+ }
+ }
}
\ No newline at end of file
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/CreateApplicationWork.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/CreateApplicationWork.java
index 5b60a08..eb982df 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/CreateApplicationWork.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/CreateApplicationWork.java
@@ -27,9 +27,9 @@
import org.apache.http.impl.client.DefaultHttpClient;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.control.common.application.ApplicationStatus;
import edu.uci.ics.hyracks.control.common.controllers.NCConfig;
import edu.uci.ics.hyracks.control.common.controllers.NodeParameters;
-import edu.uci.ics.hyracks.control.common.work.FutureValue;
import edu.uci.ics.hyracks.control.common.work.SynchronizableWork;
import edu.uci.ics.hyracks.control.nc.NodeControllerService;
import edu.uci.ics.hyracks.control.nc.application.NCApplicationContext;
@@ -45,15 +45,12 @@
private final byte[] serializedDistributedState;
- private final FutureValue<Object> fv;
-
public CreateApplicationWork(NodeControllerService ncs, String appName, boolean deployHar,
- byte[] serializedDistributedState, FutureValue<Object> fv) {
+ byte[] serializedDistributedState) {
this.ncs = ncs;
this.appName = appName;
this.deployHar = deployHar;
this.serializedDistributedState = serializedDistributedState;
- this.fv = fv;
}
@Override
@@ -85,9 +82,10 @@
appCtx.initializeClassPath();
appCtx.setDistributedState((Serializable) appCtx.deserialize(serializedDistributedState));
appCtx.initialize();
- fv.setValue(null);
+ ncs.getClusterController()
+ .notifyApplicationStateChange(ncs.getId(), appName, ApplicationStatus.INITIALIZED);
} catch (Exception e) {
- fv.setException(e);
+ LOGGER.warning("Error creating application: " + e.getMessage());
}
}
}
\ No newline at end of file
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/DestroyApplicationWork.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/DestroyApplicationWork.java
index 8ac0b5c..cfe00f6 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/DestroyApplicationWork.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/DestroyApplicationWork.java
@@ -18,7 +18,7 @@
import java.util.logging.Logger;
import edu.uci.ics.hyracks.control.common.application.ApplicationContext;
-import edu.uci.ics.hyracks.control.common.work.FutureValue;
+import edu.uci.ics.hyracks.control.common.application.ApplicationStatus;
import edu.uci.ics.hyracks.control.common.work.SynchronizableWork;
import edu.uci.ics.hyracks.control.nc.NodeControllerService;
import edu.uci.ics.hyracks.control.nc.application.NCApplicationContext;
@@ -30,12 +30,9 @@
private final String appName;
- private FutureValue<Object> fv;
-
- public DestroyApplicationWork(NodeControllerService ncs, String appName, FutureValue<Object> fv) {
+ public DestroyApplicationWork(NodeControllerService ncs, String appName) {
this.ncs = ncs;
this.appName = appName;
- this.fv = fv;
}
@Override
@@ -46,9 +43,9 @@
if (appCtx != null) {
appCtx.deinitialize();
}
- fv.setValue(null);
} catch (Exception e) {
- fv.setException(e);
+ LOGGER.warning("Error destroying application: " + e.getMessage());
}
+ ncs.getClusterController().notifyApplicationStateChange(ncs.getId(), appName, ApplicationStatus.DEINITIALIZED);
}
}
\ No newline at end of file
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java
index fac141c..4ed8361 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java
@@ -116,7 +116,7 @@
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info(jobId.toString());
}
- cc.waitForCompletion(jobId);
+ hcc.waitForCompletion(jobId);
dumpOutputFiles();
}
diff --git a/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/api/IIPCHandle.java b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/api/IIPCHandle.java
index a4adf23..8a3630f 100644
--- a/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/api/IIPCHandle.java
+++ b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/api/IIPCHandle.java
@@ -21,7 +21,7 @@
public interface IIPCHandle {
public InetSocketAddress getRemoteAddress();
- public void send(Object request, IResponseCallback callback) throws IPCException;
+ public long send(long requestId, Object payload, Exception exception) throws IPCException;
public void setAttachment(Object attachment);
diff --git a/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/api/IIPCI.java b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/api/IIPCI.java
index ba9f343..24ab943 100644
--- a/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/api/IIPCI.java
+++ b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/api/IIPCI.java
@@ -15,5 +15,5 @@
package edu.uci.ics.hyracks.ipc.api;
public interface IIPCI {
- public Object call(IIPCHandle caller, Object req) throws Exception;
+ public void deliverIncomingMessage(IIPCHandle handle, long mid, long rmid, Object payload, Exception exception);
}
\ No newline at end of file
diff --git a/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/api/RPCInterface.java b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/api/RPCInterface.java
new file mode 100644
index 0000000..3340516
--- /dev/null
+++ b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/api/RPCInterface.java
@@ -0,0 +1,72 @@
+package edu.uci.ics.hyracks.ipc.api;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class RPCInterface implements IIPCI {
+ private final Map<Long, Request> reqMap;
+
+ public RPCInterface() {
+ reqMap = new HashMap<Long, RPCInterface.Request>();
+ }
+
+ public Object call(IIPCHandle handle, Object request) throws Exception {
+ Request req;
+ synchronized (this) {
+ req = new Request();
+ long mid = handle.send(-1, request, null);
+ reqMap.put(mid, req);
+ }
+ return req.getResponse();
+ }
+
+ @Override
+ public void deliverIncomingMessage(IIPCHandle handle, long mid, long rmid, Object payload, Exception exception) {
+ Request req;
+ synchronized (this) {
+ req = reqMap.remove(rmid);
+ }
+ assert req != null;
+ if (exception != null) {
+ req.setException(exception);
+ } else {
+ req.setResult(payload);
+ }
+ }
+
+ private static class Request {
+ private boolean pending;
+
+ private Object result;
+
+ private Exception exception;
+
+ Request() {
+ pending = true;
+ result = null;
+ exception = null;
+ }
+
+ synchronized void setResult(Object result) {
+ this.pending = false;
+ this.result = result;
+ notifyAll();
+ }
+
+ synchronized void setException(Exception exception) {
+ this.pending = false;
+ this.exception = exception;
+ notifyAll();
+ }
+
+ synchronized Object getResponse() throws Exception {
+ while (pending) {
+ wait();
+ }
+ if (exception != null) {
+ throw exception;
+ }
+ return result;
+ }
+ }
+}
\ No newline at end of file
diff --git a/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/api/SyncRMI.java b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/api/SyncRMI.java
deleted file mode 100644
index 180b1dd..0000000
--- a/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/api/SyncRMI.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.ipc.api;
-
-public final class SyncRMI implements IResponseCallback {
- private boolean pending;
-
- private Object response;
-
- private Exception exception;
-
- public SyncRMI() {
- }
-
- @Override
- public synchronized void callback(IIPCHandle handle, Object response, Exception exception) {
- pending = false;
- this.response = response;
- this.exception = exception;
- notifyAll();
- }
-
- public synchronized Object call(IIPCHandle handle, Object request) throws Exception {
- pending = true;
- response = null;
- exception = null;
- handle.send(request, this);
- while (pending) {
- wait();
- }
- if (exception != null) {
- throw exception;
- }
- return response;
- }
-}
\ No newline at end of file
diff --git a/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCConnectionManager.java b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCConnectionManager.java
index 3fb4d60..99ddad5 100644
--- a/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCConnectionManager.java
+++ b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCConnectionManager.java
@@ -162,7 +162,9 @@
LOGGER.fine("Starting Select");
}
int n = selector.select();
- swapReadersAndWriters();
+ if (pendingConnections[readerIndex].isEmpty() && sendList[readerIndex].isEmpty()) {
+ swapReadersAndWriters();
+ }
if (!pendingConnections[readerIndex].isEmpty()) {
for (IPCHandle handle : pendingConnections[readerIndex]) {
SocketChannel channel = SocketChannel.open();
diff --git a/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCHandle.java b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCHandle.java
index 481a0b0..f3c05c6 100644
--- a/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCHandle.java
+++ b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCHandle.java
@@ -17,11 +17,8 @@
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
-import java.util.HashMap;
-import java.util.Map;
import edu.uci.ics.hyracks.ipc.api.IIPCHandle;
-import edu.uci.ics.hyracks.ipc.api.IResponseCallback;
import edu.uci.ics.hyracks.ipc.exceptions.IPCException;
final class IPCHandle implements IIPCHandle {
@@ -31,8 +28,6 @@
private InetSocketAddress remoteAddress;
- private final Map<Long, IResponseCallback> pendingRequestMap;
-
private HandleState state;
private SelectionKey key;
@@ -48,7 +43,6 @@
IPCHandle(IPCSystem system, InetSocketAddress remoteAddress) {
this.system = system;
this.remoteAddress = remoteAddress;
- pendingRequestMap = new HashMap<Long, IResponseCallback>();
inBuffer = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE);
outBuffer = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE);
outBuffer.flip();
@@ -65,19 +59,23 @@
}
@Override
- public synchronized void send(Object req, IResponseCallback callback) throws IPCException {
+ public synchronized long send(long requestId, Object req, Exception exception) throws IPCException {
if (state != HandleState.CONNECTED) {
throw new IPCException("Handle is not in Connected state");
}
Message msg = new Message(this);
long mid = system.createMessageId();
msg.setMessageId(mid);
- msg.setRequestMessageId(-1);
- msg.setPayload(req);
- if (callback != null) {
- pendingRequestMap.put(mid, callback);
+ msg.setRequestMessageId(requestId);
+ if (exception != null) {
+ msg.setFlag(Message.ERROR);
+ msg.setPayload(exception);
+ } else {
+ msg.setFlag(Message.NORMAL);
+ msg.setPayload(req);
}
system.getConnectionManager().write(msg);
+ return mid;
}
@Override
@@ -110,7 +108,7 @@
this.state = state;
notifyAll();
}
-
+
synchronized void waitTillConnected() throws InterruptedException {
while (!isConnected()) {
wait();
@@ -127,9 +125,6 @@
synchronized void close() {
setState(HandleState.CLOSED);
- for (IResponseCallback cb : pendingRequestMap.values()) {
- cb.callback(this, null, new IPCException("IPC Handle Closed"));
- }
}
synchronized void processIncomingMessages() {
@@ -157,19 +152,7 @@
}
continue;
}
- long requestMessageId = message.getRequestMessageId();
- if (requestMessageId < 0) {
- system.deliverIncomingMessage(message);
- } else {
- Long rid = Long.valueOf(requestMessageId);
- IResponseCallback cb = pendingRequestMap.remove(rid);
- if (cb != null) {
- byte flag = message.getFlag();
- Object payload = flag == Message.ERROR ? null : message.getPayload();
- Exception exception = (Exception) (flag == Message.ERROR ? message.getPayload() : null);
- cb.callback(this, payload, exception);
- }
- }
+ system.deliverIncomingMessage(message);
}
inBuffer.compact();
}
diff --git a/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCSystem.java b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCSystem.java
index 9eef8ff..5f60c6d 100644
--- a/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCSystem.java
+++ b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCSystem.java
@@ -16,7 +16,6 @@
import java.io.IOException;
import java.net.InetSocketAddress;
-import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLong;
import edu.uci.ics.hyracks.ipc.api.IIPCHandle;
@@ -28,18 +27,15 @@
private final IIPCI ipci;
- private final Executor executor;
-
private final AtomicLong midFactory;
public IPCSystem(InetSocketAddress socketAddress) throws IOException {
- this(socketAddress, null, null);
+ this(socketAddress, null);
}
- public IPCSystem(InetSocketAddress socketAddress, IIPCI ipci, Executor executor) throws IOException {
+ public IPCSystem(InetSocketAddress socketAddress, IIPCI ipci) throws IOException {
cMgr = new IPCConnectionManager(this, socketAddress);
this.ipci = ipci;
- this.executor = executor;
midFactory = new AtomicLong();
}
@@ -66,25 +62,16 @@
}
void deliverIncomingMessage(final Message message) {
- assert message.getFlag() == Message.NORMAL;
- executor.execute(new Runnable() {
- @Override
- public void run() {
- IPCHandle handle = message.getIPCHandle();
- Message response = new Message(handle);
- response.setMessageId(createMessageId());
- response.setRequestMessageId(message.getMessageId());
- response.setFlag(Message.NORMAL);
- try {
- Object result = ipci.call(handle, message.getPayload());
- response.setPayload(result);
- } catch (Exception e) {
- response.setFlag(Message.ERROR);
- response.setPayload(e);
- }
- cMgr.write(response);
- }
- });
+ long mid = message.getMessageId();
+ long rmid = message.getRequestMessageId();
+ Object payload = null;
+ Exception exception = null;
+ if (message.getFlag() == Message.ERROR) {
+ exception = (Exception) message.getPayload();
+ } else {
+ payload = message.getPayload();
+ }
+ ipci.deliverIncomingMessage(message.getIPCHandle(), mid, rmid, payload, exception);
}
IPCConnectionManager getConnectionManager() {
diff --git a/hyracks-ipc/src/test/java/edu/uci/ics/hyracks/ipc/tests/IPCTest.java b/hyracks-ipc/src/test/java/edu/uci/ics/hyracks/ipc/tests/IPCTest.java
index 24cea1d..dac93dd 100644
--- a/hyracks-ipc/src/test/java/edu/uci/ics/hyracks/ipc/tests/IPCTest.java
+++ b/hyracks-ipc/src/test/java/edu/uci/ics/hyracks/ipc/tests/IPCTest.java
@@ -25,7 +25,8 @@
import edu.uci.ics.hyracks.ipc.api.IIPCHandle;
import edu.uci.ics.hyracks.ipc.api.IIPCI;
-import edu.uci.ics.hyracks.ipc.api.SyncRMI;
+import edu.uci.ics.hyracks.ipc.api.RPCInterface;
+import edu.uci.ics.hyracks.ipc.exceptions.IPCException;
import edu.uci.ics.hyracks.ipc.impl.IPCSystem;
public class IPCTest {
@@ -35,20 +36,18 @@
server.start();
InetSocketAddress serverAddr = server.getSocketAddress();
- IPCSystem client = createClientIPCSystem();
+ RPCInterface rpci = new RPCInterface();
+ IPCSystem client = createClientIPCSystem(rpci);
client.start();
IIPCHandle handle = client.getHandle(serverAddr);
- SyncRMI rmi = new SyncRMI();
for (int i = 0; i < 100; ++i) {
- Assert.assertEquals(rmi.call(handle, Integer.valueOf(i)), Integer.valueOf(2 * i));
+ Assert.assertEquals(rpci.call(handle, Integer.valueOf(i)), Integer.valueOf(2 * i));
}
- IIPCHandle rHandle = server.getHandle(client.getSocketAddress());
-
try {
- rmi.call(rHandle, "Foo");
+ rpci.call(handle, "Foo");
Assert.assertTrue(false);
} catch (Exception e) {
Assert.assertTrue(true);
@@ -56,25 +55,35 @@
}
private IPCSystem createServerIPCSystem() throws IOException {
- Executor executor = Executors.newCachedThreadPool();
+ final Executor executor = Executors.newCachedThreadPool();
IIPCI ipci = new IIPCI() {
@Override
- public Object call(IIPCHandle caller, Object req) throws Exception {
- Integer i = (Integer) req;
- return i.intValue() * 2;
+ public void deliverIncomingMessage(final IIPCHandle handle, final long mid, long rmid,
+ final Object payload, Exception exception) {
+ executor.execute(new Runnable() {
+ @Override
+ public void run() {
+ Object result = null;
+ Exception exception = null;
+ try {
+ Integer i = (Integer) payload;
+ result = i.intValue() * 2;
+ } catch (Exception e) {
+ exception = e;
+ }
+ try {
+ handle.send(mid, result, exception);
+ } catch (IPCException e) {
+ e.printStackTrace();
+ }
+ }
+ });
}
};
- return new IPCSystem(new InetSocketAddress("127.0.0.1", 0), ipci, executor);
+ return new IPCSystem(new InetSocketAddress("127.0.0.1", 0), ipci);
}
- private IPCSystem createClientIPCSystem() throws IOException {
- Executor executor = Executors.newCachedThreadPool();
- IIPCI ipci = new IIPCI() {
- @Override
- public Object call(IIPCHandle caller, Object req) throws Exception {
- throw new IllegalStateException();
- }
- };
- return new IPCSystem(new InetSocketAddress("127.0.0.1", 0), ipci, executor);
+ private IPCSystem createClientIPCSystem(RPCInterface rpci) throws IOException {
+ return new IPCSystem(new InetSocketAddress("127.0.0.1", 0), rpci);
}
}
\ No newline at end of file