Turned more RMI calls to be non-blocking

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_dev_next@955 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
index e2c9ca3..e83d496 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
@@ -23,7 +23,6 @@
 import java.util.PriorityQueue;
 import java.util.Random;
 import java.util.Set;
-import java.util.concurrent.Executor;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -431,31 +430,25 @@
     }
 
     private void startTasks(Map<String, List<TaskAttemptDescriptor>> taskAttemptMap) throws HyracksException {
-        Executor executor = ccs.getExecutor();
         final JobId jobId = jobRun.getJobId();
         final JobActivityGraph jag = jobRun.getJobActivityGraph();
         final String appName = jag.getApplicationName();
         final Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies = jobRun.getConnectorPolicyMap();
-        for (Map.Entry<String, List<TaskAttemptDescriptor>> e : taskAttemptMap.entrySet()) {
-            String nodeId = e.getKey();
-            final List<TaskAttemptDescriptor> taskDescriptors = e.getValue();
+        for (Map.Entry<String, List<TaskAttemptDescriptor>> entry : taskAttemptMap.entrySet()) {
+            String nodeId = entry.getKey();
+            final List<TaskAttemptDescriptor> taskDescriptors = entry.getValue();
             final NodeControllerState node = ccs.getNodeMap().get(nodeId);
             if (node != null) {
                 node.getActiveJobIds().add(jobRun.getJobId());
                 jobRun.getParticipatingNodeIds().add(nodeId);
-                executor.execute(new Runnable() {
-                    @Override
-                    public void run() {
-                        try {
-                            node.getNodeController().startTasks(appName, jobId, JavaSerializationUtils.serialize(jag),
-                                    taskDescriptors, connectorPolicies);
-                        } catch (IOException e) {
-                            e.printStackTrace();
-                        } catch (Exception e) {
-                            e.printStackTrace();
-                        }
-                    }
-                });
+                try {
+                    node.getNodeController().startTasks(appName, jobId, JavaSerializationUtils.serialize(jag),
+                            taskDescriptors, connectorPolicies);
+                } catch (IOException e) {
+                    e.printStackTrace();
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
             }
         }
     }
@@ -491,21 +484,16 @@
         }
         final JobId jobId = jobRun.getJobId();
         LOGGER.info("Abort map for job: " + jobId + ": " + abortTaskAttemptMap);
-        for (Map.Entry<String, List<TaskAttemptId>> e : abortTaskAttemptMap.entrySet()) {
-            final NodeControllerState node = ccs.getNodeMap().get(e.getKey());
-            final List<TaskAttemptId> abortTaskAttempts = e.getValue();
+        for (Map.Entry<String, List<TaskAttemptId>> entry : abortTaskAttemptMap.entrySet()) {
+            final NodeControllerState node = ccs.getNodeMap().get(entry.getKey());
+            final List<TaskAttemptId> abortTaskAttempts = entry.getValue();
             if (node != null) {
-                LOGGER.info("Aborting: " + abortTaskAttempts + " at " + e.getKey());
-                ccs.getExecutor().execute(new Runnable() {
-                    @Override
-                    public void run() {
-                        try {
-                            node.getNodeController().abortTasks(jobId, abortTaskAttempts);
-                        } catch (Exception e) {
-                            e.printStackTrace();
-                        }
-                    }
-                });
+                LOGGER.info("Aborting: " + abortTaskAttempts + " at " + entry.getKey());
+                try {
+                    node.getNodeController().abortTasks(jobId, abortTaskAttempts);
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
             }
         }
         inProgressTaskClusters.remove(tcAttempt.getTaskCluster());
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerRemoteProxy.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
index ccd9468..cac6ef5 100644
--- a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
@@ -39,17 +39,15 @@
     @Override
     public void startTasks(String appName, JobId jobId, byte[] planBytes, List<TaskAttemptDescriptor> taskDescriptors,
             Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies) throws Exception {
-        SyncRMI sync = new SyncRMI();
         NodeControllerFunctions.StartTasksFunction stf = new NodeControllerFunctions.StartTasksFunction(appName, jobId,
                 planBytes, taskDescriptors, connectorPolicies);
-        sync.call(ipcHandle, stf);
+        ipcHandle.send(stf, null);
     }
 
     @Override
     public void abortTasks(JobId jobId, List<TaskAttemptId> tasks) throws Exception {
-        SyncRMI sync = new SyncRMI();
         NodeControllerFunctions.AbortTasksFunction atf = new NodeControllerFunctions.AbortTasksFunction(jobId, tasks);
-        sync.call(ipcHandle, atf);
+        ipcHandle.send(atf, null);
     }
 
     @Override