Turned more RMI calls to be non-blocking
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_dev_next@955 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
index e2c9ca3..e83d496 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
@@ -23,7 +23,6 @@
import java.util.PriorityQueue;
import java.util.Random;
import java.util.Set;
-import java.util.concurrent.Executor;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -431,31 +430,25 @@
}
private void startTasks(Map<String, List<TaskAttemptDescriptor>> taskAttemptMap) throws HyracksException {
- Executor executor = ccs.getExecutor();
final JobId jobId = jobRun.getJobId();
final JobActivityGraph jag = jobRun.getJobActivityGraph();
final String appName = jag.getApplicationName();
final Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies = jobRun.getConnectorPolicyMap();
- for (Map.Entry<String, List<TaskAttemptDescriptor>> e : taskAttemptMap.entrySet()) {
- String nodeId = e.getKey();
- final List<TaskAttemptDescriptor> taskDescriptors = e.getValue();
+ for (Map.Entry<String, List<TaskAttemptDescriptor>> entry : taskAttemptMap.entrySet()) {
+ String nodeId = entry.getKey();
+ final List<TaskAttemptDescriptor> taskDescriptors = entry.getValue();
final NodeControllerState node = ccs.getNodeMap().get(nodeId);
if (node != null) {
node.getActiveJobIds().add(jobRun.getJobId());
jobRun.getParticipatingNodeIds().add(nodeId);
- executor.execute(new Runnable() {
- @Override
- public void run() {
- try {
- node.getNodeController().startTasks(appName, jobId, JavaSerializationUtils.serialize(jag),
- taskDescriptors, connectorPolicies);
- } catch (IOException e) {
- e.printStackTrace();
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- });
+ try {
+ node.getNodeController().startTasks(appName, jobId, JavaSerializationUtils.serialize(jag),
+ taskDescriptors, connectorPolicies);
+ } catch (IOException e) {
+ e.printStackTrace();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
}
}
}
@@ -491,21 +484,16 @@
}
final JobId jobId = jobRun.getJobId();
LOGGER.info("Abort map for job: " + jobId + ": " + abortTaskAttemptMap);
- for (Map.Entry<String, List<TaskAttemptId>> e : abortTaskAttemptMap.entrySet()) {
- final NodeControllerState node = ccs.getNodeMap().get(e.getKey());
- final List<TaskAttemptId> abortTaskAttempts = e.getValue();
+ for (Map.Entry<String, List<TaskAttemptId>> entry : abortTaskAttemptMap.entrySet()) {
+ final NodeControllerState node = ccs.getNodeMap().get(entry.getKey());
+ final List<TaskAttemptId> abortTaskAttempts = entry.getValue();
if (node != null) {
- LOGGER.info("Aborting: " + abortTaskAttempts + " at " + e.getKey());
- ccs.getExecutor().execute(new Runnable() {
- @Override
- public void run() {
- try {
- node.getNodeController().abortTasks(jobId, abortTaskAttempts);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- });
+ LOGGER.info("Aborting: " + abortTaskAttempts + " at " + entry.getKey());
+ try {
+ node.getNodeController().abortTasks(jobId, abortTaskAttempts);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
}
}
inProgressTaskClusters.remove(tcAttempt.getTaskCluster());
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerRemoteProxy.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
index ccd9468..cac6ef5 100644
--- a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
@@ -39,17 +39,15 @@
@Override
public void startTasks(String appName, JobId jobId, byte[] planBytes, List<TaskAttemptDescriptor> taskDescriptors,
Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies) throws Exception {
- SyncRMI sync = new SyncRMI();
NodeControllerFunctions.StartTasksFunction stf = new NodeControllerFunctions.StartTasksFunction(appName, jobId,
planBytes, taskDescriptors, connectorPolicies);
- sync.call(ipcHandle, stf);
+ ipcHandle.send(stf, null);
}
@Override
public void abortTasks(JobId jobId, List<TaskAttemptId> tasks) throws Exception {
- SyncRMI sync = new SyncRMI();
NodeControllerFunctions.AbortTasksFunction atf = new NodeControllerFunctions.AbortTasksFunction(jobId, tasks);
- sync.call(ipcHandle, atf);
+ ipcHandle.send(atf, null);
}
@Override