Fixed REST Api

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_scheduling@541 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobActivityGraph.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobActivityGraph.java
index f369f95..930d299 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobActivityGraph.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobActivityGraph.java
@@ -22,6 +22,10 @@
 import java.util.Map;
 import java.util.Set;
 
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
+
 import edu.uci.ics.hyracks.api.dataflow.ActivityId;
 import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
 import edu.uci.ics.hyracks.api.dataflow.IActivity;
@@ -201,4 +205,59 @@
         buffer.append('\n');
         return buffer.toString();
     }
+
+    public JSONObject toJSON() throws JSONException {
+        JSONObject jplan = new JSONObject();
+
+        jplan.put("type", "plan");
+        jplan.put("flags", jobFlags.toString());
+
+        JSONArray jans = new JSONArray();
+        for (IActivity an : activityNodes.values()) {
+            JSONObject jan = new JSONObject();
+            jan.put("type", "activity");
+            jan.put("id", an.getActivityId().toString());
+            jan.put("java-class", an.getClass().getName());
+            jan.put("operator-id", an.getActivityId().getOperatorDescriptorId().toString());
+
+            List<IConnectorDescriptor> inputs = getActivityInputConnectorDescriptors(an.getActivityId());
+            if (inputs != null) {
+                JSONArray jInputs = new JSONArray();
+                for (int i = 0; i < inputs.size(); ++i) {
+                    JSONObject jInput = new JSONObject();
+                    jInput.put("type", "activity-input");
+                    jInput.put("input-port", i);
+                    jInput.put("connector-id", inputs.get(i).getConnectorId().toString());
+                    jInputs.put(jInput);
+                }
+                jan.put("inputs", jInputs);
+            }
+
+            List<IConnectorDescriptor> outputs = getActivityOutputConnectorDescriptors(an.getActivityId());
+            if (outputs != null) {
+                JSONArray jOutputs = new JSONArray();
+                for (int i = 0; i < outputs.size(); ++i) {
+                    JSONObject jOutput = new JSONObject();
+                    jOutput.put("type", "activity-output");
+                    jOutput.put("output-port", i);
+                    jOutput.put("connector-id", outputs.get(i).getConnectorId().toString());
+                    jOutputs.put(jOutput);
+                }
+                jan.put("outputs", jOutputs);
+            }
+
+            Set<ActivityId> blockers = getBlocked2BlockerMap().get(an.getActivityId());
+            if (blockers != null) {
+                JSONArray jDeps = new JSONArray();
+                for (ActivityId blocker : blockers) {
+                    jDeps.put(blocker.toString());
+                }
+                jan.put("depends-on", jDeps);
+            }
+            jans.put(jan);
+        }
+        jplan.put("activities", jans);
+
+        return jplan;
+    }
 }
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/ActivityCluster.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/ActivityCluster.java
index ae95ca8..14cebb2 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/ActivityCluster.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/ActivityCluster.java
@@ -28,6 +28,8 @@
 
     private final Set<ActivityCluster> dependents;
 
+    private ActivityClusterId id;
+
     private ActivityClusterPlan acp;
 
     public ActivityCluster(JobRun jobRun, Set<ActivityId> activities) {
@@ -61,6 +63,14 @@
         return jobRun;
     }
 
+    public ActivityClusterId getActivityClusterId() {
+        return id;
+    }
+
+    public void setActivityClusterId(ActivityClusterId id) {
+        this.id = id;
+    }
+
     public int getMaxTaskClusterAttempts() {
         return jobRun.getJobActivityGraph().getJobSpecification().getMaxAttempts();
     }
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/ActivityClusterId.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/ActivityClusterId.java
new file mode 100644
index 0000000..46628a4
--- /dev/null
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/ActivityClusterId.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.cc.job;
+
+import java.io.Serializable;
+
+public final class ActivityClusterId implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private final int id;
+
+    public ActivityClusterId(int id) {
+        this.id = id;
+    }
+
+    public int getId() {
+        return id;
+    }
+
+    @Override
+    public int hashCode() {
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + id;
+        return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj)
+            return true;
+        if (obj == null)
+            return false;
+        if (getClass() != obj.getClass())
+            return false;
+        ActivityClusterId other = (ActivityClusterId) obj;
+        if (id != other.id)
+            return false;
+        return true;
+    }
+
+    @Override
+    public String toString() {
+        return "AC:" + id;
+    }
+}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/ActivityClusterPlan.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/ActivityClusterPlan.java
index 531dc92..2b1d3e2 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/ActivityClusterPlan.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/ActivityClusterPlan.java
@@ -19,17 +19,17 @@
 import edu.uci.ics.hyracks.api.dataflow.ActivityId;
 
 public class ActivityClusterPlan {
-    private final Map<ActivityId, Task[]> taskStateMap;
+    private final Map<ActivityId, ActivityPlan> activityPlanMap;
 
     private final TaskCluster[] taskClusters;
 
-    public ActivityClusterPlan(TaskCluster[] taskClusters, Map<ActivityId, Task[]> taskStateMap) {
-        this.taskStateMap = taskStateMap;
+    public ActivityClusterPlan(TaskCluster[] taskClusters, Map<ActivityId, ActivityPlan> activityPlanMap) {
+        this.activityPlanMap = activityPlanMap;
         this.taskClusters = taskClusters;
     }
 
-    public Map<ActivityId, Task[]> getTaskMap() {
-        return taskStateMap;
+    public Map<ActivityId, ActivityPlan> getActivityPlanMap() {
+        return activityPlanMap;
     }
 
     public TaskCluster[] getTaskClusters() {
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/ActivityPlan.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/ActivityPlan.java
new file mode 100644
index 0000000..beeca27
--- /dev/null
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/ActivityPlan.java
@@ -0,0 +1,39 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.cc.job;
+
+import edu.uci.ics.hyracks.control.cc.scheduler.ActivityPartitionDetails;
+
+public class ActivityPlan {
+    private final ActivityPartitionDetails apd;
+
+    private Task[] tasks;
+
+    public ActivityPlan(ActivityPartitionDetails apd) {
+        this.apd = apd;
+    }
+
+    public ActivityPartitionDetails getActivityPartitionDetails() {
+        return apd;
+    }
+
+    public Task[] getTasks() {
+        return tasks;
+    }
+
+    public void setTasks(Task[] tasks) {
+        this.tasks = tasks;
+    }
+}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java
index 3bea59a..6231080 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java
@@ -19,14 +19,21 @@
 import java.util.Map;
 import java.util.Set;
 
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
+
 import edu.uci.ics.hyracks.api.dataflow.ActivityId;
 import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
+import edu.uci.ics.hyracks.api.dataflow.TaskId;
 import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 import edu.uci.ics.hyracks.api.job.JobActivityGraph;
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.api.job.JobStatus;
+import edu.uci.ics.hyracks.api.partitions.PartitionId;
 import edu.uci.ics.hyracks.control.cc.partitions.PartitionMatchMaker;
+import edu.uci.ics.hyracks.control.cc.scheduler.ActivityPartitionDetails;
 import edu.uci.ics.hyracks.control.cc.scheduler.JobScheduler;
 import edu.uci.ics.hyracks.control.common.job.profiling.om.JobProfile;
 
@@ -130,4 +137,128 @@
     public Map<ConnectorDescriptorId, IConnectorPolicy> getConnectorPolicyMap() {
         return connectorPolicyMap;
     }
+
+    public JSONObject toJSON() throws JSONException {
+        JSONObject result = new JSONObject();
+
+        result.put("job-id", jobId.toString());
+
+        JSONArray aClusters = new JSONArray();
+        for (ActivityCluster ac : activityClusters) {
+            JSONObject acJSON = new JSONObject();
+
+            acJSON.put("activity-cluster-id", String.valueOf(ac.getActivityClusterId()));
+
+            JSONArray activitiesJSON = new JSONArray();
+            for (ActivityId aid : ac.getActivities()) {
+                activitiesJSON.put(aid);
+            }
+            acJSON.put("activities", activitiesJSON);
+
+            JSONArray dependentsJSON = new JSONArray();
+            for (ActivityCluster dependent : ac.getDependents()) {
+                dependentsJSON.put(String.valueOf(dependent.getActivityClusterId()));
+            }
+            acJSON.put("dependents", dependentsJSON);
+
+            JSONArray dependenciesJSON = new JSONArray();
+            for (ActivityCluster dependency : ac.getDependencies()) {
+                dependenciesJSON.put(String.valueOf(dependency.getActivityClusterId()));
+            }
+            acJSON.put("dependencies", dependenciesJSON);
+
+            ActivityClusterPlan acp = ac.getPlan();
+            if (acp == null) {
+                acJSON.put("plan", (Object) null);
+            } else {
+                JSONObject planJSON = new JSONObject();
+
+                JSONArray acTasks = new JSONArray();
+                for (Map.Entry<ActivityId, ActivityPlan> e : acp.getActivityPlanMap().entrySet()) {
+                    ActivityPlan acPlan = e.getValue();
+                    JSONObject entry = new JSONObject();
+                    entry.put("activity-id", e.getKey().toString());
+
+                    ActivityPartitionDetails apd = acPlan.getActivityPartitionDetails();
+                    entry.put("partition-count", apd.getPartitionCount());
+
+                    JSONArray inPartCountsJSON = new JSONArray();
+                    int[] inPartCounts = apd.getInputPartitionCounts();
+                    if (inPartCounts != null) {
+                        for (int i : inPartCounts) {
+                            inPartCountsJSON.put(i);
+                        }
+                    }
+                    entry.put("input-partition-counts", inPartCountsJSON);
+
+                    JSONArray outPartCountsJSON = new JSONArray();
+                    int[] outPartCounts = apd.getOutputPartitionCounts();
+                    if (outPartCounts != null) {
+                        for (int o : outPartCounts) {
+                            outPartCountsJSON.put(o);
+                        }
+                    }
+                    entry.put("output-partition-counts", outPartCountsJSON);
+
+                    JSONArray tasks = new JSONArray();
+                    for (Task t : acPlan.getTasks()) {
+                        JSONObject task = new JSONObject();
+
+                        task.put("task-id", t.getTaskId().toString());
+
+                        JSONArray dependentTasksJSON = new JSONArray();
+                        for (TaskId dependent : t.getDependents()) {
+                            dependentTasksJSON.put(dependent.toString());
+                        }
+                        task.put("dependents", dependentTasksJSON);
+
+                        JSONArray dependencyTasksJSON = new JSONArray();
+                        for (TaskId dependency : t.getDependencies()) {
+                            dependencyTasksJSON.put(dependency.toString());
+                        }
+                        task.put("dependencies", dependencyTasksJSON);
+
+                        tasks.put(task);
+                    }
+                    entry.put("tasks", tasks);
+
+                    acTasks.put(entry);
+                }
+                planJSON.put("task-map", acTasks);
+
+                JSONArray tClusters = new JSONArray();
+                for (TaskCluster tc : acp.getTaskClusters()) {
+                    JSONObject c = new JSONObject();
+                    c.put("task-cluster-id", String.valueOf(tc.getTaskClusterId()));
+
+                    JSONArray tasks = new JSONArray();
+                    for (Task t : tc.getTasks()) {
+                        tasks.put(t.getTaskId().toString());
+                    }
+                    c.put("tasks", tasks);
+
+                    JSONArray prodParts = new JSONArray();
+                    for (PartitionId p : tc.getProducedPartitions()) {
+                        prodParts.put(p.toString());
+                    }
+                    c.put("produced-partitions", prodParts);
+
+                    JSONArray reqdParts = new JSONArray();
+                    for (PartitionId p : tc.getRequiredPartitions()) {
+                        reqdParts.put(p.toString());
+                    }
+                    c.put("required-partitions", reqdParts);
+
+                    tClusters.put(c);
+                }
+                planJSON.put("task-clusters", tClusters);
+
+                acJSON.put("plan", planJSON);
+            }
+            aClusters.put(acJSON);
+        }
+        result.put("activity-clusters", aClusters);
+
+        return result;
+    }
 }
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/Task.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/Task.java
index 28deb44..84ae6e5 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/Task.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/Task.java
@@ -18,12 +18,11 @@
 import java.util.Set;
 
 import edu.uci.ics.hyracks.api.dataflow.TaskId;
-import edu.uci.ics.hyracks.control.cc.scheduler.ActivityPartitionDetails;
 
 public class Task {
     private final TaskId taskId;
 
-    private final ActivityPartitionDetails apd;
+    private final ActivityPlan activityPlan;
 
     private final Set<TaskId> dependencies;
 
@@ -31,9 +30,9 @@
 
     private TaskCluster taskCluster;
 
-    public Task(TaskId taskId, ActivityPartitionDetails apd) {
+    public Task(TaskId taskId, ActivityPlan activityPlan) {
         this.taskId = taskId;
-        this.apd = apd;
+        this.activityPlan = activityPlan;
         this.dependencies = new HashSet<TaskId>();
         this.dependents = new HashSet<TaskId>();
     }
@@ -42,8 +41,8 @@
         return taskId;
     }
 
-    public ActivityPartitionDetails getActivityPartitionDetails() {
-        return apd;
+    public ActivityPlan getActivityPlan() {
+        return activityPlan;
     }
 
     public Set<TaskId> getDependencies() {
@@ -64,6 +63,6 @@
 
     @Override
     public String toString() {
-        return taskId + "(" + apd + ")";
+        return String.valueOf(taskId);
     }
 }
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/TaskCluster.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/TaskCluster.java
index d3de5a1..f41cda9 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/TaskCluster.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/TaskCluster.java
@@ -23,6 +23,8 @@
 import edu.uci.ics.hyracks.api.partitions.PartitionId;
 
 public class TaskCluster {
+    private final TaskClusterId taskClusterId;
+
     private final ActivityCluster ac;
 
     private final Task[] tasks;
@@ -37,7 +39,8 @@
 
     private final List<TaskClusterAttempt> taskClusterAttempts;
 
-    public TaskCluster(ActivityCluster ac, Task[] tasks) {
+    public TaskCluster(TaskClusterId taskClusterId, ActivityCluster ac, Task[] tasks) {
+        this.taskClusterId = taskClusterId;
         this.ac = ac;
         this.tasks = tasks;
         producedPartitions = new HashSet<PartitionId>();
@@ -47,6 +50,10 @@
         taskClusterAttempts = new ArrayList<TaskClusterAttempt>();
     }
 
+    public TaskClusterId getTaskClusterId() {
+        return taskClusterId;
+    }
+
     public ActivityCluster getActivityCluster() {
         return ac;
     }
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/TaskClusterId.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/TaskClusterId.java
new file mode 100644
index 0000000..b1000dd
--- /dev/null
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/TaskClusterId.java
@@ -0,0 +1,71 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.cc.job;
+
+import java.io.Serializable;
+
+public final class TaskClusterId implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private final ActivityClusterId activityClusterId;
+
+    private final int id;
+
+    public TaskClusterId(ActivityClusterId activityClusterId, int id) {
+        this.activityClusterId = activityClusterId;
+        this.id = id;
+    }
+
+    public ActivityClusterId getActivityClusterId() {
+        return activityClusterId;
+    }
+
+    public int getId() {
+        return id;
+    }
+
+    @Override
+    public int hashCode() {
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + ((activityClusterId == null) ? 0 : activityClusterId.hashCode());
+        result = prime * result + id;
+        return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj)
+            return true;
+        if (obj == null)
+            return false;
+        if (getClass() != obj.getClass())
+            return false;
+        TaskClusterId other = (TaskClusterId) obj;
+        if (activityClusterId == null) {
+            if (other.activityClusterId != null)
+                return false;
+        } else if (!activityClusterId.equals(other.activityClusterId))
+            return false;
+        if (id != other.id)
+            return false;
+        return true;
+    }
+
+    @Override
+    public String toString() {
+        return "TC:" + activityClusterId + ":" + id;
+    }
+}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/AbstractTaskLifecycleEvent.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/AbstractTaskLifecycleEvent.java
index 6975d4e..0028b6f 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/AbstractTaskLifecycleEvent.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/AbstractTaskLifecycleEvent.java
@@ -23,6 +23,7 @@
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
 import edu.uci.ics.hyracks.control.cc.job.ActivityCluster;
+import edu.uci.ics.hyracks.control.cc.job.ActivityPlan;
 import edu.uci.ics.hyracks.control.cc.job.JobRun;
 import edu.uci.ics.hyracks.control.cc.job.Task;
 import edu.uci.ics.hyracks.control.cc.job.TaskAttempt;
@@ -51,8 +52,8 @@
             Map<ActivityId, ActivityCluster> activityClusterMap = run.getActivityClusterMap();
             ActivityCluster ac = activityClusterMap.get(tid.getActivityId());
             if (ac != null) {
-                Map<ActivityId, Task[]> taskStateMap = ac.getPlan().getTaskMap();
-                Task[] taskStates = taskStateMap.get(tid.getActivityId());
+                Map<ActivityId, ActivityPlan> taskStateMap = ac.getPlan().getActivityPlanMap();
+                Task[] taskStates = taskStateMap.get(tid.getActivityId()).getTasks();
                 if (taskStates != null && taskStates.length > tid.getPartition()) {
                     Task ts = taskStates[tid.getPartition()];
                     TaskCluster tc = ts.getTaskCluster();
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/GetJobActivityGraphJSONEvent.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/GetJobActivityGraphJSONEvent.java
new file mode 100644
index 0000000..14b56df
--- /dev/null
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/GetJobActivityGraphJSONEvent.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.cc.job.manager.events;
+
+import org.json.JSONObject;
+
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
+import edu.uci.ics.hyracks.control.cc.job.JobRun;
+import edu.uci.ics.hyracks.control.cc.jobqueue.SynchronizableEvent;
+
+public class GetJobActivityGraphJSONEvent extends SynchronizableEvent {
+    private final ClusterControllerService ccs;
+    private final JobId jobId;
+    private JSONObject json;
+
+    public GetJobActivityGraphJSONEvent(ClusterControllerService ccs, JobId jobId) {
+        this.ccs = ccs;
+        this.jobId = jobId;
+    }
+
+    @Override
+    protected void doRun() throws Exception {
+        JobRun run = ccs.getRunMap().get(jobId);
+        if (run == null) {
+            json = new JSONObject();
+            return;
+        }
+        json = run.getJobActivityGraph().toJSON();
+    }
+
+    public JSONObject getJSON() {
+        return json;
+    }
+}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/GetJobProfileJSONEvent.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/GetJobProfileJSONEvent.java
index 738d86e..4355962 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/GetJobProfileJSONEvent.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/GetJobProfileJSONEvent.java
@@ -24,7 +24,7 @@
 public class GetJobProfileJSONEvent extends SynchronizableEvent {
     private final ClusterControllerService ccs;
     private final JobId jobId;
-    private JSONObject profile;
+    private JSONObject json;
 
     public GetJobProfileJSONEvent(ClusterControllerService ccs, JobId jobId) {
         this.ccs = ccs;
@@ -33,16 +33,16 @@
 
     @Override
     protected void doRun() throws Exception {
-        profile = new JSONObject();
+        json = new JSONObject();
         JobRun jobRun = ccs.getRunMap().get(jobId);
         if (jobRun == null) {
-            profile = new JSONObject();
+            json = new JSONObject();
             return;
         }
-        profile = jobRun.getJobProfile().toJSON();
+        json = jobRun.getJobProfile().toJSON();
     }
 
-    public JSONObject getProfile() {
-        return profile;
+    public JSONObject getJSON() {
+        return json;
     }
 }
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/GetJobSpecificationJSONEvent.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/GetJobSpecificationJSONEvent.java
index 884ddf2..db7e5a7 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/GetJobSpecificationJSONEvent.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/GetJobSpecificationJSONEvent.java
@@ -24,7 +24,7 @@
 public class GetJobSpecificationJSONEvent extends SynchronizableEvent {
     private final ClusterControllerService ccs;
     private final JobId jobId;
-    private JSONObject spec;
+    private JSONObject json;
 
     public GetJobSpecificationJSONEvent(ClusterControllerService ccs, JobId jobId) {
         this.ccs = ccs;
@@ -35,13 +35,13 @@
     protected void doRun() throws Exception {
         JobRun run = ccs.getRunMap().get(jobId);
         if (run == null) {
-            spec = new JSONObject();
+            json = new JSONObject();
             return;
         }
-        spec = run.getJobActivityGraph().getJobSpecification().toJSON();
+        json = run.getJobActivityGraph().getJobSpecification().toJSON();
     }
 
-    public JSONObject getSpecification() {
-        return spec;
+    public JSONObject getJSON() {
+        return json;
     }
 }
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterGraphBuilder.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterGraphBuilder.java
index 81883d67..0e59c8df 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterGraphBuilder.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterGraphBuilder.java
@@ -30,6 +30,7 @@
 import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.api.util.Pair;
 import edu.uci.ics.hyracks.control.cc.job.ActivityCluster;
+import edu.uci.ics.hyracks.control.cc.job.ActivityClusterId;
 import edu.uci.ics.hyracks.control.cc.job.JobRun;
 
 public class ActivityClusterGraphBuilder {
@@ -123,7 +124,9 @@
             }
         }
         Set<ActivityCluster> roots = new HashSet<ActivityCluster>();
+        int idCounter = 0;
         for (ActivityCluster s : stages) {
+            s.setActivityClusterId(new ActivityClusterId(idCounter++));
             if (s.getDependents().isEmpty()) {
                 roots.add(s);
             }
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterPlanner.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterPlanner.java
index 57df351..e5d9f64 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterPlanner.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterPlanner.java
@@ -41,9 +41,11 @@
 import edu.uci.ics.hyracks.api.util.Pair;
 import edu.uci.ics.hyracks.control.cc.job.ActivityCluster;
 import edu.uci.ics.hyracks.control.cc.job.ActivityClusterPlan;
+import edu.uci.ics.hyracks.control.cc.job.ActivityPlan;
 import edu.uci.ics.hyracks.control.cc.job.JobRun;
 import edu.uci.ics.hyracks.control.cc.job.Task;
 import edu.uci.ics.hyracks.control.cc.job.TaskCluster;
+import edu.uci.ics.hyracks.control.cc.job.TaskClusterId;
 
 public class ActivityClusterPlanner {
     private static final Logger LOGGER = Logger.getLogger(ActivityClusterPlanner.class.getName());
@@ -61,7 +63,7 @@
         JobRun jobRun = scheduler.getJobRun();
         Map<ActivityId, ActivityPartitionDetails> pcMap = computePartitionCounts(ac);
 
-        Map<ActivityId, Task[]> taskMap = new HashMap<ActivityId, Task[]>();
+        Map<ActivityId, ActivityPlan> activityPlanMap = new HashMap<ActivityId, ActivityPlan>();
         Set<ActivityId> activities = ac.getActivities();
 
         Map<TaskId, Set<TaskId>> taskClusterMap = new HashMap<TaskId, Set<TaskId>>();
@@ -72,15 +74,16 @@
             getDependencyActivityIds(depAnIds, anId);
             ActivityPartitionDetails apd = pcMap.get(anId);
             Task[] tasks = new Task[apd.getPartitionCount()];
+            ActivityPlan activityPlan = new ActivityPlan(apd);
             for (int i = 0; i < tasks.length; ++i) {
                 TaskId tid = new TaskId(anId, i);
-                tasks[i] = new Task(tid, apd);
+                tasks[i] = new Task(tid, activityPlan);
                 for (ActivityId danId : depAnIds) {
                     ActivityCluster dAC = jobRun.getActivityClusterMap().get(danId);
                     ActivityClusterPlan dACP = dAC.getPlan();
                     assert dACP != null : "IllegalStateEncountered: Dependent AC is being planned without a plan for dependency AC: Encountered no plan for ActivityID "
                             + danId;
-                    Task[] dATasks = dACP.getTaskMap().get(danId);
+                    Task[] dATasks = dACP.getActivityPlanMap().get(danId).getTasks();
                     assert dATasks != null : "IllegalStateEncountered: Dependent AC is being planned without a plan for dependency AC: Encountered no plan for ActivityID "
                             + danId;
                     assert dATasks.length == tasks.length : "Dependency activity partitioned differently from dependent: "
@@ -94,24 +97,25 @@
                 cluster.add(tid);
                 taskClusterMap.put(tid, cluster);
             }
-            taskMap.put(anId, tasks);
+            activityPlan.setTasks(tasks);
+            activityPlanMap.put(anId, activityPlan);
         }
 
-        Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies = assignConnectorPolicy(ac, taskMap, pcMap);
+        Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies = assignConnectorPolicy(ac, activityPlanMap);
         scheduler.getJobRun().getConnectorPolicyMap().putAll(connectorPolicies);
 
         Map<TaskId, List<Pair<TaskId, ConnectorDescriptorId>>> taskConnectivity = new HashMap<TaskId, List<Pair<TaskId, ConnectorDescriptorId>>>();
         JobActivityGraph jag = jobRun.getJobActivityGraph();
         BitSet targetBitmap = new BitSet();
         for (ActivityId ac1 : activities) {
-            Task[] ac1TaskStates = taskMap.get(ac1);
+            Task[] ac1TaskStates = activityPlanMap.get(ac1).getTasks();
             int nProducers = ac1TaskStates.length;
             List<IConnectorDescriptor> outputConns = jag.getActivityOutputConnectorDescriptors(ac1);
             if (outputConns != null) {
                 for (IConnectorDescriptor c : outputConns) {
                     ConnectorDescriptorId cdId = c.getConnectorId();
                     ActivityId ac2 = jag.getConsumerActivity(cdId);
-                    Task[] ac2TaskStates = taskMap.get(ac2);
+                    Task[] ac2TaskStates = activityPlanMap.get(ac2).getTasks();
                     int nConsumers = ac2TaskStates.length;
                     for (int i = 0; i < nProducers; ++i) {
                         c.indicateTargetPartitions(nProducers, nConsumers, i, targetBitmap);
@@ -164,15 +168,17 @@
 
         Set<Set<TaskId>> clusters = new HashSet<Set<TaskId>>(taskClusterMap.values());
         Set<TaskCluster> tcSet = new HashSet<TaskCluster>();
+        int counter = 0;
         for (Set<TaskId> cluster : clusters) {
             Set<Task> taskStates = new HashSet<Task>();
             for (TaskId tid : cluster) {
-                taskStates.add(taskMap.get(tid.getActivityId())[tid.getPartition()]);
+                taskStates.add(activityPlanMap.get(tid.getActivityId()).getTasks()[tid.getPartition()]);
             }
-            TaskCluster tc = new TaskCluster(ac, taskStates.toArray(new Task[taskStates.size()]));
+            TaskCluster tc = new TaskCluster(new TaskClusterId(ac.getActivityClusterId(), counter++), ac,
+                    taskStates.toArray(new Task[taskStates.size()]));
             tcSet.add(tc);
             for (TaskId tid : cluster) {
-                taskMap.get(tid.getActivityId())[tid.getPartition()].setTaskCluster(tc);
+                activityPlanMap.get(tid.getActivityId()).getTasks()[tid.getPartition()].setTaskCluster(tc);
             }
         }
         TaskCluster[] taskClusters = tcSet.toArray(new TaskCluster[tcSet.size()]);
@@ -184,7 +190,7 @@
                 List<Pair<TaskId, ConnectorDescriptorId>> cInfoList = taskConnectivity.get(tid);
                 if (cInfoList != null) {
                     for (Pair<TaskId, ConnectorDescriptorId> p : cInfoList) {
-                        Task targetTS = taskMap.get(p.first.getActivityId())[p.first.getPartition()];
+                        Task targetTS = activityPlanMap.get(p.first.getActivityId()).getTasks()[p.first.getPartition()];
                         TaskCluster targetTC = targetTS.getTaskCluster();
                         if (targetTC != tc) {
                             ConnectorDescriptorId cdId = p.second;
@@ -213,13 +219,13 @@
             }
         }
 
-        ac.setPlan(new ActivityClusterPlan(taskClusters, taskMap));
+        ac.setPlan(new ActivityClusterPlan(taskClusters, activityPlanMap));
     }
 
     private TaskCluster getTaskCluster(TaskId tid) {
         ActivityCluster ac = scheduler.getJobRun().getActivityClusterMap().get(tid.getActivityId());
         ActivityClusterPlan acp = ac.getPlan();
-        Task[] tasks = acp.getTaskMap().get(tid.getActivityId());
+        Task[] tasks = acp.getActivityPlanMap().get(tid.getActivityId()).getTasks();
         Task task = tasks[tid.getPartition()];
         assert task.getTaskId().equals(tid);
         return task.getTaskCluster();
@@ -234,20 +240,20 @@
     }
 
     private Map<ConnectorDescriptorId, IConnectorPolicy> assignConnectorPolicy(ActivityCluster ac,
-            Map<ActivityId, Task[]> taskMap, Map<ActivityId, ActivityPartitionDetails> pcMap) {
+            Map<ActivityId, ActivityPlan> taskMap) {
         JobActivityGraph jag = scheduler.getJobRun().getJobActivityGraph();
         Map<ConnectorDescriptorId, IConnectorPolicy> cPolicyMap = new HashMap<ConnectorDescriptorId, IConnectorPolicy>();
         Set<ActivityId> activities = ac.getActivities();
         BitSet targetBitmap = new BitSet();
         for (ActivityId a1 : activities) {
-            Task[] ac1TaskStates = taskMap.get(a1);
+            Task[] ac1TaskStates = taskMap.get(a1).getTasks();
             int nProducers = ac1TaskStates.length;
             List<IConnectorDescriptor> outputConns = jag.getActivityOutputConnectorDescriptors(a1);
             if (outputConns != null) {
                 for (IConnectorDescriptor c : outputConns) {
                     ConnectorDescriptorId cdId = c.getConnectorId();
                     ActivityId a2 = jag.getConsumerActivity(cdId);
-                    Task[] ac2TaskStates = taskMap.get(a2);
+                    Task[] ac2TaskStates = taskMap.get(a2).getTasks();
                     int nConsumers = ac2TaskStates.length;
 
                     int[] fanouts = new int[nProducers];
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
index 356c96bb..2fc9624 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
@@ -392,7 +392,7 @@
                 tads = new ArrayList<TaskAttemptDescriptor>();
                 taskAttemptMap.put(nodeId, tads);
             }
-            ActivityPartitionDetails apd = ts.getActivityPartitionDetails();
+            ActivityPartitionDetails apd = ts.getActivityPlan().getActivityPartitionDetails();
             tads.add(new TaskAttemptDescriptor(taskAttempt.getTaskAttemptId(), apd.getPartitionCount(), apd
                     .getInputPartitionCounts(), apd.getOutputPartitionCounts()));
         }
@@ -405,7 +405,7 @@
     private static String findLocationOfBlocker(JobRun jobRun, JobActivityGraph jag, TaskId tid) {
         ActivityId blockerAID = tid.getActivityId();
         ActivityCluster blockerAC = jobRun.getActivityClusterMap().get(blockerAID);
-        Task[] blockerTasks = blockerAC.getPlan().getTaskMap().get(blockerAID);
+        Task[] blockerTasks = blockerAC.getPlan().getActivityPlanMap().get(blockerAID).getTasks();
         List<TaskClusterAttempt> tcAttempts = blockerTasks[tid.getPartition()].getTaskCluster().getAttempts();
         if (tcAttempts == null || tcAttempts.isEmpty()) {
             return null;
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/web/RESTAPIFunction.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/web/RESTAPIFunction.java
index 0d4dc77..edb4e93 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/web/RESTAPIFunction.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/web/RESTAPIFunction.java
@@ -18,6 +18,7 @@
 
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
+import edu.uci.ics.hyracks.control.cc.job.manager.events.GetJobActivityGraphJSONEvent;
 import edu.uci.ics.hyracks.control.cc.job.manager.events.GetJobProfileJSONEvent;
 import edu.uci.ics.hyracks.control.cc.job.manager.events.GetJobSpecificationJSONEvent;
 import edu.uci.ics.hyracks.control.cc.job.manager.events.GetJobSummariesJSONEvent;
@@ -48,14 +49,18 @@
             case 2: {
                 JobId jobId = JobId.parse(arguments[0]);
 
-                if ("spec".equalsIgnoreCase(arguments[1])) {
+                if ("job-specification".equalsIgnoreCase(arguments[1])) {
                     GetJobSpecificationJSONEvent gjse = new GetJobSpecificationJSONEvent(ccs, jobId);
                     ccs.getJobQueue().scheduleAndSync(gjse);
-                    result.put("result", gjse.getSpecification());
+                    result.put("result", gjse.getJSON());
+                } else if ("job-activity-graph".equalsIgnoreCase(arguments[1])) {
+                    GetJobActivityGraphJSONEvent gjage = new GetJobActivityGraphJSONEvent(ccs, jobId);
+                    ccs.getJobQueue().scheduleAndSync(gjage);
+                    result.put("result", gjage.getJSON());
                 } else if ("profile".equalsIgnoreCase(arguments[1])) {
                     GetJobProfileJSONEvent gjpe = new GetJobProfileJSONEvent(ccs, jobId);
                     ccs.getJobQueue().scheduleAndSync(gjpe);
-                    result.put("result", gjpe.getProfile());
+                    result.put("result", gjpe.getJSON());
                 }
 
                 break;
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/web/WebServer.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/web/WebServer.java
index 806a14e..efbc8f7 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/web/WebServer.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/web/WebServer.java
@@ -46,7 +46,7 @@
     }
 
     private void addHandlers() {
-        ContextHandler handler = new ContextHandler("/state");
+        ContextHandler handler = new ContextHandler("/rest");
         RoutingHandler rh = new RoutingHandler();
         rh.addHandler("jobs", new JSONOutputRequestHandler(new RESTAPIFunction(ccs)));
         handler.setHandler(rh);