Turned RMI calls on NC side to be non-blocking
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_dev_next@956 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
index 0aeab72..bc6c69a 100644
--- a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
@@ -55,57 +55,50 @@
@Override
public void notifyTaskComplete(JobId jobId, TaskAttemptId taskId, String nodeId, TaskProfile statistics)
throws Exception {
- SyncRMI sync = new SyncRMI();
ClusterControllerFunctions.NotifyTaskCompleteFunction fn = new ClusterControllerFunctions.NotifyTaskCompleteFunction(
jobId, taskId, nodeId, statistics);
- sync.call(ipcHandle, fn);
+ ipcHandle.send(fn, null);
}
@Override
public void notifyTaskFailure(JobId jobId, TaskAttemptId taskId, String nodeId, String details) throws Exception {
- SyncRMI sync = new SyncRMI();
ClusterControllerFunctions.NotifyTaskFailureFunction fn = new ClusterControllerFunctions.NotifyTaskFailureFunction(
jobId, taskId, nodeId, details);
- sync.call(ipcHandle, fn);
+ ipcHandle.send(fn, null);
}
@Override
public void notifyJobletCleanup(JobId jobId, String nodeId) throws Exception {
- SyncRMI sync = new SyncRMI();
ClusterControllerFunctions.NotifyJobletCleanupFunction fn = new ClusterControllerFunctions.NotifyJobletCleanupFunction(
jobId, nodeId);
- sync.call(ipcHandle, fn);
+ ipcHandle.send(fn, null);
}
@Override
public void nodeHeartbeat(String id, HeartbeatData hbData) throws Exception {
- SyncRMI sync = new SyncRMI();
ClusterControllerFunctions.NodeHeartbeatFunction fn = new ClusterControllerFunctions.NodeHeartbeatFunction(id,
hbData);
- sync.call(ipcHandle, fn);
+ ipcHandle.send(fn, null);
}
@Override
public void reportProfile(String id, List<JobProfile> profiles) throws Exception {
- SyncRMI sync = new SyncRMI();
ClusterControllerFunctions.ReportProfileFunction fn = new ClusterControllerFunctions.ReportProfileFunction(id,
profiles);
- sync.call(ipcHandle, fn);
+ ipcHandle.send(fn, null);
}
@Override
public void registerPartitionProvider(PartitionDescriptor partitionDescriptor) throws Exception {
- SyncRMI sync = new SyncRMI();
ClusterControllerFunctions.RegisterPartitionProviderFunction fn = new ClusterControllerFunctions.RegisterPartitionProviderFunction(
partitionDescriptor);
- sync.call(ipcHandle, fn);
+ ipcHandle.send(fn, null);
}
@Override
public void registerPartitionRequest(PartitionRequest partitionRequest) throws Exception {
- SyncRMI sync = new SyncRMI();
ClusterControllerFunctions.RegisterPartitionRequestFunction fn = new ClusterControllerFunctions.RegisterPartitionRequestFunction(
partitionRequest);
- sync.call(ipcHandle, fn);
+ ipcHandle.send(fn, null);
}
}
\ No newline at end of file
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/CleanupJobletWork.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/CleanupJobletWork.java
index 7bebb53..dce7c13 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/CleanupJobletWork.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/CleanupJobletWork.java
@@ -50,15 +50,6 @@
if (joblet != null) {
joblet.cleanup(status);
}
- ncs.getExecutor().execute(new Runnable() {
- @Override
- public void run() {
- try {
- ncs.getClusterController().notifyJobletCleanup(jobId, ncs.getId());
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
- });
+ ncs.getClusterController().notifyJobletCleanup(jobId, ncs.getId());
}
}
\ No newline at end of file