changes to tracked locations assigned to an operator for each partition
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobInfo.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobInfo.java
index 812a32a..4b7cbe5 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobInfo.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobInfo.java
@@ -6,67 +6,71 @@
 
 import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
 
-public class JobInfo implements Serializable{
+public class JobInfo implements Serializable {
 
-    private final JobId jobId;
+	private static final long serialVersionUID = 1L;
 
-    private JobStatus status;
+	private final JobId jobId;
 
-    private List<Exception> exceptions;
+	private JobStatus status;
 
-    private JobStatus pendingStatus;
+	private List<Exception> exceptions;
 
-    private List<Exception> pendingExceptions;
+	private JobStatus pendingStatus;
 
-    private Map<OperatorDescriptorId, List<String>> operatorLocations;
+	private List<Exception> pendingExceptions;
 
-    public JobInfo(JobId jobId, JobStatus jobStatus, Map<OperatorDescriptorId, List<String>> operatorLocations) {
-        this.jobId = jobId;
-        this.operatorLocations = operatorLocations;
-        this.status = status;
-    }
+	private Map<OperatorDescriptorId, Map<Integer, String>> operatorLocations;
 
-    public JobStatus getStatus() {
-        return status;
-    }
+	public JobInfo(JobId jobId, JobStatus jobStatus,
+			Map<OperatorDescriptorId, Map<Integer, String>> operatorLocations) {
+		this.jobId = jobId;
+		this.operatorLocations = operatorLocations;
+		this.status = jobStatus;
+	}
 
-    public void setStatus(JobStatus status) {
-        this.status = status;
-    }
+	public JobStatus getStatus() {
+		return status;
+	}
 
-    public List<Exception> getExceptions() {
-        return exceptions;
-    }
+	public void setStatus(JobStatus status) {
+		this.status = status;
+	}
 
-    public void setExceptions(List<Exception> exceptions) {
-        this.exceptions = exceptions;
-    }
+	public List<Exception> getExceptions() {
+		return exceptions;
+	}
 
-    public JobStatus getPendingStatus() {
-        return pendingStatus;
-    }
+	public void setExceptions(List<Exception> exceptions) {
+		this.exceptions = exceptions;
+	}
 
-    public void setPendingStatus(JobStatus pendingStatus) {
-        this.pendingStatus = pendingStatus;
-    }
+	public JobStatus getPendingStatus() {
+		return pendingStatus;
+	}
 
-    public List<Exception> getPendingExceptions() {
-        return pendingExceptions;
-    }
+	public void setPendingStatus(JobStatus pendingStatus) {
+		this.pendingStatus = pendingStatus;
+	}
 
-    public void setPendingExceptions(List<Exception> pendingExceptions) {
-        this.pendingExceptions = pendingExceptions;
-    }
+	public List<Exception> getPendingExceptions() {
+		return pendingExceptions;
+	}
 
-    public Map<OperatorDescriptorId, List<String>> getOperatorLocations() {
-        return operatorLocations;
-    }
+	public void setPendingExceptions(List<Exception> pendingExceptions) {
+		this.pendingExceptions = pendingExceptions;
+	}
 
-    public void setOperatorLocations(Map<OperatorDescriptorId, List<String>> operatorLocations) {
-        this.operatorLocations = operatorLocations;
-    }
+	public Map<OperatorDescriptorId, Map<Integer, String>> getOperatorLocations() {
+		return operatorLocations;
+	}
 
-    public JobId getJobId() {
-        return jobId;
-    }
+	public void setOperatorLocations(
+			Map<OperatorDescriptorId, Map<Integer, String>> operatorLocations) {
+		this.operatorLocations = operatorLocations;
+	}
+
+	public JobId getJobId() {
+		return jobId;
+	}
 }
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java
index 1f68210..8e47735 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java
@@ -89,7 +89,7 @@
 
     private List<Exception> pendingExceptions;
 
-    private Map<OperatorDescriptorId, List<String>> operatorLocations;
+    private Map<OperatorDescriptorId, Map<Integer, String>> operatorLocations;
 
     public JobRun(ClusterControllerService ccs, DeploymentId deploymentId, JobId jobId,
             IActivityClusterGraphGenerator acgg, EnumSet<JobFlag> jobFlags) {
@@ -105,7 +105,7 @@
         cleanupPendingNodeIds = new HashSet<String>();
         profile = new JobProfile(jobId);
         connectorPolicyMap = new HashMap<ConnectorDescriptorId, IConnectorPolicy>();
-        operatorLocations = new HashMap<OperatorDescriptorId, List<String>>();
+        operatorLocations = new HashMap<OperatorDescriptorId, Map<Integer, String>>();
     }
 
     public DeploymentId getDeploymentId() {
@@ -183,13 +183,13 @@
         this.endTime = endTime;
     }
 
-    public void registerOperatorLocation(OperatorDescriptorId op, String location) {
-        List<String> locations = operatorLocations.get(op);
+    public void registerOperatorLocation(OperatorDescriptorId op, int partition, String location) {
+        Map<Integer, String> locations = operatorLocations.get(op);
         if (locations == null) {
-            locations = new ArrayList<String>();
+            locations = new HashMap<Integer, String>();
             operatorLocations.put(op, locations);
         }
-        locations.add(location);
+        locations.put(partition, location);
     }
 
     @Override
@@ -362,8 +362,7 @@
                                 taskAttempt.put("end-time", ta.getEndTime());
                                 List<Exception> exceptions = ta.getExceptions();
                                 if (exceptions != null && !exceptions.isEmpty()) {
-                                    List<Exception> filteredExceptions = ExceptionUtils
-                                            .getActualExceptions(exceptions);
+                                    List<Exception> filteredExceptions = ExceptionUtils.getActualExceptions(exceptions);
                                     for (Exception exception : filteredExceptions) {
                                         StringWriter exceptionWriter = new StringWriter();
                                         exception.printStackTrace(new PrintWriter(exceptionWriter));
@@ -394,7 +393,7 @@
         return result;
     }
 
-    public Map<OperatorDescriptorId, List<String>> getOperatorLocations() {
+    public Map<OperatorDescriptorId, Map<Integer, String>> getOperatorLocations() {
         return operatorLocations;
     }
 }
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
index d2c018f..ad4744b 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
@@ -329,7 +329,7 @@
                 taskAttemptMap.put(nodeId, tads);
             }
             OperatorDescriptorId opId = tid.getActivityId().getOperatorDescriptorId();
-            jobRun.registerOperatorLocation(opId, nodeId);
+            jobRun.registerOperatorLocation(opId, tid.getPartition(), nodeId);
             ActivityPartitionDetails apd = ts.getActivityPlan().getActivityPartitionDetails();
             TaskAttemptDescriptor tad = new TaskAttemptDescriptor(taskAttempt.getTaskAttemptId(),
                     apd.getPartitionCount(), apd.getInputPartitionCounts(), apd.getOutputPartitionCounts());