Changed a bunch of CC communication to be non-blocking

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_dev_next@1020 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
index 9a443d9..7039532 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
@@ -59,6 +59,7 @@
 import edu.uci.ics.hyracks.control.cc.job.TaskClusterAttempt;
 import edu.uci.ics.hyracks.control.cc.partitions.PartitionMatchMaker;
 import edu.uci.ics.hyracks.control.cc.work.JobCleanupWork;
+import edu.uci.ics.hyracks.control.common.job.PartitionDescriptor;
 import edu.uci.ics.hyracks.control.common.job.PartitionState;
 import edu.uci.ics.hyracks.control.common.job.TaskAttemptDescriptor;
 
@@ -182,6 +183,9 @@
     }
 
     private void startRunnableActivityClusters() throws HyracksException {
+        if (jobRun.getStatus() == JobStatus.FAILURE || jobRun.getStatus() == JobStatus.TERMINATED) {
+            return;
+        }
         Set<TaskCluster> taskClusterRoots = new HashSet<TaskCluster>();
         findRunnableTaskClusterRoots(taskClusterRoots, rootActivityClusters);
         if (LOGGER.isLoggable(Level.FINE)) {
@@ -677,4 +681,8 @@
             abortJob(e);
         }
     }
+
+    public void notifyPartitionAvailability(PartitionDescriptor descriptor) throws HyracksException {
+        startRunnableActivityClusters();
+    }
 }
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterPartitionAvailibilityWork.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterPartitionAvailibilityWork.java
index 2a844ad..c0b47a5 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterPartitionAvailibilityWork.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterPartitionAvailibilityWork.java
@@ -18,6 +18,7 @@
 
 import org.apache.commons.lang3.tuple.Pair;
 
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 import edu.uci.ics.hyracks.api.partitions.PartitionId;
 import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
 import edu.uci.ics.hyracks.control.cc.job.JobRun;
@@ -53,6 +54,11 @@
                 e.printStackTrace();
             }
         }
+        try {
+            run.getScheduler().notifyPartitionAvailability(partitionDescriptor);
+        } catch (HyracksException e) {
+            e.printStackTrace();
+        }
     }
 
     @Override
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
index 0aeab72..cb5298e 100644
--- a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
@@ -46,66 +46,58 @@
 
     @Override
     public void unregisterNode(String nodeId) throws Exception {
-        SyncRMI sync = new SyncRMI();
         ClusterControllerFunctions.UnregisterNodeFunction fn = new ClusterControllerFunctions.UnregisterNodeFunction(
                 nodeId);
-        sync.call(ipcHandle, fn);
+        ipcHandle.send(fn, null);
     }
 
     @Override
     public void notifyTaskComplete(JobId jobId, TaskAttemptId taskId, String nodeId, TaskProfile statistics)
             throws Exception {
-        SyncRMI sync = new SyncRMI();
         ClusterControllerFunctions.NotifyTaskCompleteFunction fn = new ClusterControllerFunctions.NotifyTaskCompleteFunction(
                 jobId, taskId, nodeId, statistics);
-        sync.call(ipcHandle, fn);
+        ipcHandle.send(fn, null);
     }
 
     @Override
     public void notifyTaskFailure(JobId jobId, TaskAttemptId taskId, String nodeId, String details) throws Exception {
-        SyncRMI sync = new SyncRMI();
         ClusterControllerFunctions.NotifyTaskFailureFunction fn = new ClusterControllerFunctions.NotifyTaskFailureFunction(
                 jobId, taskId, nodeId, details);
-        sync.call(ipcHandle, fn);
+        ipcHandle.send(fn, null);
     }
 
     @Override
     public void notifyJobletCleanup(JobId jobId, String nodeId) throws Exception {
-        SyncRMI sync = new SyncRMI();
         ClusterControllerFunctions.NotifyJobletCleanupFunction fn = new ClusterControllerFunctions.NotifyJobletCleanupFunction(
                 jobId, nodeId);
-        sync.call(ipcHandle, fn);
+        ipcHandle.send(fn, null);
     }
 
     @Override
     public void nodeHeartbeat(String id, HeartbeatData hbData) throws Exception {
-        SyncRMI sync = new SyncRMI();
         ClusterControllerFunctions.NodeHeartbeatFunction fn = new ClusterControllerFunctions.NodeHeartbeatFunction(id,
                 hbData);
-        sync.call(ipcHandle, fn);
+        ipcHandle.send(fn, null);
     }
 
     @Override
     public void reportProfile(String id, List<JobProfile> profiles) throws Exception {
-        SyncRMI sync = new SyncRMI();
         ClusterControllerFunctions.ReportProfileFunction fn = new ClusterControllerFunctions.ReportProfileFunction(id,
                 profiles);
-        sync.call(ipcHandle, fn);
+        ipcHandle.send(fn, null);
     }
 
     @Override
     public void registerPartitionProvider(PartitionDescriptor partitionDescriptor) throws Exception {
-        SyncRMI sync = new SyncRMI();
         ClusterControllerFunctions.RegisterPartitionProviderFunction fn = new ClusterControllerFunctions.RegisterPartitionProviderFunction(
                 partitionDescriptor);
-        sync.call(ipcHandle, fn);
+        ipcHandle.send(fn, null);
     }
 
     @Override
     public void registerPartitionRequest(PartitionRequest partitionRequest) throws Exception {
-        SyncRMI sync = new SyncRMI();
         ClusterControllerFunctions.RegisterPartitionRequestFunction fn = new ClusterControllerFunctions.RegisterPartitionRequestFunction(
                 partitionRequest);
-        sync.call(ipcHandle, fn);
+        ipcHandle.send(fn, null);
     }
 }
\ No newline at end of file
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/CleanupJobletWork.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/CleanupJobletWork.java
index 7bebb53..dce7c13 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/CleanupJobletWork.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/CleanupJobletWork.java
@@ -50,15 +50,6 @@
         if (joblet != null) {
             joblet.cleanup(status);
         }
-        ncs.getExecutor().execute(new Runnable() {
-            @Override
-            public void run() {
-                try {
-                    ncs.getClusterController().notifyJobletCleanup(jobId, ncs.getId());
-                } catch (Exception e) {
-                    throw new RuntimeException(e);
-                }
-            }
-        });
+        ncs.getClusterController().notifyJobletCleanup(jobId, ncs.getId());
     }
 }
\ No newline at end of file