Added state awareness of partitions to CC
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_scheduling@458 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
index 56002a8..7de7ab8 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
@@ -17,7 +17,6 @@
import java.io.File;
import java.rmi.registry.LocateRegistry;
import java.rmi.registry.Registry;
-import java.util.Collection;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Hashtable;
@@ -73,6 +72,7 @@
import edu.uci.ics.hyracks.control.common.controllers.NCConfig;
import edu.uci.ics.hyracks.control.common.controllers.NodeParameters;
import edu.uci.ics.hyracks.control.common.controllers.NodeRegistration;
+import edu.uci.ics.hyracks.control.common.job.PartitionState;
import edu.uci.ics.hyracks.control.common.job.profiling.om.JobProfile;
import edu.uci.ics.hyracks.control.common.job.profiling.om.TaskProfile;
@@ -296,13 +296,13 @@
}
@Override
- public void registerPartitionProvider(PartitionId pid, String nodeId) throws Exception {
- jobQueue.schedule(new RegisterPartitionAvailibilityEvent(this, pid, nodeId));
+ public void registerPartitionProvider(PartitionId pid, String nodeId, PartitionState state) throws Exception {
+ jobQueue.schedule(new RegisterPartitionAvailibilityEvent(this, pid, nodeId, state));
}
@Override
- public void registerPartitionRequest(Collection<PartitionId> requiredPartitionIds, String nodeId) {
- jobQueue.schedule(new RegisterPartitionRequestEvent(this, requiredPartitionIds, nodeId));
+ public void registerPartitionRequest(PartitionId pid, String nodeId, PartitionState minState) {
+ jobQueue.schedule(new RegisterPartitionRequestEvent(this, pid, nodeId, minState));
}
private class DeadNodeSweeper extends TimerTask {
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java
index 6c851cc..acf6750 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java
@@ -26,6 +26,7 @@
import edu.uci.ics.hyracks.api.job.JobStatus;
import edu.uci.ics.hyracks.api.partitions.PartitionId;
import edu.uci.ics.hyracks.control.cc.scheduler.IJobRunStateMachine;
+import edu.uci.ics.hyracks.control.common.job.PartitionState;
import edu.uci.ics.hyracks.control.common.job.profiling.om.JobProfile;
public class JobRun implements IJobStatusConditionVariable {
@@ -33,9 +34,9 @@
private final JobActivityGraph jag;
- private final Map<PartitionId, Set<String>> partitionAvailabilityMap;
+ private final Map<PartitionId, Map<String, PartitionState>> partitionAvailabilityMap;
- private final Map<PartitionId, Set<String>> partitionRequestorMap;
+ private final Map<PartitionId, Map<String, PartitionState>> partitionRequestorMap;
private final Set<String> participatingNodeIds;
@@ -52,8 +53,8 @@
public JobRun(UUID jobId, JobActivityGraph plan) {
this.jobId = jobId;
this.jag = plan;
- partitionAvailabilityMap = new HashMap<PartitionId, Set<String>>();
- partitionRequestorMap = new HashMap<PartitionId, Set<String>>();
+ partitionAvailabilityMap = new HashMap<PartitionId, Map<String, PartitionState>>();
+ partitionRequestorMap = new HashMap<PartitionId, Map<String, PartitionState>>();
participatingNodeIds = new HashSet<String>();
profile = new JobProfile(jobId);
activityClusterMap = new HashMap<ActivityId, ActivityCluster>();
@@ -107,11 +108,11 @@
return jsm;
}
- public Map<PartitionId, Set<String>> getPartitionAvailabilityMap() {
+ public Map<PartitionId, Map<String, PartitionState>> getPartitionAvailabilityMap() {
return partitionAvailabilityMap;
}
- public Map<PartitionId, Set<String>> getPartitionRequestorMap() {
+ public Map<PartitionId, Map<String, PartitionState>> getPartitionRequestorMap() {
return partitionRequestorMap;
}
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RegisterPartitionAvailibilityEvent.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RegisterPartitionAvailibilityEvent.java
index 1bcc13b..8a35cf1 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RegisterPartitionAvailibilityEvent.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RegisterPartitionAvailibilityEvent.java
@@ -14,9 +14,9 @@
*/
package edu.uci.ics.hyracks.control.cc.job.manager.events;
-import java.util.HashSet;
+import java.util.HashMap;
+import java.util.Iterator;
import java.util.Map;
-import java.util.Set;
import edu.uci.ics.hyracks.api.partitions.PartitionId;
import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
@@ -24,16 +24,20 @@
import edu.uci.ics.hyracks.control.cc.job.JobRun;
import edu.uci.ics.hyracks.control.cc.jobqueue.AbstractEvent;
import edu.uci.ics.hyracks.control.common.base.INodeController;
+import edu.uci.ics.hyracks.control.common.job.PartitionState;
public class RegisterPartitionAvailibilityEvent extends AbstractEvent {
private final ClusterControllerService ccs;
private final PartitionId pid;
private final String nodeId;
+ private final PartitionState state;
- public RegisterPartitionAvailibilityEvent(ClusterControllerService ccs, PartitionId pid, String nodeId) {
+ public RegisterPartitionAvailibilityEvent(ClusterControllerService ccs, PartitionId pid, String nodeId,
+ PartitionState state) {
this.ccs = ccs;
this.pid = pid;
this.nodeId = nodeId;
+ this.state = state;
}
@Override
@@ -42,26 +46,32 @@
if (run == null) {
return;
}
- Map<PartitionId, Set<String>> partitionAvailabilityMap = run.getPartitionAvailabilityMap();
- Set<String> paSet = partitionAvailabilityMap.get(pid);
- if (paSet == null) {
- paSet = new HashSet<String>();
- partitionAvailabilityMap.put(pid, paSet);
+ Map<PartitionId, Map<String, PartitionState>> partitionAvailabilityMap = run.getPartitionAvailabilityMap();
+ Map<String, PartitionState> paMap = partitionAvailabilityMap.get(pid);
+ if (paMap == null) {
+ paMap = new HashMap<String, PartitionState>();
+ partitionAvailabilityMap.put(pid, paMap);
}
- paSet.add(nodeId);
+ paMap.put(nodeId, state);
NodeControllerState availNcs = ccs.getNodeMap().get(nodeId);
- Map<PartitionId, Set<String>> partitionRequestorMap = run.getPartitionRequestorMap();
- Set<String> prSet = partitionRequestorMap.get(pid);
- if (prSet != null) {
- for (String requestor : prSet) {
- NodeControllerState ncs = ccs.getNodeMap().get(requestor);
- if (ncs != null) {
- try {
- INodeController nc = ncs.getNodeController();
- nc.reportPartitionAvailability(pid, availNcs.getDataPort());
- } catch (Exception e) {
- e.printStackTrace();
+ Map<PartitionId, Map<String, PartitionState>> partitionRequestorMap = run.getPartitionRequestorMap();
+ Map<String, PartitionState> prMap = partitionRequestorMap.get(pid);
+ if (prMap != null) {
+ Iterator<Map.Entry<String, PartitionState>> i = prMap.entrySet().iterator();
+ while (i.hasNext()) {
+ Map.Entry<String, PartitionState> requestor = i.next();
+ PartitionState minState = requestor.getValue();
+ if (state.isAtLeast(minState)) {
+ i.remove();
+ NodeControllerState ncs = ccs.getNodeMap().get(requestor.getKey());
+ if (ncs != null) {
+ try {
+ INodeController nc = ncs.getNodeController();
+ nc.reportPartitionAvailability(pid, availNcs.getDataPort());
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
}
}
}
@@ -70,6 +80,6 @@
@Override
public String toString() {
- return "PartitionAvailable@" + nodeId + "[" + pid + "]";
+ return "PartitionAvailable@" + nodeId + "[" + pid + "]" + state;
}
}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RegisterPartitionRequestEvent.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RegisterPartitionRequestEvent.java
index e578f77..2f2adc0 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RegisterPartitionRequestEvent.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RegisterPartitionRequestEvent.java
@@ -14,10 +14,8 @@
*/
package edu.uci.ics.hyracks.control.cc.job.manager.events;
-import java.util.Collection;
-import java.util.HashSet;
+import java.util.HashMap;
import java.util.Map;
-import java.util.Set;
import edu.uci.ics.hyracks.api.comm.NetworkAddress;
import edu.uci.ics.hyracks.api.partitions.PartitionId;
@@ -26,62 +24,67 @@
import edu.uci.ics.hyracks.control.cc.job.JobRun;
import edu.uci.ics.hyracks.control.cc.jobqueue.AbstractEvent;
import edu.uci.ics.hyracks.control.common.base.INodeController;
+import edu.uci.ics.hyracks.control.common.job.PartitionState;
public class RegisterPartitionRequestEvent extends AbstractEvent {
private final ClusterControllerService ccs;
- private final Collection<PartitionId> requiredPartitionIds;
+ private final PartitionId pid;
private final String nodeId;
+ private final PartitionState minState;
- public RegisterPartitionRequestEvent(ClusterControllerService ccs, Collection<PartitionId> requiredPartitionIds,
- String nodeId) {
+ public RegisterPartitionRequestEvent(ClusterControllerService ccs, PartitionId pid, String nodeId,
+ PartitionState minState) {
this.ccs = ccs;
- this.requiredPartitionIds = requiredPartitionIds;
+ this.pid = pid;
this.nodeId = nodeId;
+ this.minState = minState;
}
@Override
public void run() {
NodeControllerState ncs = ccs.getNodeMap().get(nodeId);
if (ncs != null) {
- for (PartitionId pid : requiredPartitionIds) {
- JobRun run = ccs.getRunMap().get(pid.getJobId());
- if (run == null) {
- return;
- }
+ JobRun run = ccs.getRunMap().get(pid.getJobId());
+ if (run == null) {
+ return;
+ }
- Map<PartitionId, Set<String>> partitionAvailabilityMap = run.getPartitionAvailabilityMap();
- Set<String> paSet = partitionAvailabilityMap.get(pid);
- boolean matched = false;
- if (paSet != null && !paSet.isEmpty()) {
- for (String availablePartitionLocation : paSet) {
- NodeControllerState availNcs = ccs.getNodeMap().get(availablePartitionLocation);
+ Map<PartitionId, Map<String, PartitionState>> partitionAvailabilityMap = run.getPartitionAvailabilityMap();
+ Map<String, PartitionState> paMap = partitionAvailabilityMap.get(pid);
+ boolean matched = false;
+ if (paMap != null && !paMap.isEmpty()) {
+ for (Map.Entry<String, PartitionState> pa : paMap.entrySet()) {
+ PartitionState state = pa.getValue();
+ if (state.isAtLeast(minState)) {
+ NodeControllerState availNcs = ccs.getNodeMap().get(pa.getKey());
if (availNcs != null) {
NetworkAddress networkAddress = availNcs.getDataPort();
try {
INodeController nc = ncs.getNodeController();
nc.reportPartitionAvailability(pid, networkAddress);
matched = true;
+ break;
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
- if (!matched) {
- Map<PartitionId, Set<String>> partitionRequestorMap = run.getPartitionRequestorMap();
- Set<String> prSet = partitionRequestorMap.get(pid);
- if (prSet == null) {
- prSet = new HashSet<String>();
- partitionRequestorMap.put(pid, prSet);
- }
- prSet.add(nodeId);
+ }
+ if (!matched) {
+ Map<PartitionId, Map<String, PartitionState>> partitionRequestorMap = run.getPartitionRequestorMap();
+ Map<String, PartitionState> prMap = partitionRequestorMap.get(pid);
+ if (prMap == null) {
+ prMap = new HashMap<String, PartitionState>();
+ partitionRequestorMap.put(pid, prMap);
}
+ prMap.put(nodeId, minState);
}
}
}
@Override
public String toString() {
- return "PartitionRequest@[" + nodeId + "][" + requiredPartitionIds + "]";
+ return "PartitionRequest@[" + nodeId + "][" + pid + "]" + minState;
}
}
\ No newline at end of file
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java
index 9791161..8dbb180 100644
--- a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java
@@ -15,7 +15,6 @@
package edu.uci.ics.hyracks.control.common.base;
import java.rmi.Remote;
-import java.util.Collection;
import java.util.List;
import java.util.UUID;
@@ -23,6 +22,7 @@
import edu.uci.ics.hyracks.api.partitions.PartitionId;
import edu.uci.ics.hyracks.control.common.controllers.NodeParameters;
import edu.uci.ics.hyracks.control.common.controllers.NodeRegistration;
+import edu.uci.ics.hyracks.control.common.job.PartitionState;
import edu.uci.ics.hyracks.control.common.job.profiling.om.JobProfile;
import edu.uci.ics.hyracks.control.common.job.profiling.om.TaskProfile;
@@ -40,7 +40,7 @@
public void reportProfile(String id, List<JobProfile> profiles) throws Exception;
- public void registerPartitionProvider(PartitionId pid, String nodeId) throws Exception;
+ public void registerPartitionProvider(PartitionId pid, String nodeId, PartitionState state) throws Exception;
- public void registerPartitionRequest(Collection<PartitionId> requiredPartitionIds, String nodeId) throws Exception;
+ public void registerPartitionRequest(PartitionId pid, String nodeId, PartitionState minState) throws Exception;
}
\ No newline at end of file
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/PartitionState.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/PartitionState.java
new file mode 100644
index 0000000..412cbac
--- /dev/null
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/PartitionState.java
@@ -0,0 +1,17 @@
+package edu.uci.ics.hyracks.control.common.job;
+
+public enum PartitionState {
+ STARTED,
+ COMMITTED;
+
+ public boolean isAtLeast(PartitionState minState) {
+ switch (this) {
+ case COMMITTED:
+ return true;
+
+ case STARTED:
+ return minState == STARTED;
+ }
+ return false;
+ }
+}
\ No newline at end of file
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
index 30434b8..9c12cd9 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
@@ -38,6 +38,7 @@
import edu.uci.ics.hyracks.api.job.profiling.counters.ICounterContext;
import edu.uci.ics.hyracks.api.partitions.PartitionId;
import edu.uci.ics.hyracks.api.resources.IDeallocatable;
+import edu.uci.ics.hyracks.control.common.job.PartitionState;
import edu.uci.ics.hyracks.control.common.job.profiling.counters.Counter;
import edu.uci.ics.hyracks.control.common.job.profiling.om.JobletProfile;
import edu.uci.ics.hyracks.control.common.job.profiling.om.TaskProfile;
@@ -212,12 +213,12 @@
return counter;
}
- public synchronized void advertisePartitionRequest(Collection<PartitionId> requiredPartitionIds,
- IPartitionCollector collector) throws Exception {
- for (PartitionId pid : requiredPartitionIds) {
+ public synchronized void advertisePartitionRequest(Collection<PartitionId> pids, IPartitionCollector collector,
+ PartitionState minState) throws Exception {
+ for (PartitionId pid : pids) {
partitionRequestMap.put(pid, collector);
+ nodeController.getClusterController().registerPartitionRequest(pid, nodeController.getId(), minState);
}
- nodeController.getClusterController().registerPartitionRequest(requiredPartitionIds, nodeController.getId());
}
public synchronized void reportPartitionAvailability(PartitionChannel channel) throws HyracksException {
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
index d5ce485..20bf9da 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
@@ -35,6 +35,7 @@
import edu.uci.ics.hyracks.api.job.profiling.counters.ICounter;
import edu.uci.ics.hyracks.api.job.profiling.counters.ICounterContext;
import edu.uci.ics.hyracks.api.resources.IDeallocatable;
+import edu.uci.ics.hyracks.control.common.job.PartitionState;
import edu.uci.ics.hyracks.control.common.job.profiling.counters.Counter;
import edu.uci.ics.hyracks.control.common.job.profiling.om.TaskProfile;
import edu.uci.ics.hyracks.control.nc.io.IOManager;
@@ -206,7 +207,7 @@
try {
collector.open();
try {
- joblet.advertisePartitionRequest(collector.getRequiredPartitionIds(), collector);
+ joblet.advertisePartitionRequest(collector.getRequiredPartitionIds(), collector, PartitionState.STARTED);
IFrameReader reader = collector.getReader();
reader.open();
try {
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionWriter.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionWriter.java
index a74f099..353ba32 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionWriter.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionWriter.java
@@ -70,5 +70,6 @@
public void close() throws HyracksDataException {
ctx.getIOManager().close(handle);
manager.registerPartition(pid, new MaterializedPartition(ctx, fRef, executor, (IOManager) ctx.getIOManager()));
+ manager.notifyPartitionCommit(pid);
}
}
\ No newline at end of file
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PartitionManager.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PartitionManager.java
index e5e23bb..b676e33 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PartitionManager.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PartitionManager.java
@@ -27,6 +27,7 @@
import edu.uci.ics.hyracks.api.io.IWorkspaceFileFactory;
import edu.uci.ics.hyracks.api.partitions.IPartition;
import edu.uci.ics.hyracks.api.partitions.PartitionId;
+import edu.uci.ics.hyracks.control.common.job.PartitionState;
import edu.uci.ics.hyracks.control.nc.NodeControllerService;
import edu.uci.ics.hyracks.control.nc.io.IOManager;
import edu.uci.ics.hyracks.control.nc.io.WorkspaceFileFactory;
@@ -58,7 +59,15 @@
pList.add(partition);
}
try {
- ncs.getClusterController().registerPartitionProvider(pid, ncs.getId());
+ ncs.getClusterController().registerPartitionProvider(pid, ncs.getId(), PartitionState.STARTED);
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ public void notifyPartitionCommit(PartitionId pid) throws HyracksDataException {
+ try {
+ ncs.getClusterController().registerPartitionProvider(pid, ncs.getId(), PartitionState.COMMITTED);
} catch (Exception e) {
throw new HyracksDataException(e);
}
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PipelinedPartition.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PipelinedPartition.java
index c667ef4..ec6e25d 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PipelinedPartition.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PipelinedPartition.java
@@ -74,6 +74,7 @@
@Override
public void close() throws HyracksDataException {
+ manager.notifyPartitionCommit(pid);
delegate.close();
}
}
\ No newline at end of file