Reverted r556 and r555

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_dev_next@958 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
index e83d496..e2c9ca3 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
@@ -23,6 +23,7 @@
 import java.util.PriorityQueue;
 import java.util.Random;
 import java.util.Set;
+import java.util.concurrent.Executor;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -430,25 +431,31 @@
     }
 
     private void startTasks(Map<String, List<TaskAttemptDescriptor>> taskAttemptMap) throws HyracksException {
+        Executor executor = ccs.getExecutor();
         final JobId jobId = jobRun.getJobId();
         final JobActivityGraph jag = jobRun.getJobActivityGraph();
         final String appName = jag.getApplicationName();
         final Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies = jobRun.getConnectorPolicyMap();
-        for (Map.Entry<String, List<TaskAttemptDescriptor>> entry : taskAttemptMap.entrySet()) {
-            String nodeId = entry.getKey();
-            final List<TaskAttemptDescriptor> taskDescriptors = entry.getValue();
+        for (Map.Entry<String, List<TaskAttemptDescriptor>> e : taskAttemptMap.entrySet()) {
+            String nodeId = e.getKey();
+            final List<TaskAttemptDescriptor> taskDescriptors = e.getValue();
             final NodeControllerState node = ccs.getNodeMap().get(nodeId);
             if (node != null) {
                 node.getActiveJobIds().add(jobRun.getJobId());
                 jobRun.getParticipatingNodeIds().add(nodeId);
-                try {
-                    node.getNodeController().startTasks(appName, jobId, JavaSerializationUtils.serialize(jag),
-                            taskDescriptors, connectorPolicies);
-                } catch (IOException e) {
-                    e.printStackTrace();
-                } catch (Exception e) {
-                    e.printStackTrace();
-                }
+                executor.execute(new Runnable() {
+                    @Override
+                    public void run() {
+                        try {
+                            node.getNodeController().startTasks(appName, jobId, JavaSerializationUtils.serialize(jag),
+                                    taskDescriptors, connectorPolicies);
+                        } catch (IOException e) {
+                            e.printStackTrace();
+                        } catch (Exception e) {
+                            e.printStackTrace();
+                        }
+                    }
+                });
             }
         }
     }
@@ -484,16 +491,21 @@
         }
         final JobId jobId = jobRun.getJobId();
         LOGGER.info("Abort map for job: " + jobId + ": " + abortTaskAttemptMap);
-        for (Map.Entry<String, List<TaskAttemptId>> entry : abortTaskAttemptMap.entrySet()) {
-            final NodeControllerState node = ccs.getNodeMap().get(entry.getKey());
-            final List<TaskAttemptId> abortTaskAttempts = entry.getValue();
+        for (Map.Entry<String, List<TaskAttemptId>> e : abortTaskAttemptMap.entrySet()) {
+            final NodeControllerState node = ccs.getNodeMap().get(e.getKey());
+            final List<TaskAttemptId> abortTaskAttempts = e.getValue();
             if (node != null) {
-                LOGGER.info("Aborting: " + abortTaskAttempts + " at " + entry.getKey());
-                try {
-                    node.getNodeController().abortTasks(jobId, abortTaskAttempts);
-                } catch (Exception e) {
-                    e.printStackTrace();
-                }
+                LOGGER.info("Aborting: " + abortTaskAttempts + " at " + e.getKey());
+                ccs.getExecutor().execute(new Runnable() {
+                    @Override
+                    public void run() {
+                        try {
+                            node.getNodeController().abortTasks(jobId, abortTaskAttempts);
+                        } catch (Exception e) {
+                            e.printStackTrace();
+                        }
+                    }
+                });
             }
         }
         inProgressTaskClusters.remove(tcAttempt.getTaskCluster());
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
index bc6c69a..0aeab72 100644
--- a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
@@ -55,50 +55,57 @@
     @Override
     public void notifyTaskComplete(JobId jobId, TaskAttemptId taskId, String nodeId, TaskProfile statistics)
             throws Exception {
+        SyncRMI sync = new SyncRMI();
         ClusterControllerFunctions.NotifyTaskCompleteFunction fn = new ClusterControllerFunctions.NotifyTaskCompleteFunction(
                 jobId, taskId, nodeId, statistics);
-        ipcHandle.send(fn, null);
+        sync.call(ipcHandle, fn);
     }
 
     @Override
     public void notifyTaskFailure(JobId jobId, TaskAttemptId taskId, String nodeId, String details) throws Exception {
+        SyncRMI sync = new SyncRMI();
         ClusterControllerFunctions.NotifyTaskFailureFunction fn = new ClusterControllerFunctions.NotifyTaskFailureFunction(
                 jobId, taskId, nodeId, details);
-        ipcHandle.send(fn, null);
+        sync.call(ipcHandle, fn);
     }
 
     @Override
     public void notifyJobletCleanup(JobId jobId, String nodeId) throws Exception {
+        SyncRMI sync = new SyncRMI();
         ClusterControllerFunctions.NotifyJobletCleanupFunction fn = new ClusterControllerFunctions.NotifyJobletCleanupFunction(
                 jobId, nodeId);
-        ipcHandle.send(fn, null);
+        sync.call(ipcHandle, fn);
     }
 
     @Override
     public void nodeHeartbeat(String id, HeartbeatData hbData) throws Exception {
+        SyncRMI sync = new SyncRMI();
         ClusterControllerFunctions.NodeHeartbeatFunction fn = new ClusterControllerFunctions.NodeHeartbeatFunction(id,
                 hbData);
-        ipcHandle.send(fn, null);
+        sync.call(ipcHandle, fn);
     }
 
     @Override
     public void reportProfile(String id, List<JobProfile> profiles) throws Exception {
+        SyncRMI sync = new SyncRMI();
         ClusterControllerFunctions.ReportProfileFunction fn = new ClusterControllerFunctions.ReportProfileFunction(id,
                 profiles);
-        ipcHandle.send(fn, null);
+        sync.call(ipcHandle, fn);
     }
 
     @Override
     public void registerPartitionProvider(PartitionDescriptor partitionDescriptor) throws Exception {
+        SyncRMI sync = new SyncRMI();
         ClusterControllerFunctions.RegisterPartitionProviderFunction fn = new ClusterControllerFunctions.RegisterPartitionProviderFunction(
                 partitionDescriptor);
-        ipcHandle.send(fn, null);
+        sync.call(ipcHandle, fn);
     }
 
     @Override
     public void registerPartitionRequest(PartitionRequest partitionRequest) throws Exception {
+        SyncRMI sync = new SyncRMI();
         ClusterControllerFunctions.RegisterPartitionRequestFunction fn = new ClusterControllerFunctions.RegisterPartitionRequestFunction(
                 partitionRequest);
-        ipcHandle.send(fn, null);
+        sync.call(ipcHandle, fn);
     }
 }
\ No newline at end of file
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerRemoteProxy.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
index cac6ef5..ccd9468 100644
--- a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
@@ -39,15 +39,17 @@
     @Override
     public void startTasks(String appName, JobId jobId, byte[] planBytes, List<TaskAttemptDescriptor> taskDescriptors,
             Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies) throws Exception {
+        SyncRMI sync = new SyncRMI();
         NodeControllerFunctions.StartTasksFunction stf = new NodeControllerFunctions.StartTasksFunction(appName, jobId,
                 planBytes, taskDescriptors, connectorPolicies);
-        ipcHandle.send(stf, null);
+        sync.call(ipcHandle, stf);
     }
 
     @Override
     public void abortTasks(JobId jobId, List<TaskAttemptId> tasks) throws Exception {
+        SyncRMI sync = new SyncRMI();
         NodeControllerFunctions.AbortTasksFunction atf = new NodeControllerFunctions.AbortTasksFunction(jobId, tasks);
-        ipcHandle.send(atf, null);
+        sync.call(ipcHandle, atf);
     }
 
     @Override
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/CleanupJobletWork.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/CleanupJobletWork.java
index dce7c13..7bebb53 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/CleanupJobletWork.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/CleanupJobletWork.java
@@ -50,6 +50,15 @@
         if (joblet != null) {
             joblet.cleanup(status);
         }
-        ncs.getClusterController().notifyJobletCleanup(jobId, ncs.getId());
+        ncs.getExecutor().execute(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    ncs.getClusterController().notifyJobletCleanup(jobId, ncs.getId());
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+            }
+        });
     }
 }
\ No newline at end of file