Cleaned up partition management. Made fault recovery more robust.
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_scheduling@465 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/SendSideMaterializedConnectorPolicy.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/SendSideMaterializedBlockingConnectorPolicy.java
similarity index 92%
rename from hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/SendSideMaterializedConnectorPolicy.java
rename to hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/SendSideMaterializedBlockingConnectorPolicy.java
index ffd3722..a5017de 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/SendSideMaterializedConnectorPolicy.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/SendSideMaterializedBlockingConnectorPolicy.java
@@ -14,7 +14,7 @@
*/
package edu.uci.ics.hyracks.api.dataflow.connectors;
-public final class SendSideMaterializedConnectorPolicy implements IConnectorPolicy {
+public final class SendSideMaterializedBlockingConnectorPolicy implements IConnectorPolicy {
private static final long serialVersionUID = 1L;
@Override
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/SendSideMaterializedReceiveSideMaterializedConnectorPolicy.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/SendSideMaterializedReceiveSideMaterializedBlockingConnectorPolicy.java
similarity index 90%
rename from hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/SendSideMaterializedReceiveSideMaterializedConnectorPolicy.java
rename to hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/SendSideMaterializedReceiveSideMaterializedBlockingConnectorPolicy.java
index 0a02e58..82608c4 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/SendSideMaterializedReceiveSideMaterializedConnectorPolicy.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/SendSideMaterializedReceiveSideMaterializedBlockingConnectorPolicy.java
@@ -14,7 +14,7 @@
*/
package edu.uci.ics.hyracks.api.dataflow.connectors;
-public final class SendSideMaterializedReceiveSideMaterializedConnectorPolicy implements IConnectorPolicy {
+public final class SendSideMaterializedReceiveSideMaterializedBlockingConnectorPolicy implements IConnectorPolicy {
private static final long serialVersionUID = 1L;
@Override
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
index 7de7ab8..79d709e 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
@@ -40,7 +40,6 @@
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.api.job.JobFlag;
import edu.uci.ics.hyracks.api.job.JobStatus;
-import edu.uci.ics.hyracks.api.partitions.PartitionId;
import edu.uci.ics.hyracks.control.cc.application.CCApplicationContext;
import edu.uci.ics.hyracks.control.cc.job.IJobStatusConditionVariable;
import edu.uci.ics.hyracks.control.cc.job.JobRun;
@@ -72,7 +71,8 @@
import edu.uci.ics.hyracks.control.common.controllers.NCConfig;
import edu.uci.ics.hyracks.control.common.controllers.NodeParameters;
import edu.uci.ics.hyracks.control.common.controllers.NodeRegistration;
-import edu.uci.ics.hyracks.control.common.job.PartitionState;
+import edu.uci.ics.hyracks.control.common.job.PartitionDescriptor;
+import edu.uci.ics.hyracks.control.common.job.PartitionRequest;
import edu.uci.ics.hyracks.control.common.job.profiling.om.JobProfile;
import edu.uci.ics.hyracks.control.common.job.profiling.om.TaskProfile;
@@ -296,13 +296,13 @@
}
@Override
- public void registerPartitionProvider(PartitionId pid, String nodeId, PartitionState state) throws Exception {
- jobQueue.schedule(new RegisterPartitionAvailibilityEvent(this, pid, nodeId, state));
+ public void registerPartitionProvider(PartitionDescriptor partitionDescriptor) {
+ jobQueue.schedule(new RegisterPartitionAvailibilityEvent(this, partitionDescriptor));
}
@Override
- public void registerPartitionRequest(PartitionId pid, String nodeId, PartitionState minState) {
- jobQueue.schedule(new RegisterPartitionRequestEvent(this, pid, nodeId, minState));
+ public void registerPartitionRequest(PartitionRequest partitionRequest) {
+ jobQueue.schedule(new RegisterPartitionRequestEvent(this, partitionRequest));
}
private class DeadNodeSweeper extends TimerTask {
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/ActivityCluster.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/ActivityCluster.java
index c802eb8..edc8579 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/ActivityCluster.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/ActivityCluster.java
@@ -117,4 +117,8 @@
public Map<ConnectorDescriptorId, IConnectorPolicy> getConnectorPolicyMap() {
return connectorPolicies;
}
+
+ public void notifyNodeFailures(Set<String> deadNodes) throws HyracksException {
+ acsm.notifyNodeFailures(deadNodes);
+ }
}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java
index acf6750..40f4ae5 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java
@@ -24,9 +24,8 @@
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.api.job.JobActivityGraph;
import edu.uci.ics.hyracks.api.job.JobStatus;
-import edu.uci.ics.hyracks.api.partitions.PartitionId;
+import edu.uci.ics.hyracks.control.cc.partitions.PartitionMatchMaker;
import edu.uci.ics.hyracks.control.cc.scheduler.IJobRunStateMachine;
-import edu.uci.ics.hyracks.control.common.job.PartitionState;
import edu.uci.ics.hyracks.control.common.job.profiling.om.JobProfile;
public class JobRun implements IJobStatusConditionVariable {
@@ -34,9 +33,7 @@
private final JobActivityGraph jag;
- private final Map<PartitionId, Map<String, PartitionState>> partitionAvailabilityMap;
-
- private final Map<PartitionId, Map<String, PartitionState>> partitionRequestorMap;
+ private final PartitionMatchMaker pmm;
private final Set<String> participatingNodeIds;
@@ -53,8 +50,7 @@
public JobRun(UUID jobId, JobActivityGraph plan) {
this.jobId = jobId;
this.jag = plan;
- partitionAvailabilityMap = new HashMap<PartitionId, Map<String, PartitionState>>();
- partitionRequestorMap = new HashMap<PartitionId, Map<String, PartitionState>>();
+ pmm = new PartitionMatchMaker();
participatingNodeIds = new HashSet<String>();
profile = new JobProfile(jobId);
activityClusterMap = new HashMap<ActivityId, ActivityCluster>();
@@ -68,6 +64,10 @@
return jag;
}
+ public PartitionMatchMaker getPartitionMatchMaker() {
+ return pmm;
+ }
+
public synchronized void setStatus(JobStatus status, Exception exception) {
this.status = status;
this.exception = exception;
@@ -108,15 +108,11 @@
return jsm;
}
- public Map<PartitionId, Map<String, PartitionState>> getPartitionAvailabilityMap() {
- return partitionAvailabilityMap;
- }
-
- public Map<PartitionId, Map<String, PartitionState>> getPartitionRequestorMap() {
- return partitionRequestorMap;
- }
-
public Map<ActivityId, ActivityCluster> getActivityClusterMap() {
return activityClusterMap;
}
+
+ public void notifyNodeFailures(Set<String> deadNodes) throws HyracksException {
+ jsm.notifyNodeFailures(deadNodes);
+ }
}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/TaskCluster.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/TaskCluster.java
index b82dafd..a0b2b43 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/TaskCluster.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/TaskCluster.java
@@ -15,6 +15,7 @@
package edu.uci.ics.hyracks.control.cc.job;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@@ -31,10 +32,6 @@
private final Set<PartitionId> requiredPartitions;
- private final Set<TaskCluster> blockers;
-
- private final Set<TaskCluster> dependencies;
-
private final List<TaskClusterAttempt> taskClusterAttempts;
public TaskCluster(ActivityCluster activityCluster, Task[] tasks) {
@@ -42,8 +39,6 @@
this.tasks = tasks;
this.producedPartitions = new HashSet<PartitionId>();
this.requiredPartitions = new HashSet<PartitionId>();
- this.blockers = new HashSet<TaskCluster>();
- this.dependencies = new HashSet<TaskCluster>();
taskClusterAttempts = new ArrayList<TaskClusterAttempt>();
}
@@ -51,10 +46,6 @@
return tasks;
}
- public Set<TaskCluster> getDependencies() {
- return dependencies;
- }
-
public Set<PartitionId> getProducedPartitions() {
return producedPartitions;
}
@@ -63,10 +54,6 @@
return requiredPartitions;
}
- public Set<TaskCluster> getBlockers() {
- return blockers;
- }
-
public List<TaskClusterAttempt> getAttempts() {
return taskClusterAttempts;
}
@@ -78,4 +65,9 @@
public void notifyTaskFailure(TaskAttempt ta, Exception exception) throws HyracksException {
activityCluster.getStateMachine().notifyTaskFailure(ta, exception);
}
+
+ @Override
+ public String toString() {
+ return "TC:" + Arrays.toString(tasks);
+ }
}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/JobCleanupEvent.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/JobCleanupEvent.java
index f59e9cc..d1829e9 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/JobCleanupEvent.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/JobCleanupEvent.java
@@ -20,6 +20,7 @@
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.api.job.JobStatus;
import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
+import edu.uci.ics.hyracks.control.cc.NodeControllerState;
import edu.uci.ics.hyracks.control.cc.application.CCApplicationContext;
import edu.uci.ics.hyracks.control.cc.job.JobRun;
import edu.uci.ics.hyracks.control.cc.jobqueue.AbstractEvent;
@@ -47,6 +48,10 @@
int i = 0;
for (String n : targetNodes) {
jcns[i++] = new JobCompleteNotifier(n, jobId);
+ NodeControllerState ncs = ccs.getNodeMap().get(n);
+ if (ncs != null) {
+ ncs.getActiveJobIds().remove(jobId);
+ }
}
ccs.getExecutor().execute(new Runnable() {
@Override
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RegisterPartitionAvailibilityEvent.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RegisterPartitionAvailibilityEvent.java
index 8a35cf1..3f49b97 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RegisterPartitionAvailibilityEvent.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RegisterPartitionAvailibilityEvent.java
@@ -14,72 +14,44 @@
*/
package edu.uci.ics.hyracks.control.cc.job.manager.events;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
+import java.util.List;
import edu.uci.ics.hyracks.api.partitions.PartitionId;
+import edu.uci.ics.hyracks.api.util.Pair;
import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
-import edu.uci.ics.hyracks.control.cc.NodeControllerState;
import edu.uci.ics.hyracks.control.cc.job.JobRun;
import edu.uci.ics.hyracks.control.cc.jobqueue.AbstractEvent;
-import edu.uci.ics.hyracks.control.common.base.INodeController;
-import edu.uci.ics.hyracks.control.common.job.PartitionState;
+import edu.uci.ics.hyracks.control.cc.partitions.PartitionMatchMaker;
+import edu.uci.ics.hyracks.control.cc.partitions.PartitionUtils;
+import edu.uci.ics.hyracks.control.common.job.PartitionDescriptor;
+import edu.uci.ics.hyracks.control.common.job.PartitionRequest;
public class RegisterPartitionAvailibilityEvent extends AbstractEvent {
private final ClusterControllerService ccs;
- private final PartitionId pid;
- private final String nodeId;
- private final PartitionState state;
+ private final PartitionDescriptor partitionDescriptor;
- public RegisterPartitionAvailibilityEvent(ClusterControllerService ccs, PartitionId pid, String nodeId,
- PartitionState state) {
+ public RegisterPartitionAvailibilityEvent(ClusterControllerService ccs, PartitionDescriptor partitionDescriptor) {
this.ccs = ccs;
- this.pid = pid;
- this.nodeId = nodeId;
- this.state = state;
+ this.partitionDescriptor = partitionDescriptor;
}
@Override
public void run() {
+ final PartitionId pid = partitionDescriptor.getPartitionId();
JobRun run = ccs.getRunMap().get(pid.getJobId());
if (run == null) {
return;
}
- Map<PartitionId, Map<String, PartitionState>> partitionAvailabilityMap = run.getPartitionAvailabilityMap();
- Map<String, PartitionState> paMap = partitionAvailabilityMap.get(pid);
- if (paMap == null) {
- paMap = new HashMap<String, PartitionState>();
- partitionAvailabilityMap.put(pid, paMap);
- }
- paMap.put(nodeId, state);
-
- NodeControllerState availNcs = ccs.getNodeMap().get(nodeId);
- Map<PartitionId, Map<String, PartitionState>> partitionRequestorMap = run.getPartitionRequestorMap();
- Map<String, PartitionState> prMap = partitionRequestorMap.get(pid);
- if (prMap != null) {
- Iterator<Map.Entry<String, PartitionState>> i = prMap.entrySet().iterator();
- while (i.hasNext()) {
- Map.Entry<String, PartitionState> requestor = i.next();
- PartitionState minState = requestor.getValue();
- if (state.isAtLeast(minState)) {
- i.remove();
- NodeControllerState ncs = ccs.getNodeMap().get(requestor.getKey());
- if (ncs != null) {
- try {
- INodeController nc = ncs.getNodeController();
- nc.reportPartitionAvailability(pid, availNcs.getDataPort());
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
- }
+ PartitionMatchMaker pmm = run.getPartitionMatchMaker();
+ List<Pair<PartitionDescriptor, PartitionRequest>> matches = pmm
+ .registerPartitionDescriptor(partitionDescriptor);
+ for (Pair<PartitionDescriptor, PartitionRequest> match : matches) {
+ PartitionUtils.reportPartitionMatch(ccs, pid, match);
}
}
@Override
public String toString() {
- return "PartitionAvailable@" + nodeId + "[" + pid + "]" + state;
+ return "PartitionAvailable@" + partitionDescriptor;
}
}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RegisterPartitionRequestEvent.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RegisterPartitionRequestEvent.java
index 2f2adc0..bac8dd1 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RegisterPartitionRequestEvent.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RegisterPartitionRequestEvent.java
@@ -14,77 +14,41 @@
*/
package edu.uci.ics.hyracks.control.cc.job.manager.events;
-import java.util.HashMap;
-import java.util.Map;
-
-import edu.uci.ics.hyracks.api.comm.NetworkAddress;
import edu.uci.ics.hyracks.api.partitions.PartitionId;
+import edu.uci.ics.hyracks.api.util.Pair;
import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
-import edu.uci.ics.hyracks.control.cc.NodeControllerState;
import edu.uci.ics.hyracks.control.cc.job.JobRun;
import edu.uci.ics.hyracks.control.cc.jobqueue.AbstractEvent;
-import edu.uci.ics.hyracks.control.common.base.INodeController;
-import edu.uci.ics.hyracks.control.common.job.PartitionState;
+import edu.uci.ics.hyracks.control.cc.partitions.PartitionMatchMaker;
+import edu.uci.ics.hyracks.control.cc.partitions.PartitionUtils;
+import edu.uci.ics.hyracks.control.common.job.PartitionDescriptor;
+import edu.uci.ics.hyracks.control.common.job.PartitionRequest;
public class RegisterPartitionRequestEvent extends AbstractEvent {
private final ClusterControllerService ccs;
- private final PartitionId pid;
- private final String nodeId;
- private final PartitionState minState;
+ private final PartitionRequest partitionRequest;
- public RegisterPartitionRequestEvent(ClusterControllerService ccs, PartitionId pid, String nodeId,
- PartitionState minState) {
+ public RegisterPartitionRequestEvent(ClusterControllerService ccs, PartitionRequest partitionRequest) {
this.ccs = ccs;
- this.pid = pid;
- this.nodeId = nodeId;
- this.minState = minState;
+ this.partitionRequest = partitionRequest;
}
@Override
public void run() {
- NodeControllerState ncs = ccs.getNodeMap().get(nodeId);
- if (ncs != null) {
- JobRun run = ccs.getRunMap().get(pid.getJobId());
- if (run == null) {
- return;
- }
-
- Map<PartitionId, Map<String, PartitionState>> partitionAvailabilityMap = run.getPartitionAvailabilityMap();
- Map<String, PartitionState> paMap = partitionAvailabilityMap.get(pid);
- boolean matched = false;
- if (paMap != null && !paMap.isEmpty()) {
- for (Map.Entry<String, PartitionState> pa : paMap.entrySet()) {
- PartitionState state = pa.getValue();
- if (state.isAtLeast(minState)) {
- NodeControllerState availNcs = ccs.getNodeMap().get(pa.getKey());
- if (availNcs != null) {
- NetworkAddress networkAddress = availNcs.getDataPort();
- try {
- INodeController nc = ncs.getNodeController();
- nc.reportPartitionAvailability(pid, networkAddress);
- matched = true;
- break;
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
- }
- }
- if (!matched) {
- Map<PartitionId, Map<String, PartitionState>> partitionRequestorMap = run.getPartitionRequestorMap();
- Map<String, PartitionState> prMap = partitionRequestorMap.get(pid);
- if (prMap == null) {
- prMap = new HashMap<String, PartitionState>();
- partitionRequestorMap.put(pid, prMap);
- }
- prMap.put(nodeId, minState);
- }
+ PartitionId pid = partitionRequest.getPartitionId();
+ JobRun run = ccs.getRunMap().get(pid.getJobId());
+ if (run == null) {
+ return;
+ }
+ PartitionMatchMaker pmm = run.getPartitionMatchMaker();
+ Pair<PartitionDescriptor, PartitionRequest> match = pmm.matchPartitionRequest(partitionRequest);
+ if (match != null) {
+ PartitionUtils.reportPartitionMatch(ccs, pid, match);
}
}
@Override
public String toString() {
- return "PartitionRequest@[" + nodeId + "][" + pid + "]" + minState;
+ return "PartitionRequest@" + partitionRequest;
}
}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RemoveDeadNodesEvent.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RemoveDeadNodesEvent.java
index 6e000cc..27322f5 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RemoveDeadNodesEvent.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RemoveDeadNodesEvent.java
@@ -17,11 +17,14 @@
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
+import java.util.UUID;
import java.util.logging.Level;
import java.util.logging.Logger;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
import edu.uci.ics.hyracks.control.cc.NodeControllerState;
+import edu.uci.ics.hyracks.control.cc.job.JobRun;
import edu.uci.ics.hyracks.control.cc.jobqueue.AbstractEvent;
public class RemoveDeadNodesEvent extends AbstractEvent {
@@ -44,10 +47,14 @@
LOGGER.info(e.getKey() + " considered dead");
}
}
+ Set<UUID> affectedJobIds = new HashSet<UUID>();
Map<String, Set<String>> ipAddressNodeNameMap = ccs.getIPAddressNodeNameMap();
for (String deadNode : deadNodes) {
NodeControllerState state = nodeMap.remove(deadNode);
+
// Deal with dead tasks.
+ affectedJobIds.addAll(state.getActiveJobIds());
+
String ipAddress = state.getNCConfig().dataIPAddress;
Set<String> ipNodes = ipAddressNodeNameMap.get(ipAddress);
if (ipNodes != null) {
@@ -56,6 +63,19 @@
}
}
}
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Number of affected jobs: " + affectedJobIds.size());
+ }
+ for (UUID jobId : affectedJobIds) {
+ JobRun run = ccs.getRunMap().get(jobId);
+ if (run != null) {
+ try {
+ run.notifyNodeFailures(deadNodes);
+ } catch (HyracksException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
}
@Override
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/partitions/PartitionMatchMaker.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/partitions/PartitionMatchMaker.java
new file mode 100644
index 0000000..1bfb69e
--- /dev/null
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/partitions/PartitionMatchMaker.java
@@ -0,0 +1,187 @@
+package edu.uci.ics.hyracks.control.cc.partitions;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
+import edu.uci.ics.hyracks.api.partitions.PartitionId;
+import edu.uci.ics.hyracks.api.util.Pair;
+import edu.uci.ics.hyracks.control.common.job.PartitionDescriptor;
+import edu.uci.ics.hyracks.control.common.job.PartitionRequest;
+import edu.uci.ics.hyracks.control.common.job.PartitionState;
+
+public class PartitionMatchMaker {
+ private final Map<PartitionId, List<PartitionDescriptor>> partitionDescriptors;
+
+ private final Map<PartitionId, List<PartitionRequest>> partitionRequests;
+
+ public PartitionMatchMaker() {
+ partitionDescriptors = new HashMap<PartitionId, List<PartitionDescriptor>>();
+ partitionRequests = new HashMap<PartitionId, List<PartitionRequest>>();
+ }
+
+ public List<Pair<PartitionDescriptor, PartitionRequest>> registerPartitionDescriptor(
+ PartitionDescriptor partitionDescriptor) {
+ List<Pair<PartitionDescriptor, PartitionRequest>> matches = new ArrayList<Pair<PartitionDescriptor, PartitionRequest>>();
+ PartitionId pid = partitionDescriptor.getPartitionId();
+ boolean matched = false;
+ List<PartitionRequest> requests = partitionRequests.get(pid);
+ if (requests != null) {
+ Iterator<PartitionRequest> i = requests.iterator();
+ while (i.hasNext()) {
+ PartitionRequest req = i.next();
+ if (partitionDescriptor.getState().isAtLeast(req.getMinimumState())) {
+ matches.add(new Pair<PartitionDescriptor, PartitionRequest>(partitionDescriptor, req));
+ i.remove();
+ matched = true;
+ if (!partitionDescriptor.isReusable()) {
+ break;
+ }
+ }
+ }
+ if (requests.isEmpty()) {
+ partitionRequests.remove(pid);
+ }
+ }
+
+ if (!matched) {
+ List<PartitionDescriptor> descriptors = partitionDescriptors.get(pid);
+ if (descriptors == null) {
+ descriptors = new ArrayList<PartitionDescriptor>();
+ partitionDescriptors.put(pid, descriptors);
+ }
+ descriptors.add(partitionDescriptor);
+ }
+
+ return matches;
+ }
+
+ public Pair<PartitionDescriptor, PartitionRequest> matchPartitionRequest(PartitionRequest partitionRequest) {
+ Pair<PartitionDescriptor, PartitionRequest> match = null;
+
+ PartitionId pid = partitionRequest.getPartitionId();
+
+ List<PartitionDescriptor> descriptors = partitionDescriptors.get(pid);
+ if (descriptors != null) {
+ Iterator<PartitionDescriptor> i = descriptors.iterator();
+ while (i.hasNext()) {
+ PartitionDescriptor descriptor = i.next();
+ if (descriptor.getState().isAtLeast(partitionRequest.getMinimumState())) {
+ match = new Pair<PartitionDescriptor, PartitionRequest>(descriptor, partitionRequest);
+ if (!descriptor.isReusable()) {
+ i.remove();
+ }
+ break;
+ }
+ }
+ if (descriptors.isEmpty()) {
+ partitionDescriptors.remove(pid);
+ }
+ }
+
+ if (match == null) {
+ List<PartitionRequest> requests = partitionRequests.get(pid);
+ if (requests == null) {
+ requests = new ArrayList<PartitionRequest>();
+ partitionRequests.put(pid, requests);
+ }
+ requests.add(partitionRequest);
+ }
+
+ return match;
+ }
+
+ public PartitionState getMaximumAvailableState(PartitionId pid) {
+ List<PartitionDescriptor> descriptors = partitionDescriptors.get(pid);
+ if (descriptors == null) {
+ return null;
+ }
+ for (PartitionDescriptor descriptor : descriptors) {
+ if (descriptor.getState() == PartitionState.COMMITTED) {
+ return PartitionState.COMMITTED;
+ }
+ }
+ return PartitionState.STARTED;
+ }
+
+ private interface IEntryFilter<T> {
+ public boolean matches(T o);
+ }
+
+ private static <T> void removeEntries(List<T> list, IEntryFilter<T> filter) {
+ Iterator<T> j = list.iterator();
+ while (j.hasNext()) {
+ T o = j.next();
+ if (filter.matches(o)) {
+ j.remove();
+ }
+ }
+ }
+
+ private static <T> void removeEntries(Map<PartitionId, List<T>> map, IEntryFilter<T> filter) {
+ Iterator<Map.Entry<PartitionId, List<T>>> i = map.entrySet().iterator();
+ while (i.hasNext()) {
+ Map.Entry<PartitionId, List<T>> e = i.next();
+ List<T> list = e.getValue();
+ removeEntries(list, filter);
+ if (list.isEmpty()) {
+ i.remove();
+ }
+ }
+ }
+
+ public void notifyNodeFailures(final Set<String> deadNodes) {
+ removeEntries(partitionDescriptors, new IEntryFilter<PartitionDescriptor>() {
+ @Override
+ public boolean matches(PartitionDescriptor o) {
+ return deadNodes.contains(o.getNodeId());
+ }
+ });
+ removeEntries(partitionRequests, new IEntryFilter<PartitionRequest>() {
+ @Override
+ public boolean matches(PartitionRequest o) {
+ return deadNodes.contains(o.getNodeId());
+ }
+ });
+ }
+
+ public void removeUncommittedPartitions(Set<PartitionId> partitionIds, final Set<TaskAttemptId> taIds) {
+ IEntryFilter<PartitionDescriptor> filter = new IEntryFilter<PartitionDescriptor>() {
+ @Override
+ public boolean matches(PartitionDescriptor o) {
+ return o.getState() != PartitionState.COMMITTED && taIds.contains(o.getProducingTaskAttemptId());
+ }
+ };
+ for (PartitionId pid : partitionIds) {
+ List<PartitionDescriptor> descriptors = partitionDescriptors.get(pid);
+ if (descriptors != null) {
+ removeEntries(descriptors, filter);
+ if (descriptors.isEmpty()) {
+ partitionDescriptors.remove(pid);
+ }
+ }
+ }
+ }
+
+ public void removePartitionRequests(Set<PartitionId> partitionIds, final Set<TaskAttemptId> taIds) {
+ IEntryFilter<PartitionRequest> filter = new IEntryFilter<PartitionRequest>() {
+ @Override
+ public boolean matches(PartitionRequest o) {
+ return taIds.contains(o.getRequestingTaskAttemptId());
+ }
+ };
+ for (PartitionId pid : partitionIds) {
+ List<PartitionRequest> requests = partitionRequests.get(pid);
+ if (requests != null) {
+ removeEntries(requests, filter);
+ if (requests.isEmpty()) {
+ partitionRequests.remove(pid);
+ }
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/partitions/PartitionUtils.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/partitions/PartitionUtils.java
new file mode 100644
index 0000000..90c6ad1
--- /dev/null
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/partitions/PartitionUtils.java
@@ -0,0 +1,33 @@
+package edu.uci.ics.hyracks.control.cc.partitions;
+
+import edu.uci.ics.hyracks.api.comm.NetworkAddress;
+import edu.uci.ics.hyracks.api.partitions.PartitionId;
+import edu.uci.ics.hyracks.api.util.Pair;
+import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
+import edu.uci.ics.hyracks.control.cc.NodeControllerState;
+import edu.uci.ics.hyracks.control.common.base.INodeController;
+import edu.uci.ics.hyracks.control.common.job.PartitionDescriptor;
+import edu.uci.ics.hyracks.control.common.job.PartitionRequest;
+
+public class PartitionUtils {
+ public static void reportPartitionMatch(ClusterControllerService ccs, final PartitionId pid,
+ Pair<PartitionDescriptor, PartitionRequest> match) {
+ PartitionDescriptor desc = match.first;
+ PartitionRequest req = match.second;
+
+ NodeControllerState producerNCS = ccs.getNodeMap().get(desc.getNodeId());
+ NodeControllerState requestorNCS = ccs.getNodeMap().get(req.getNodeId());
+ final NetworkAddress dataport = producerNCS.getDataPort();
+ final INodeController requestorNC = requestorNCS.getNodeController();
+ ccs.getExecutor().execute(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ requestorNC.reportPartitionAvailability(pid, dataport);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ });
+ }
+}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/DefaultActivityClusterStateMachine.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/DefaultActivityClusterStateMachine.java
index 15c2db7..8d2040e 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/DefaultActivityClusterStateMachine.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/DefaultActivityClusterStateMachine.java
@@ -16,11 +16,12 @@
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Arrays;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.PriorityQueue;
import java.util.Random;
import java.util.Set;
import java.util.UUID;
@@ -37,6 +38,7 @@
import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.api.job.JobActivityGraph;
+import edu.uci.ics.hyracks.api.partitions.PartitionId;
import edu.uci.ics.hyracks.api.util.JavaSerializationUtils;
import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
import edu.uci.ics.hyracks.control.cc.NodeControllerState;
@@ -46,6 +48,8 @@
import edu.uci.ics.hyracks.control.cc.job.TaskAttempt;
import edu.uci.ics.hyracks.control.cc.job.TaskCluster;
import edu.uci.ics.hyracks.control.cc.job.TaskClusterAttempt;
+import edu.uci.ics.hyracks.control.cc.partitions.PartitionMatchMaker;
+import edu.uci.ics.hyracks.control.common.job.PartitionState;
import edu.uci.ics.hyracks.control.common.job.TaskAttemptDescriptor;
public class DefaultActivityClusterStateMachine implements IActivityClusterStateMachine {
@@ -57,6 +61,8 @@
private final ActivityCluster ac;
+ private final PriorityQueue<RankedRunnableTaskCluster> runnableQueue;
+
private final Set<TaskCluster> inProgressTaskClusters;
public DefaultActivityClusterStateMachine(ClusterControllerService ccs, DefaultJobRunStateMachine jsm,
@@ -64,6 +70,14 @@
this.ccs = ccs;
this.jsm = jsm;
this.ac = ac;
+ runnableQueue = new PriorityQueue<RankedRunnableTaskCluster>(ac.getTaskClusters().length,
+ new Comparator<RankedRunnableTaskCluster>() {
+ @Override
+ public int compare(RankedRunnableTaskCluster o1, RankedRunnableTaskCluster o2) {
+ int cmp = o1.getRank() - o2.getRank();
+ return cmp < 0 ? -1 : (cmp > 0 ? 1 : 0);
+ }
+ });
inProgressTaskClusters = new HashSet<TaskCluster>();
}
@@ -177,12 +191,12 @@
}
private void startRunnableTaskClusters() throws HyracksException {
- Set<TaskCluster> runnableTaskClusters = new HashSet<TaskCluster>();
- findRunnableTaskClusters(runnableTaskClusters);
+ findRunnableTaskClusters();
Map<String, List<TaskAttemptDescriptor>> taskAttemptMap = new HashMap<String, List<TaskAttemptDescriptor>>();
- for (TaskCluster tc : runnableTaskClusters) {
+ for (RankedRunnableTaskCluster rrtc : runnableQueue) {
+ TaskCluster tc = rrtc.getTaskCluster();
if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Found runnable TC: " + Arrays.toString(tc.getTasks()));
+ LOGGER.info("Found runnable TC: " + tc);
List<TaskClusterAttempt> attempts = tc.getAttempts();
LOGGER.info("Attempts so far:" + attempts.size());
for (TaskClusterAttempt tcAttempt : attempts) {
@@ -202,60 +216,74 @@
startTasks(taskAttemptMap);
}
- private void findRunnableTaskClusters(Set<TaskCluster> runnableTaskClusters) {
+ private void findRunnableTaskClusters() {
TaskCluster[] taskClusters = ac.getTaskClusters();
+ Map<TaskCluster, Integer> runnabilityRanks = new HashMap<TaskCluster, Integer>();
for (TaskCluster tc : taskClusters) {
- Set<TaskCluster> blockers = tc.getBlockers();
- TaskClusterAttempt lastAttempt = findLastTaskClusterAttempt(tc);
- if (lastAttempt != null
- && (lastAttempt.getStatus() == TaskClusterAttempt.TaskClusterStatus.COMPLETED || lastAttempt
- .getStatus() == TaskClusterAttempt.TaskClusterStatus.RUNNING)) {
- continue;
+ // Start search at TCs that produce no outputs (sinks)
+ if (tc.getProducedPartitions().isEmpty()) {
+ assignRunnabilityRank(tc, runnabilityRanks);
}
- boolean runnable = true;
- for (TaskCluster blockerTC : blockers) {
- List<TaskClusterAttempt> tcAttempts = blockerTC.getAttempts();
- if (tcAttempts.isEmpty()) {
- runnable = false;
- break;
- }
- TaskClusterAttempt tcAttempt = tcAttempts.get(tcAttempts.size() - 1);
- if (tcAttempt.getStatus() != TaskClusterAttempt.TaskClusterStatus.COMPLETED) {
- runnable = false;
- break;
- }
+ }
+
+ runnableQueue.clear();
+ for (Map.Entry<TaskCluster, Integer> e : runnabilityRanks.entrySet()) {
+ TaskCluster tc = e.getKey();
+ int rank = e.getValue();
+ if (rank >= 0 && rank < Integer.MAX_VALUE) {
+ runnableQueue.add(new RankedRunnableTaskCluster(rank, tc));
}
- if (runnable) {
- runnableTaskClusters.add(tc);
- }
+ }
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Ranked TCs: " + runnableQueue);
}
}
- private void findCascadingAbortTaskClusterAttempts(TaskClusterAttempt abortedTCAttempt,
- Set<TaskClusterAttempt> cascadingAbortTaskClusterAttempts) {
- boolean changed = true;
- cascadingAbortTaskClusterAttempts.add(abortedTCAttempt);
- while (changed) {
- changed = false;
- for (TaskCluster tc : ac.getTaskClusters()) {
- TaskClusterAttempt tca = findLastTaskClusterAttempt(tc);
- if (tca != null && tca.getStatus() == TaskClusterAttempt.TaskClusterStatus.RUNNING) {
- boolean abort = false;
- for (TaskClusterAttempt catca : cascadingAbortTaskClusterAttempts) {
- TaskCluster catc = catca.getTaskCluster();
- if (tc.getDependencies().contains(catc)) {
- abort = true;
- break;
- }
- }
- if (abort) {
- changed = cascadingAbortTaskClusterAttempts.add(tca) || changed;
- }
- }
+ /*
+ * Runnability rank has the following semantics
+ * Rank(Completed TaskCluster) = -1 (same as Running; neither needs scheduling)
+ * Rank(Running TaskCluster) = -1
+ * Rank(Runnable TaskCluster whose required inputs are all satisfied) = 0
+ * Rank(Runnable TaskCluster) = max(Rank(TaskClusters it depends on)) + 1
+ * Rank(Non-schedulable TaskCluster) = MAX_VALUE
+ */
+ private int assignRunnabilityRank(TaskCluster goal, Map<TaskCluster, Integer> runnabilityRank) {
+ if (runnabilityRank.containsKey(goal)) {
+ return runnabilityRank.get(goal);
+ }
+ TaskClusterAttempt lastAttempt = findLastTaskClusterAttempt(goal);
+ if (lastAttempt != null) {
+ if (lastAttempt.getStatus() == TaskClusterAttempt.TaskClusterStatus.COMPLETED
+ || lastAttempt.getStatus() == TaskClusterAttempt.TaskClusterStatus.RUNNING) {
+ int rank = -1;
+ runnabilityRank.put(goal, rank);
+ return rank;
}
}
- cascadingAbortTaskClusterAttempts.remove(abortedTCAttempt);
+ Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicyMap = ac.getConnectorPolicyMap();
+ JobRun jobRun = ac.getJobRun();
+ PartitionMatchMaker pmm = jobRun.getPartitionMatchMaker();
+ int maxInputRank = -1;
+ for (PartitionId pid : goal.getRequiredPartitions()) {
+ int rank = -1;
+ ConnectorDescriptorId cdId = pid.getConnectorDescriptorId();
+ IConnectorPolicy cPolicy = connectorPolicyMap.get(cdId);
+ PartitionState maxState = pmm.getMaximumAvailableState(pid);
+ if (PartitionState.COMMITTED.equals(maxState)
+ || (PartitionState.STARTED.equals(maxState) && !cPolicy.consumerWaitsForProducerToFinish())) {
+ rank = -1;
+ } else {
+ rank = assignRunnabilityRank(ac.getPartitionProducingTaskClusterMap().get(pid), runnabilityRank);
+ if (rank >= 0 && cPolicy.consumerWaitsForProducerToFinish()) {
+ rank = Integer.MAX_VALUE;
+ }
+ }
+ maxInputRank = Math.max(maxInputRank, rank);
+ }
+ int rank = maxInputRank < Integer.MAX_VALUE ? maxInputRank + 1 : Integer.MAX_VALUE;
+ runnabilityRank.put(goal, rank);
+ return rank;
}
private void startTasks(Map<String, List<TaskAttemptDescriptor>> taskAttemptMap) throws HyracksException {
@@ -270,6 +298,7 @@
final List<TaskAttemptDescriptor> taskDescriptors = e.getValue();
final NodeControllerState node = ccs.getNodeMap().get(nodeId);
if (node != null) {
+ node.getActiveJobIds().add(jobRun.getJobId());
jobRun.getParticipatingNodeIds().add(nodeId);
executor.execute(new Runnable() {
@Override
@@ -289,16 +318,19 @@
}
private void abortTaskCluster(TaskClusterAttempt tcAttempt) throws HyracksException {
+ Set<TaskAttemptId> abortTaskIds = new HashSet<TaskAttemptId>();
Map<String, List<TaskAttemptId>> abortTaskAttemptMap = new HashMap<String, List<TaskAttemptId>>();
- for (TaskAttempt ta2 : tcAttempt.getTaskAttempts()) {
- if (ta2.getStatus() == TaskAttempt.TaskStatus.RUNNING) {
- ta2.setStatus(TaskAttempt.TaskStatus.ABORTED, null);
- List<TaskAttemptId> abortTaskAttempts = abortTaskAttemptMap.get(ta2.getNodeId());
+ for (TaskAttempt ta : tcAttempt.getTaskAttempts()) {
+ TaskAttemptId taId = ta.getTaskAttemptId();
+ abortTaskIds.add(taId);
+ if (ta.getStatus() == TaskAttempt.TaskStatus.RUNNING) {
+ ta.setStatus(TaskAttempt.TaskStatus.ABORTED, null);
+ List<TaskAttemptId> abortTaskAttempts = abortTaskAttemptMap.get(ta.getNodeId());
if (abortTaskAttempts == null) {
abortTaskAttempts = new ArrayList<TaskAttemptId>();
- abortTaskAttemptMap.put(ta2.getNodeId(), abortTaskAttempts);
+ abortTaskAttemptMap.put(ta.getNodeId(), abortTaskAttempts);
}
- abortTaskAttempts.add(ta2.getTaskAttemptId());
+ abortTaskAttempts.add(taId);
}
}
JobRun jobRun = ac.getJobRun();
@@ -320,6 +352,10 @@
});
}
}
+ TaskCluster tc = tcAttempt.getTaskCluster();
+ PartitionMatchMaker pmm = jobRun.getPartitionMatchMaker();
+ pmm.removeUncommittedPartitions(tc.getProducedPartitions(), abortTaskIds);
+ pmm.removePartitionRequests(tc.getRequiredPartitions(), abortTaskIds);
}
@Override
@@ -333,21 +369,70 @@
ta.setStatus(TaskAttempt.TaskStatus.FAILED, exception);
abortTaskCluster(lastAttempt);
lastAttempt.setStatus(TaskClusterAttempt.TaskClusterStatus.FAILED);
- Set<TaskClusterAttempt> cascadingAbortTaskClusterAttempts = new HashSet<TaskClusterAttempt>();
- findCascadingAbortTaskClusterAttempts(lastAttempt, cascadingAbortTaskClusterAttempts);
- for (TaskClusterAttempt tca : cascadingAbortTaskClusterAttempts) {
- abortTaskCluster(tca);
- tca.setStatus(TaskClusterAttempt.TaskClusterStatus.ABORTED);
- }
+ abortDoomedTaskClusters();
ac.notifyTaskClusterFailure(lastAttempt, exception);
} else {
- LOGGER.warning("Spurious task complete notification: " + taId + " Current state = " + taStatus);
+ LOGGER.warning("Spurious task failure notification: " + taId + " Current state = " + taStatus);
}
} else {
- LOGGER.warning("Ignoring task complete notification: " + taId + " -- Current last attempt = " + lastAttempt);
+ LOGGER.warning("Ignoring task failure notification: " + taId + " -- Current last attempt = " + lastAttempt);
}
}
+ private void abortDoomedTaskClusters() throws HyracksException {
+ TaskCluster[] taskClusters = ac.getTaskClusters();
+
+ Set<TaskCluster> doomedTaskClusters = new HashSet<TaskCluster>();
+ for (TaskCluster tc : taskClusters) {
+ // Start search at TCs that produce no outputs (sinks)
+ if (tc.getProducedPartitions().isEmpty()) {
+ findDoomedTaskClusters(tc, doomedTaskClusters);
+ }
+ }
+
+ for (TaskCluster tc : doomedTaskClusters) {
+ TaskClusterAttempt tca = findLastTaskClusterAttempt(tc);
+ abortTaskCluster(tca);
+ tca.setStatus(TaskClusterAttempt.TaskClusterStatus.ABORTED);
+ }
+ }
+
+ private boolean findDoomedTaskClusters(TaskCluster tc, Set<TaskCluster> doomedTaskClusters) {
+ if (doomedTaskClusters.contains(tc)) {
+ return true;
+ }
+ TaskClusterAttempt lastAttempt = findLastTaskClusterAttempt(tc);
+ if (lastAttempt != null) {
+ switch (lastAttempt.getStatus()) {
+ case ABORTED:
+ case FAILED:
+ return true;
+
+ case COMPLETED:
+ return false;
+ }
+ }
+ Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicyMap = ac.getConnectorPolicyMap();
+ JobRun jobRun = ac.getJobRun();
+ PartitionMatchMaker pmm = jobRun.getPartitionMatchMaker();
+ boolean doomed = false;
+ for (PartitionId pid : tc.getRequiredPartitions()) {
+ ConnectorDescriptorId cdId = pid.getConnectorDescriptorId();
+ IConnectorPolicy cPolicy = connectorPolicyMap.get(cdId);
+ PartitionState maxState = pmm.getMaximumAvailableState(pid);
+ if (maxState == null
+ || (cPolicy.consumerWaitsForProducerToFinish() && maxState != PartitionState.COMMITTED)) {
+ if (findDoomedTaskClusters(ac.getPartitionProducingTaskClusterMap().get(pid), doomedTaskClusters)) {
+ doomed = true;
+ }
+ }
+ }
+ if (doomed) {
+ doomedTaskClusters.add(tc);
+ }
+ return doomed;
+ }
+
@Override
public void abort() throws HyracksException {
TaskCluster[] taskClusters = ac.getTaskClusters();
@@ -375,4 +460,28 @@
assignTaskLocations(tc, taskAttemptMap);
startTasks(taskAttemptMap);
}
+
+ @Override
+ public void notifyNodeFailures(Set<String> deadNodes) throws HyracksException {
+ for (TaskCluster tc : ac.getTaskClusters()) {
+ TaskClusterAttempt lastTaskClusterAttempt = findLastTaskClusterAttempt(tc);
+ if (lastTaskClusterAttempt != null
+ && (lastTaskClusterAttempt.getStatus() == TaskClusterAttempt.TaskClusterStatus.COMPLETED || lastTaskClusterAttempt
+ .getStatus() == TaskClusterAttempt.TaskClusterStatus.RUNNING)) {
+ boolean abort = false;
+ for (TaskAttempt ta : lastTaskClusterAttempt.getTaskAttempts()) {
+ if (deadNodes.contains(ta.getNodeId())) {
+ ta.setStatus(TaskAttempt.TaskStatus.FAILED, new HyracksException("Node " + ta.getNodeId()
+ + " failed"));
+ abort = true;
+ }
+ }
+ if (abort) {
+ abortTaskCluster(lastTaskClusterAttempt);
+ }
+ }
+ }
+ abortDoomedTaskClusters();
+ startRunnableTaskClusters();
+ }
}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/DefaultJobRunStateMachine.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/DefaultJobRunStateMachine.java
index 12215ff..16b7f5c 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/DefaultJobRunStateMachine.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/DefaultJobRunStateMachine.java
@@ -205,7 +205,7 @@
}
}
- private void findRunnableClusters(Set<ActivityCluster> frontier, ActivityCluster candidate) {
+ private void findRunnableActivityClusters(Set<ActivityCluster> frontier, ActivityCluster candidate) {
if (completedClusters.contains(candidate) || frontier.contains(candidate)
|| inProgressClusters.contains(candidate)) {
return;
@@ -214,7 +214,7 @@
for (ActivityCluster s : candidate.getDependencies()) {
if (!completedClusters.contains(s)) {
runnable = false;
- findRunnableClusters(frontier, s);
+ findRunnableActivityClusters(frontier, s);
}
}
if (runnable && candidate != rootActivityCluster) {
@@ -222,8 +222,8 @@
}
}
- private void findRunnableClusters(Set<ActivityCluster> frontier) {
- findRunnableClusters(frontier, rootActivityCluster);
+ private void findRunnableActivityClusters(Set<ActivityCluster> frontier) {
+ findRunnableActivityClusters(frontier, rootActivityCluster);
}
@Override
@@ -266,7 +266,7 @@
private void startRunnableActivityClusters() throws HyracksException {
Set<ActivityCluster> runnableClusters = new HashSet<ActivityCluster>();
- findRunnableClusters(runnableClusters);
+ findRunnableActivityClusters(runnableClusters);
if (runnableClusters.isEmpty() && inProgressClusters.isEmpty()) {
ccs.getJobQueue().schedule(new JobCleanupEvent(ccs, jobRun.getJobId(), JobStatus.TERMINATED, null));
return;
@@ -448,20 +448,12 @@
tc.getProducedPartitions().add(pid);
targetTC.getRequiredPartitions().add(pid);
partitionProducingTaskClusterMap.put(pid, tc);
- IConnectorPolicy cPolicy = connectorPolicies.get(cdId);
- targetTC.getDependencies().add(tc);
- if (cPolicy.consumerWaitsForProducerToFinish()) {
- targetTC.getBlockers().add(tc);
- }
}
}
}
}
}
- computeBlockerClosure(tcSet);
- computeDependencyClosure(tcSet);
-
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Plan for " + ac);
LOGGER.info("Built " + tcSet.size() + " Task Clusters");
@@ -511,48 +503,6 @@
return new PipelinedConnectorPolicy();
}
- private void computeDependencyClosure(Set<TaskCluster> tcSet) {
- boolean done = false;
- while (!done) {
- done = true;
- for (TaskCluster tc : tcSet) {
- Set<TaskCluster> deps = tc.getDependencies();
- if (!deps.isEmpty()) {
- Set<TaskCluster> copy = new HashSet<TaskCluster>(deps);
- for (TaskCluster tc2 : copy) {
- for (TaskCluster tc3 : tc2.getDependencies()) {
- if (!deps.contains(tc3)) {
- deps.add(tc3);
- done = false;
- }
- }
- }
- }
- }
- }
- }
-
- private void computeBlockerClosure(Set<TaskCluster> tcSet) {
- boolean done = false;
- while (!done) {
- done = true;
- for (TaskCluster tc : tcSet) {
- Set<TaskCluster> blockers = tc.getBlockers();
- if (!blockers.isEmpty()) {
- Set<TaskCluster> copy = new HashSet<TaskCluster>(blockers);
- for (TaskCluster tc2 : copy) {
- for (TaskCluster tc3 : tc2.getBlockers()) {
- if (!blockers.contains(tc3)) {
- blockers.add(tc3);
- done = false;
- }
- }
- }
- }
- }
- }
- }
-
@Override
public void notifyActivityClusterFailure(ActivityCluster ac, Exception exception) throws HyracksException {
for (ActivityCluster ac2 : inProgressClusters) {
@@ -571,4 +521,14 @@
inProgressClusters.remove(ac);
startRunnableActivityClusters();
}
+
+ @Override
+ public void notifyNodeFailures(Set<String> deadNodes) throws HyracksException {
+ jobRun.getPartitionMatchMaker().notifyNodeFailures(deadNodes);
+ if (!inProgressClusters.isEmpty()) {
+ for (ActivityCluster ac : inProgressClusters) {
+ ac.notifyNodeFailures(deadNodes);
+ }
+ }
+ }
}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/IActivityClusterStateMachine.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/IActivityClusterStateMachine.java
index 02adfa5..c6d85dd 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/IActivityClusterStateMachine.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/IActivityClusterStateMachine.java
@@ -14,6 +14,8 @@
*/
package edu.uci.ics.hyracks.control.cc.scheduler;
+import java.util.Set;
+
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.control.cc.job.TaskAttempt;
import edu.uci.ics.hyracks.control.cc.job.TaskClusterAttempt;
@@ -28,4 +30,6 @@
public void notifyTaskFailure(TaskAttempt ta, Exception exception) throws HyracksException;
public void notifyTaskClusterFailure(TaskClusterAttempt tcAttempt, Exception exception) throws HyracksException;
+
+ public void notifyNodeFailures(Set<String> deadNodes) throws HyracksException;
}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/IJobRunStateMachine.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/IJobRunStateMachine.java
index 97ca9fe..bbf23d5 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/IJobRunStateMachine.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/IJobRunStateMachine.java
@@ -14,6 +14,8 @@
*/
package edu.uci.ics.hyracks.control.cc.scheduler;
+import java.util.Set;
+
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.control.cc.job.ActivityCluster;
@@ -23,4 +25,6 @@
public void notifyActivityClusterFailure(ActivityCluster ac, Exception exception) throws HyracksException;
public void notifyActivityClusterComplete(ActivityCluster activityCluster) throws HyracksException;
+
+ public void notifyNodeFailures(Set<String> deadNodes) throws HyracksException;
}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/RankedRunnableTaskCluster.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/RankedRunnableTaskCluster.java
new file mode 100644
index 0000000..fa11643
--- /dev/null
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/RankedRunnableTaskCluster.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.cc.scheduler;
+
+import edu.uci.ics.hyracks.control.cc.job.TaskCluster;
+
+public class RankedRunnableTaskCluster {
+ private final int rank;
+
+ private final TaskCluster taskCluster;
+
+ public RankedRunnableTaskCluster(int rank, TaskCluster taskCluster) {
+ this.rank = rank;
+ this.taskCluster = taskCluster;
+ }
+
+ public int getRank() {
+ return rank;
+ }
+
+ public TaskCluster getTaskCluster() {
+ return taskCluster;
+ }
+
+ @Override
+ public String toString() {
+ return "[" + rank + ":" + taskCluster + "]";
+ }
+}
\ No newline at end of file
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java
index 8dbb180..8dae67d 100644
--- a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java
@@ -19,10 +19,10 @@
import java.util.UUID;
import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
-import edu.uci.ics.hyracks.api.partitions.PartitionId;
import edu.uci.ics.hyracks.control.common.controllers.NodeParameters;
import edu.uci.ics.hyracks.control.common.controllers.NodeRegistration;
-import edu.uci.ics.hyracks.control.common.job.PartitionState;
+import edu.uci.ics.hyracks.control.common.job.PartitionDescriptor;
+import edu.uci.ics.hyracks.control.common.job.PartitionRequest;
import edu.uci.ics.hyracks.control.common.job.profiling.om.JobProfile;
import edu.uci.ics.hyracks.control.common.job.profiling.om.TaskProfile;
@@ -40,7 +40,7 @@
public void reportProfile(String id, List<JobProfile> profiles) throws Exception;
- public void registerPartitionProvider(PartitionId pid, String nodeId, PartitionState state) throws Exception;
+ public void registerPartitionProvider(PartitionDescriptor partitionDescriptor) throws Exception;
- public void registerPartitionRequest(PartitionId pid, String nodeId, PartitionState minState) throws Exception;
+ public void registerPartitionRequest(PartitionRequest partitionRequest) throws Exception;
}
\ No newline at end of file
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/PartitionDescriptor.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/PartitionDescriptor.java
new file mode 100644
index 0000000..63b7ad3
--- /dev/null
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/PartitionDescriptor.java
@@ -0,0 +1,71 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.common.job;
+
+import java.io.Serializable;
+
+import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
+import edu.uci.ics.hyracks.api.partitions.PartitionId;
+
+public class PartitionDescriptor implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private final PartitionId pid;
+
+ private final String nodeId;
+
+ private final TaskAttemptId producingTaskAttemptId;
+
+ private final boolean reusable;
+
+ private PartitionState state;
+
+ public PartitionDescriptor(PartitionId pid, String nodeId, TaskAttemptId producingTaskAttemptId, boolean reusable) {
+ this.pid = pid;
+ this.nodeId = nodeId;
+ this.producingTaskAttemptId = producingTaskAttemptId;
+ this.reusable = reusable;
+ }
+
+ public PartitionId getPartitionId() {
+ return pid;
+ }
+
+ public String getNodeId() {
+ return nodeId;
+ }
+
+ public TaskAttemptId getProducingTaskAttemptId() {
+ return producingTaskAttemptId;
+ }
+
+ public PartitionState getState() {
+ return state;
+ }
+
+ public void setState(PartitionState state) {
+ this.state = state;
+ }
+
+ public boolean isReusable() {
+ return reusable;
+ }
+
+ @Override
+ public String toString() {
+ return "[" + pid + ":" + nodeId + ":" + producingTaskAttemptId + ":" + (reusable ? "reusable" : "non-reusable")
+ + ":" + state + "]";
+ }
+}
\ No newline at end of file
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/PartitionRequest.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/PartitionRequest.java
new file mode 100644
index 0000000..ca34501
--- /dev/null
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/PartitionRequest.java
@@ -0,0 +1,61 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.common.job;
+
+import java.io.Serializable;
+
+import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
+import edu.uci.ics.hyracks.api.partitions.PartitionId;
+
+public class PartitionRequest implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private final PartitionId pid;
+
+ private final String requestingNodeId;
+
+ private final TaskAttemptId requestingTaskAttemptId;
+
+ private final PartitionState minState;
+
+ public PartitionRequest(PartitionId pid, String requestingNodeId, TaskAttemptId requestingTaskAttemptId,
+ PartitionState minState) {
+ this.pid = pid;
+ this.requestingNodeId = requestingNodeId;
+ this.requestingTaskAttemptId = requestingTaskAttemptId;
+ this.minState = minState;
+ }
+
+ public PartitionId getPartitionId() {
+ return pid;
+ }
+
+ public String getNodeId() {
+ return requestingNodeId;
+ }
+
+ public TaskAttemptId getRequestingTaskAttemptId() {
+ return requestingTaskAttemptId;
+ }
+
+ public PartitionState getMinimumState() {
+ return minState;
+ }
+
+ @Override
+ public String toString() {
+ return "[" + pid + ":" + requestingNodeId + ":" + requestingTaskAttemptId + ":" + minState + "]";
+ }
+}
\ No newline at end of file
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/PartitionState.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/PartitionState.java
index 412cbac..edde0b8 100644
--- a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/PartitionState.java
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/PartitionState.java
@@ -1,3 +1,17 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package edu.uci.ics.hyracks.control.common.job;
public enum PartitionState {
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
index 9c12cd9..8ef0f3e 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
@@ -38,6 +38,7 @@
import edu.uci.ics.hyracks.api.job.profiling.counters.ICounterContext;
import edu.uci.ics.hyracks.api.partitions.PartitionId;
import edu.uci.ics.hyracks.api.resources.IDeallocatable;
+import edu.uci.ics.hyracks.control.common.job.PartitionRequest;
import edu.uci.ics.hyracks.control.common.job.PartitionState;
import edu.uci.ics.hyracks.control.common.job.profiling.counters.Counter;
import edu.uci.ics.hyracks.control.common.job.profiling.om.JobletProfile;
@@ -213,11 +214,12 @@
return counter;
}
- public synchronized void advertisePartitionRequest(Collection<PartitionId> pids, IPartitionCollector collector,
- PartitionState minState) throws Exception {
+ public synchronized void advertisePartitionRequest(TaskAttemptId taId, Collection<PartitionId> pids,
+ IPartitionCollector collector, PartitionState minState) throws Exception {
for (PartitionId pid : pids) {
partitionRequestMap.put(pid, collector);
- nodeController.getClusterController().registerPartitionRequest(pid, nodeController.getId(), minState);
+ PartitionRequest req = new PartitionRequest(pid, nodeController.getId(), taId, minState);
+ nodeController.getClusterController().registerPartitionRequest(req);
}
}
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
index a26988f..3223907 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
@@ -286,7 +286,7 @@
IConnectorPolicy cPolicy = connectorPoliciesMap.get(conn.getConnectorId());
IPartitionWriterFactory pwFactory = createPartitionWriterFactory(cPolicy, jobId, conn,
- partition);
+ partition, taId);
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("output: " + i + ": " + conn.getConnectorId());
@@ -314,20 +314,21 @@
IPartitionCollector collector = conn.createPartitionCollector(task, recordDesc, partition,
td.getInputPartitionCounts()[i], td.getPartitionCount());
if (cPolicy.materializeOnReceiveSide()) {
- return new ReceiveSideMaterializingCollector(ctx, partitionManager, collector, executor);
+ return new ReceiveSideMaterializingCollector(ctx, partitionManager, collector, task.getTaskAttemptId(),
+ executor);
} else {
return collector;
}
}
private IPartitionWriterFactory createPartitionWriterFactory(IConnectorPolicy cPolicy, final UUID jobId,
- final IConnectorDescriptor conn, final int senderIndex) {
+ final IConnectorDescriptor conn, final int senderIndex, final TaskAttemptId taId) {
if (cPolicy.materializeOnSendSide()) {
return new IPartitionWriterFactory() {
@Override
public IFrameWriter createFrameWriter(int receiverIndex) throws HyracksDataException {
return new MaterializedPartitionWriter(ctx, partitionManager, new PartitionId(jobId,
- conn.getConnectorId(), senderIndex, receiverIndex), executor);
+ conn.getConnectorId(), senderIndex, receiverIndex), taId, executor);
}
};
} else {
@@ -335,7 +336,7 @@
@Override
public IFrameWriter createFrameWriter(int receiverIndex) throws HyracksDataException {
return new PipelinedPartition(partitionManager, new PartitionId(jobId, conn.getConnectorId(),
- senderIndex, receiverIndex));
+ senderIndex, receiverIndex), taId);
}
};
}
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
index 20bf9da..e787a97 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
@@ -207,7 +207,7 @@
try {
collector.open();
try {
- joblet.advertisePartitionRequest(collector.getRequiredPartitionIds(), collector, PartitionState.STARTED);
+ joblet.advertisePartitionRequest(taskAttemptId, collector.getRequiredPartitionIds(), collector, PartitionState.STARTED);
IFrameReader reader = collector.getReader();
reader.open();
try {
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionWriter.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionWriter.java
index 353ba32..db2e405 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionWriter.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionWriter.java
@@ -19,11 +19,13 @@
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.context.IHyracksRootContext;
+import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.io.FileHandle;
import edu.uci.ics.hyracks.api.io.FileReference;
import edu.uci.ics.hyracks.api.io.IIOManager;
import edu.uci.ics.hyracks.api.partitions.PartitionId;
+import edu.uci.ics.hyracks.control.common.job.PartitionState;
import edu.uci.ics.hyracks.control.nc.io.IOManager;
public class MaterializedPartitionWriter implements IFrameWriter {
@@ -33,6 +35,8 @@
protected final PartitionId pid;
+ protected final TaskAttemptId taId;
+
protected final Executor executor;
private FileReference fRef;
@@ -42,10 +46,11 @@
private long size;
public MaterializedPartitionWriter(IHyracksRootContext ctx, PartitionManager manager, PartitionId pid,
- Executor executor) {
+ TaskAttemptId taId, Executor executor) {
this.ctx = ctx;
this.manager = manager;
this.pid = pid;
+ this.taId = taId;
this.executor = executor;
}
@@ -69,7 +74,8 @@
@Override
public void close() throws HyracksDataException {
ctx.getIOManager().close(handle);
- manager.registerPartition(pid, new MaterializedPartition(ctx, fRef, executor, (IOManager) ctx.getIOManager()));
- manager.notifyPartitionCommit(pid);
+ manager.registerPartition(pid, taId,
+ new MaterializedPartition(ctx, fRef, executor, (IOManager) ctx.getIOManager()),
+ PartitionState.COMMITTED);
}
}
\ No newline at end of file
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PartitionManager.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PartitionManager.java
index b676e33..63ae790 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PartitionManager.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PartitionManager.java
@@ -22,11 +22,13 @@
import java.util.UUID;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.api.io.IWorkspaceFileFactory;
import edu.uci.ics.hyracks.api.partitions.IPartition;
import edu.uci.ics.hyracks.api.partitions.PartitionId;
+import edu.uci.ics.hyracks.control.common.job.PartitionDescriptor;
import edu.uci.ics.hyracks.control.common.job.PartitionState;
import edu.uci.ics.hyracks.control.nc.NodeControllerService;
import edu.uci.ics.hyracks.control.nc.io.IOManager;
@@ -49,7 +51,8 @@
fileFactory = new WorkspaceFileFactory(deallocatableRegistry, (IOManager) ncs.getRootContext().getIOManager());
}
- public void registerPartition(PartitionId pid, IPartition partition) throws HyracksDataException {
+ public void registerPartition(PartitionId pid, TaskAttemptId taId, IPartition partition, PartitionState state)
+ throws HyracksDataException {
synchronized (this) {
List<IPartition> pList = partitionMap.get(pid);
if (pList == null) {
@@ -58,16 +61,15 @@
}
pList.add(partition);
}
- try {
- ncs.getClusterController().registerPartitionProvider(pid, ncs.getId(), PartitionState.STARTED);
- } catch (Exception e) {
- throw new HyracksDataException(e);
- }
+ updatePartitionState(pid, taId, partition, state);
}
- public void notifyPartitionCommit(PartitionId pid) throws HyracksDataException {
+ public void updatePartitionState(PartitionId pid, TaskAttemptId taId, IPartition partition, PartitionState state)
+ throws HyracksDataException {
+ PartitionDescriptor desc = new PartitionDescriptor(pid, ncs.getId(), taId, partition.isReusable());
+ desc.setState(state);
try {
- ncs.getClusterController().registerPartitionProvider(pid, ncs.getId(), PartitionState.COMMITTED);
+ ncs.getClusterController().registerPartitionProvider(desc);
} catch (Exception e) {
throw new HyracksDataException(e);
}
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PipelinedPartition.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PipelinedPartition.java
index ec6e25d..1a9fb53 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PipelinedPartition.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PipelinedPartition.java
@@ -17,20 +17,25 @@
import java.nio.ByteBuffer;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.partitions.IPartition;
import edu.uci.ics.hyracks.api.partitions.PartitionId;
+import edu.uci.ics.hyracks.control.common.job.PartitionState;
public class PipelinedPartition implements IFrameWriter, IPartition {
private final PartitionManager manager;
private final PartitionId pid;
+ private final TaskAttemptId taId;
+
private IFrameWriter delegate;
- public PipelinedPartition(PartitionManager manager, PartitionId pid) {
+ public PipelinedPartition(PartitionManager manager, PartitionId pid, TaskAttemptId taId) {
this.manager = manager;
this.pid = pid;
+ this.taId = taId;
}
@Override
@@ -51,7 +56,7 @@
@Override
public synchronized void open() throws HyracksDataException {
- manager.registerPartition(pid, this);
+ manager.registerPartition(pid, taId, this, PartitionState.STARTED);
while (delegate == null) {
try {
wait();
@@ -74,7 +79,7 @@
@Override
public void close() throws HyracksDataException {
- manager.notifyPartitionCommit(pid);
+ manager.updatePartitionState(pid, taId, this, PartitionState.COMMITTED);
delegate.close();
}
}
\ No newline at end of file
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/ReceiveSideMaterializingCollector.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/ReceiveSideMaterializingCollector.java
index 5f3a3f4..6b7eb4e 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/ReceiveSideMaterializingCollector.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/ReceiveSideMaterializingCollector.java
@@ -27,6 +27,7 @@
import edu.uci.ics.hyracks.api.comm.PartitionChannel;
import edu.uci.ics.hyracks.api.context.IHyracksRootContext;
import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
+import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.api.partitions.PartitionId;
@@ -37,13 +38,16 @@
private final IPartitionCollector delegate;
+ private final TaskAttemptId taId;
+
private final Executor executor;
public ReceiveSideMaterializingCollector(IHyracksRootContext ctx, PartitionManager manager,
- IPartitionCollector collector, Executor executor) {
+ IPartitionCollector collector, TaskAttemptId taId, Executor executor) {
this.ctx = ctx;
this.manager = manager;
this.delegate = collector;
+ this.taId = taId;
this.executor = executor;
}
@@ -103,7 +107,7 @@
@Override
public synchronized void run() {
PartitionId pid = pc.getPartitionId();
- MaterializedPartitionWriter mpw = new MaterializedPartitionWriter(ctx, manager, pid, executor);
+ MaterializedPartitionWriter mpw = new MaterializedPartitionWriter(ctx, manager, pid, taId, executor);
IInputChannel channel = pc.getInputChannel();
try {
channel.registerMonitor(this);