changes to tracked locations assigned to an operator for each partition
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobInfo.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobInfo.java
index 812a32a..4b7cbe5 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobInfo.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobInfo.java
@@ -6,67 +6,71 @@
import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
-public class JobInfo implements Serializable{
+public class JobInfo implements Serializable {
- private final JobId jobId;
+ private static final long serialVersionUID = 1L;
- private JobStatus status;
+ private final JobId jobId;
- private List<Exception> exceptions;
+ private JobStatus status;
- private JobStatus pendingStatus;
+ private List<Exception> exceptions;
- private List<Exception> pendingExceptions;
+ private JobStatus pendingStatus;
- private Map<OperatorDescriptorId, List<String>> operatorLocations;
+ private List<Exception> pendingExceptions;
- public JobInfo(JobId jobId, JobStatus jobStatus, Map<OperatorDescriptorId, List<String>> operatorLocations) {
- this.jobId = jobId;
- this.operatorLocations = operatorLocations;
- this.status = status;
- }
+ private Map<OperatorDescriptorId, Map<Integer, String>> operatorLocations;
- public JobStatus getStatus() {
- return status;
- }
+ public JobInfo(JobId jobId, JobStatus jobStatus,
+ Map<OperatorDescriptorId, Map<Integer, String>> operatorLocations) {
+ this.jobId = jobId;
+ this.operatorLocations = operatorLocations;
+ this.status = jobStatus;
+ }
- public void setStatus(JobStatus status) {
- this.status = status;
- }
+ public JobStatus getStatus() {
+ return status;
+ }
- public List<Exception> getExceptions() {
- return exceptions;
- }
+ public void setStatus(JobStatus status) {
+ this.status = status;
+ }
- public void setExceptions(List<Exception> exceptions) {
- this.exceptions = exceptions;
- }
+ public List<Exception> getExceptions() {
+ return exceptions;
+ }
- public JobStatus getPendingStatus() {
- return pendingStatus;
- }
+ public void setExceptions(List<Exception> exceptions) {
+ this.exceptions = exceptions;
+ }
- public void setPendingStatus(JobStatus pendingStatus) {
- this.pendingStatus = pendingStatus;
- }
+ public JobStatus getPendingStatus() {
+ return pendingStatus;
+ }
- public List<Exception> getPendingExceptions() {
- return pendingExceptions;
- }
+ public void setPendingStatus(JobStatus pendingStatus) {
+ this.pendingStatus = pendingStatus;
+ }
- public void setPendingExceptions(List<Exception> pendingExceptions) {
- this.pendingExceptions = pendingExceptions;
- }
+ public List<Exception> getPendingExceptions() {
+ return pendingExceptions;
+ }
- public Map<OperatorDescriptorId, List<String>> getOperatorLocations() {
- return operatorLocations;
- }
+ public void setPendingExceptions(List<Exception> pendingExceptions) {
+ this.pendingExceptions = pendingExceptions;
+ }
- public void setOperatorLocations(Map<OperatorDescriptorId, List<String>> operatorLocations) {
- this.operatorLocations = operatorLocations;
- }
+ public Map<OperatorDescriptorId, Map<Integer, String>> getOperatorLocations() {
+ return operatorLocations;
+ }
- public JobId getJobId() {
- return jobId;
- }
+ public void setOperatorLocations(
+ Map<OperatorDescriptorId, Map<Integer, String>> operatorLocations) {
+ this.operatorLocations = operatorLocations;
+ }
+
+ public JobId getJobId() {
+ return jobId;
+ }
}
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java
index 1f68210..8e47735 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java
@@ -89,7 +89,7 @@
private List<Exception> pendingExceptions;
- private Map<OperatorDescriptorId, List<String>> operatorLocations;
+ private Map<OperatorDescriptorId, Map<Integer, String>> operatorLocations;
public JobRun(ClusterControllerService ccs, DeploymentId deploymentId, JobId jobId,
IActivityClusterGraphGenerator acgg, EnumSet<JobFlag> jobFlags) {
@@ -105,7 +105,7 @@
cleanupPendingNodeIds = new HashSet<String>();
profile = new JobProfile(jobId);
connectorPolicyMap = new HashMap<ConnectorDescriptorId, IConnectorPolicy>();
- operatorLocations = new HashMap<OperatorDescriptorId, List<String>>();
+ operatorLocations = new HashMap<OperatorDescriptorId, Map<Integer, String>>();
}
public DeploymentId getDeploymentId() {
@@ -183,13 +183,13 @@
this.endTime = endTime;
}
- public void registerOperatorLocation(OperatorDescriptorId op, String location) {
- List<String> locations = operatorLocations.get(op);
+ public void registerOperatorLocation(OperatorDescriptorId op, int partition, String location) {
+ Map<Integer, String> locations = operatorLocations.get(op);
if (locations == null) {
- locations = new ArrayList<String>();
+ locations = new HashMap<Integer, String>();
operatorLocations.put(op, locations);
}
- locations.add(location);
+ locations.put(partition, location);
}
@Override
@@ -362,8 +362,7 @@
taskAttempt.put("end-time", ta.getEndTime());
List<Exception> exceptions = ta.getExceptions();
if (exceptions != null && !exceptions.isEmpty()) {
- List<Exception> filteredExceptions = ExceptionUtils
- .getActualExceptions(exceptions);
+ List<Exception> filteredExceptions = ExceptionUtils.getActualExceptions(exceptions);
for (Exception exception : filteredExceptions) {
StringWriter exceptionWriter = new StringWriter();
exception.printStackTrace(new PrintWriter(exceptionWriter));
@@ -394,7 +393,7 @@
return result;
}
- public Map<OperatorDescriptorId, List<String>> getOperatorLocations() {
+ public Map<OperatorDescriptorId, Map<Integer, String>> getOperatorLocations() {
return operatorLocations;
}
}
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
index d2c018f..ad4744b 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
@@ -329,7 +329,7 @@
taskAttemptMap.put(nodeId, tads);
}
OperatorDescriptorId opId = tid.getActivityId().getOperatorDescriptorId();
- jobRun.registerOperatorLocation(opId, nodeId);
+ jobRun.registerOperatorLocation(opId, tid.getPartition(), nodeId);
ActivityPartitionDetails apd = ts.getActivityPlan().getActivityPartitionDetails();
TaskAttemptDescriptor tad = new TaskAttemptDescriptor(taskAttempt.getTaskAttemptId(),
apd.getPartitionCount(), apd.getInputPartitionCounts(), apd.getOutputPartitionCounts());