Reverted non-blocking CC calls
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_dev_next@1022 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
index 7039532..9a443d9 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
@@ -59,7 +59,6 @@
import edu.uci.ics.hyracks.control.cc.job.TaskClusterAttempt;
import edu.uci.ics.hyracks.control.cc.partitions.PartitionMatchMaker;
import edu.uci.ics.hyracks.control.cc.work.JobCleanupWork;
-import edu.uci.ics.hyracks.control.common.job.PartitionDescriptor;
import edu.uci.ics.hyracks.control.common.job.PartitionState;
import edu.uci.ics.hyracks.control.common.job.TaskAttemptDescriptor;
@@ -183,9 +182,6 @@
}
private void startRunnableActivityClusters() throws HyracksException {
- if (jobRun.getStatus() == JobStatus.FAILURE || jobRun.getStatus() == JobStatus.TERMINATED) {
- return;
- }
Set<TaskCluster> taskClusterRoots = new HashSet<TaskCluster>();
findRunnableTaskClusterRoots(taskClusterRoots, rootActivityClusters);
if (LOGGER.isLoggable(Level.FINE)) {
@@ -681,8 +677,4 @@
abortJob(e);
}
}
-
- public void notifyPartitionAvailability(PartitionDescriptor descriptor) throws HyracksException {
- startRunnableActivityClusters();
- }
}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterPartitionAvailibilityWork.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterPartitionAvailibilityWork.java
index c0b47a5..2a844ad 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterPartitionAvailibilityWork.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterPartitionAvailibilityWork.java
@@ -18,7 +18,6 @@
import org.apache.commons.lang3.tuple.Pair;
-import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.api.partitions.PartitionId;
import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
import edu.uci.ics.hyracks.control.cc.job.JobRun;
@@ -54,11 +53,6 @@
e.printStackTrace();
}
}
- try {
- run.getScheduler().notifyPartitionAvailability(partitionDescriptor);
- } catch (HyracksException e) {
- e.printStackTrace();
- }
}
@Override
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
index cb5298e..0aeab72 100644
--- a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
@@ -46,58 +46,66 @@
@Override
public void unregisterNode(String nodeId) throws Exception {
+ SyncRMI sync = new SyncRMI();
ClusterControllerFunctions.UnregisterNodeFunction fn = new ClusterControllerFunctions.UnregisterNodeFunction(
nodeId);
- ipcHandle.send(fn, null);
+ sync.call(ipcHandle, fn);
}
@Override
public void notifyTaskComplete(JobId jobId, TaskAttemptId taskId, String nodeId, TaskProfile statistics)
throws Exception {
+ SyncRMI sync = new SyncRMI();
ClusterControllerFunctions.NotifyTaskCompleteFunction fn = new ClusterControllerFunctions.NotifyTaskCompleteFunction(
jobId, taskId, nodeId, statistics);
- ipcHandle.send(fn, null);
+ sync.call(ipcHandle, fn);
}
@Override
public void notifyTaskFailure(JobId jobId, TaskAttemptId taskId, String nodeId, String details) throws Exception {
+ SyncRMI sync = new SyncRMI();
ClusterControllerFunctions.NotifyTaskFailureFunction fn = new ClusterControllerFunctions.NotifyTaskFailureFunction(
jobId, taskId, nodeId, details);
- ipcHandle.send(fn, null);
+ sync.call(ipcHandle, fn);
}
@Override
public void notifyJobletCleanup(JobId jobId, String nodeId) throws Exception {
+ SyncRMI sync = new SyncRMI();
ClusterControllerFunctions.NotifyJobletCleanupFunction fn = new ClusterControllerFunctions.NotifyJobletCleanupFunction(
jobId, nodeId);
- ipcHandle.send(fn, null);
+ sync.call(ipcHandle, fn);
}
@Override
public void nodeHeartbeat(String id, HeartbeatData hbData) throws Exception {
+ SyncRMI sync = new SyncRMI();
ClusterControllerFunctions.NodeHeartbeatFunction fn = new ClusterControllerFunctions.NodeHeartbeatFunction(id,
hbData);
- ipcHandle.send(fn, null);
+ sync.call(ipcHandle, fn);
}
@Override
public void reportProfile(String id, List<JobProfile> profiles) throws Exception {
+ SyncRMI sync = new SyncRMI();
ClusterControllerFunctions.ReportProfileFunction fn = new ClusterControllerFunctions.ReportProfileFunction(id,
profiles);
- ipcHandle.send(fn, null);
+ sync.call(ipcHandle, fn);
}
@Override
public void registerPartitionProvider(PartitionDescriptor partitionDescriptor) throws Exception {
+ SyncRMI sync = new SyncRMI();
ClusterControllerFunctions.RegisterPartitionProviderFunction fn = new ClusterControllerFunctions.RegisterPartitionProviderFunction(
partitionDescriptor);
- ipcHandle.send(fn, null);
+ sync.call(ipcHandle, fn);
}
@Override
public void registerPartitionRequest(PartitionRequest partitionRequest) throws Exception {
+ SyncRMI sync = new SyncRMI();
ClusterControllerFunctions.RegisterPartitionRequestFunction fn = new ClusterControllerFunctions.RegisterPartitionRequestFunction(
partitionRequest);
- ipcHandle.send(fn, null);
+ sync.call(ipcHandle, fn);
}
}
\ No newline at end of file
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/CleanupJobletWork.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/CleanupJobletWork.java
index dce7c13..7bebb53 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/CleanupJobletWork.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/CleanupJobletWork.java
@@ -50,6 +50,15 @@
if (joblet != null) {
joblet.cleanup(status);
}
- ncs.getClusterController().notifyJobletCleanup(jobId, ncs.getId());
+ ncs.getExecutor().execute(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ ncs.getClusterController().notifyJobletCleanup(jobId, ncs.getId());
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ });
}
}
\ No newline at end of file