Fixed REST Api
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_scheduling@541 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobActivityGraph.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobActivityGraph.java
index f369f95..930d299 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobActivityGraph.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobActivityGraph.java
@@ -22,6 +22,10 @@
import java.util.Map;
import java.util.Set;
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
+
import edu.uci.ics.hyracks.api.dataflow.ActivityId;
import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
import edu.uci.ics.hyracks.api.dataflow.IActivity;
@@ -201,4 +205,59 @@
buffer.append('\n');
return buffer.toString();
}
+
+ public JSONObject toJSON() throws JSONException {
+ JSONObject jplan = new JSONObject();
+
+ jplan.put("type", "plan");
+ jplan.put("flags", jobFlags.toString());
+
+ JSONArray jans = new JSONArray();
+ for (IActivity an : activityNodes.values()) {
+ JSONObject jan = new JSONObject();
+ jan.put("type", "activity");
+ jan.put("id", an.getActivityId().toString());
+ jan.put("java-class", an.getClass().getName());
+ jan.put("operator-id", an.getActivityId().getOperatorDescriptorId().toString());
+
+ List<IConnectorDescriptor> inputs = getActivityInputConnectorDescriptors(an.getActivityId());
+ if (inputs != null) {
+ JSONArray jInputs = new JSONArray();
+ for (int i = 0; i < inputs.size(); ++i) {
+ JSONObject jInput = new JSONObject();
+ jInput.put("type", "activity-input");
+ jInput.put("input-port", i);
+ jInput.put("connector-id", inputs.get(i).getConnectorId().toString());
+ jInputs.put(jInput);
+ }
+ jan.put("inputs", jInputs);
+ }
+
+ List<IConnectorDescriptor> outputs = getActivityOutputConnectorDescriptors(an.getActivityId());
+ if (outputs != null) {
+ JSONArray jOutputs = new JSONArray();
+ for (int i = 0; i < outputs.size(); ++i) {
+ JSONObject jOutput = new JSONObject();
+ jOutput.put("type", "activity-output");
+ jOutput.put("output-port", i);
+ jOutput.put("connector-id", outputs.get(i).getConnectorId().toString());
+ jOutputs.put(jOutput);
+ }
+ jan.put("outputs", jOutputs);
+ }
+
+ Set<ActivityId> blockers = getBlocked2BlockerMap().get(an.getActivityId());
+ if (blockers != null) {
+ JSONArray jDeps = new JSONArray();
+ for (ActivityId blocker : blockers) {
+ jDeps.put(blocker.toString());
+ }
+ jan.put("depends-on", jDeps);
+ }
+ jans.put(jan);
+ }
+ jplan.put("activities", jans);
+
+ return jplan;
+ }
}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/ActivityCluster.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/ActivityCluster.java
index ae95ca8..14cebb2 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/ActivityCluster.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/ActivityCluster.java
@@ -28,6 +28,8 @@
private final Set<ActivityCluster> dependents;
+ private ActivityClusterId id;
+
private ActivityClusterPlan acp;
public ActivityCluster(JobRun jobRun, Set<ActivityId> activities) {
@@ -61,6 +63,14 @@
return jobRun;
}
+ public ActivityClusterId getActivityClusterId() {
+ return id;
+ }
+
+ public void setActivityClusterId(ActivityClusterId id) {
+ this.id = id;
+ }
+
public int getMaxTaskClusterAttempts() {
return jobRun.getJobActivityGraph().getJobSpecification().getMaxAttempts();
}
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/ActivityClusterId.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/ActivityClusterId.java
new file mode 100644
index 0000000..46628a4
--- /dev/null
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/ActivityClusterId.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.cc.job;
+
+import java.io.Serializable;
+
+public final class ActivityClusterId implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private final int id;
+
+ public ActivityClusterId(int id) {
+ this.id = id;
+ }
+
+ public int getId() {
+ return id;
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + id;
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ ActivityClusterId other = (ActivityClusterId) obj;
+ if (id != other.id)
+ return false;
+ return true;
+ }
+
+ @Override
+ public String toString() {
+ return "AC:" + id;
+ }
+}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/ActivityClusterPlan.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/ActivityClusterPlan.java
index 531dc92..2b1d3e2 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/ActivityClusterPlan.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/ActivityClusterPlan.java
@@ -19,17 +19,17 @@
import edu.uci.ics.hyracks.api.dataflow.ActivityId;
public class ActivityClusterPlan {
- private final Map<ActivityId, Task[]> taskStateMap;
+ private final Map<ActivityId, ActivityPlan> activityPlanMap;
private final TaskCluster[] taskClusters;
- public ActivityClusterPlan(TaskCluster[] taskClusters, Map<ActivityId, Task[]> taskStateMap) {
- this.taskStateMap = taskStateMap;
+ public ActivityClusterPlan(TaskCluster[] taskClusters, Map<ActivityId, ActivityPlan> activityPlanMap) {
+ this.activityPlanMap = activityPlanMap;
this.taskClusters = taskClusters;
}
- public Map<ActivityId, Task[]> getTaskMap() {
- return taskStateMap;
+ public Map<ActivityId, ActivityPlan> getActivityPlanMap() {
+ return activityPlanMap;
}
public TaskCluster[] getTaskClusters() {
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/ActivityPlan.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/ActivityPlan.java
new file mode 100644
index 0000000..beeca27
--- /dev/null
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/ActivityPlan.java
@@ -0,0 +1,39 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.cc.job;
+
+import edu.uci.ics.hyracks.control.cc.scheduler.ActivityPartitionDetails;
+
+public class ActivityPlan {
+ private final ActivityPartitionDetails apd;
+
+ private Task[] tasks;
+
+ public ActivityPlan(ActivityPartitionDetails apd) {
+ this.apd = apd;
+ }
+
+ public ActivityPartitionDetails getActivityPartitionDetails() {
+ return apd;
+ }
+
+ public Task[] getTasks() {
+ return tasks;
+ }
+
+ public void setTasks(Task[] tasks) {
+ this.tasks = tasks;
+ }
+}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java
index 3bea59a..6231080 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java
@@ -19,14 +19,21 @@
import java.util.Map;
import java.util.Set;
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
+
import edu.uci.ics.hyracks.api.dataflow.ActivityId;
import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
+import edu.uci.ics.hyracks.api.dataflow.TaskId;
import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.api.job.JobActivityGraph;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.job.JobStatus;
+import edu.uci.ics.hyracks.api.partitions.PartitionId;
import edu.uci.ics.hyracks.control.cc.partitions.PartitionMatchMaker;
+import edu.uci.ics.hyracks.control.cc.scheduler.ActivityPartitionDetails;
import edu.uci.ics.hyracks.control.cc.scheduler.JobScheduler;
import edu.uci.ics.hyracks.control.common.job.profiling.om.JobProfile;
@@ -130,4 +137,128 @@
public Map<ConnectorDescriptorId, IConnectorPolicy> getConnectorPolicyMap() {
return connectorPolicyMap;
}
+
+ public JSONObject toJSON() throws JSONException {
+ JSONObject result = new JSONObject();
+
+ result.put("job-id", jobId.toString());
+
+ JSONArray aClusters = new JSONArray();
+ for (ActivityCluster ac : activityClusters) {
+ JSONObject acJSON = new JSONObject();
+
+ acJSON.put("activity-cluster-id", String.valueOf(ac.getActivityClusterId()));
+
+ JSONArray activitiesJSON = new JSONArray();
+ for (ActivityId aid : ac.getActivities()) {
+ activitiesJSON.put(aid);
+ }
+ acJSON.put("activities", activitiesJSON);
+
+ JSONArray dependentsJSON = new JSONArray();
+ for (ActivityCluster dependent : ac.getDependents()) {
+ dependentsJSON.put(String.valueOf(dependent.getActivityClusterId()));
+ }
+ acJSON.put("dependents", dependentsJSON);
+
+ JSONArray dependenciesJSON = new JSONArray();
+ for (ActivityCluster dependency : ac.getDependencies()) {
+ dependenciesJSON.put(String.valueOf(dependency.getActivityClusterId()));
+ }
+ acJSON.put("dependencies", dependenciesJSON);
+
+ ActivityClusterPlan acp = ac.getPlan();
+ if (acp == null) {
+ acJSON.put("plan", (Object) null);
+ } else {
+ JSONObject planJSON = new JSONObject();
+
+ JSONArray acTasks = new JSONArray();
+ for (Map.Entry<ActivityId, ActivityPlan> e : acp.getActivityPlanMap().entrySet()) {
+ ActivityPlan acPlan = e.getValue();
+ JSONObject entry = new JSONObject();
+ entry.put("activity-id", e.getKey().toString());
+
+ ActivityPartitionDetails apd = acPlan.getActivityPartitionDetails();
+ entry.put("partition-count", apd.getPartitionCount());
+
+ JSONArray inPartCountsJSON = new JSONArray();
+ int[] inPartCounts = apd.getInputPartitionCounts();
+ if (inPartCounts != null) {
+ for (int i : inPartCounts) {
+ inPartCountsJSON.put(i);
+ }
+ }
+ entry.put("input-partition-counts", inPartCountsJSON);
+
+ JSONArray outPartCountsJSON = new JSONArray();
+ int[] outPartCounts = apd.getOutputPartitionCounts();
+ if (outPartCounts != null) {
+ for (int o : outPartCounts) {
+ outPartCountsJSON.put(o);
+ }
+ }
+ entry.put("output-partition-counts", outPartCountsJSON);
+
+ JSONArray tasks = new JSONArray();
+ for (Task t : acPlan.getTasks()) {
+ JSONObject task = new JSONObject();
+
+ task.put("task-id", t.getTaskId().toString());
+
+ JSONArray dependentTasksJSON = new JSONArray();
+ for (TaskId dependent : t.getDependents()) {
+ dependentTasksJSON.put(dependent.toString());
+ }
+ task.put("dependents", dependentTasksJSON);
+
+ JSONArray dependencyTasksJSON = new JSONArray();
+ for (TaskId dependency : t.getDependencies()) {
+ dependencyTasksJSON.put(dependency.toString());
+ }
+ task.put("dependencies", dependencyTasksJSON);
+
+ tasks.put(task);
+ }
+ entry.put("tasks", tasks);
+
+ acTasks.put(entry);
+ }
+ planJSON.put("task-map", acTasks);
+
+ JSONArray tClusters = new JSONArray();
+ for (TaskCluster tc : acp.getTaskClusters()) {
+ JSONObject c = new JSONObject();
+ c.put("task-cluster-id", String.valueOf(tc.getTaskClusterId()));
+
+ JSONArray tasks = new JSONArray();
+ for (Task t : tc.getTasks()) {
+ tasks.put(t.getTaskId().toString());
+ }
+ c.put("tasks", tasks);
+
+ JSONArray prodParts = new JSONArray();
+ for (PartitionId p : tc.getProducedPartitions()) {
+ prodParts.put(p.toString());
+ }
+ c.put("produced-partitions", prodParts);
+
+ JSONArray reqdParts = new JSONArray();
+ for (PartitionId p : tc.getRequiredPartitions()) {
+ reqdParts.put(p.toString());
+ }
+ c.put("required-partitions", reqdParts);
+
+ tClusters.put(c);
+ }
+ planJSON.put("task-clusters", tClusters);
+
+ acJSON.put("plan", planJSON);
+ }
+ aClusters.put(acJSON);
+ }
+ result.put("activity-clusters", aClusters);
+
+ return result;
+ }
}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/Task.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/Task.java
index 28deb44..84ae6e5 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/Task.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/Task.java
@@ -18,12 +18,11 @@
import java.util.Set;
import edu.uci.ics.hyracks.api.dataflow.TaskId;
-import edu.uci.ics.hyracks.control.cc.scheduler.ActivityPartitionDetails;
public class Task {
private final TaskId taskId;
- private final ActivityPartitionDetails apd;
+ private final ActivityPlan activityPlan;
private final Set<TaskId> dependencies;
@@ -31,9 +30,9 @@
private TaskCluster taskCluster;
- public Task(TaskId taskId, ActivityPartitionDetails apd) {
+ public Task(TaskId taskId, ActivityPlan activityPlan) {
this.taskId = taskId;
- this.apd = apd;
+ this.activityPlan = activityPlan;
this.dependencies = new HashSet<TaskId>();
this.dependents = new HashSet<TaskId>();
}
@@ -42,8 +41,8 @@
return taskId;
}
- public ActivityPartitionDetails getActivityPartitionDetails() {
- return apd;
+ public ActivityPlan getActivityPlan() {
+ return activityPlan;
}
public Set<TaskId> getDependencies() {
@@ -64,6 +63,6 @@
@Override
public String toString() {
- return taskId + "(" + apd + ")";
+ return String.valueOf(taskId);
}
}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/TaskCluster.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/TaskCluster.java
index d3de5a1..f41cda9 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/TaskCluster.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/TaskCluster.java
@@ -23,6 +23,8 @@
import edu.uci.ics.hyracks.api.partitions.PartitionId;
public class TaskCluster {
+ private final TaskClusterId taskClusterId;
+
private final ActivityCluster ac;
private final Task[] tasks;
@@ -37,7 +39,8 @@
private final List<TaskClusterAttempt> taskClusterAttempts;
- public TaskCluster(ActivityCluster ac, Task[] tasks) {
+ public TaskCluster(TaskClusterId taskClusterId, ActivityCluster ac, Task[] tasks) {
+ this.taskClusterId = taskClusterId;
this.ac = ac;
this.tasks = tasks;
producedPartitions = new HashSet<PartitionId>();
@@ -47,6 +50,10 @@
taskClusterAttempts = new ArrayList<TaskClusterAttempt>();
}
+ public TaskClusterId getTaskClusterId() {
+ return taskClusterId;
+ }
+
public ActivityCluster getActivityCluster() {
return ac;
}
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/TaskClusterId.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/TaskClusterId.java
new file mode 100644
index 0000000..b1000dd
--- /dev/null
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/TaskClusterId.java
@@ -0,0 +1,71 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.cc.job;
+
+import java.io.Serializable;
+
+public final class TaskClusterId implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private final ActivityClusterId activityClusterId;
+
+ private final int id;
+
+ public TaskClusterId(ActivityClusterId activityClusterId, int id) {
+ this.activityClusterId = activityClusterId;
+ this.id = id;
+ }
+
+ public ActivityClusterId getActivityClusterId() {
+ return activityClusterId;
+ }
+
+ public int getId() {
+ return id;
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + ((activityClusterId == null) ? 0 : activityClusterId.hashCode());
+ result = prime * result + id;
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ TaskClusterId other = (TaskClusterId) obj;
+ if (activityClusterId == null) {
+ if (other.activityClusterId != null)
+ return false;
+ } else if (!activityClusterId.equals(other.activityClusterId))
+ return false;
+ if (id != other.id)
+ return false;
+ return true;
+ }
+
+ @Override
+ public String toString() {
+ return "TC:" + activityClusterId + ":" + id;
+ }
+}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/AbstractTaskLifecycleEvent.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/AbstractTaskLifecycleEvent.java
index 6975d4e..0028b6f 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/AbstractTaskLifecycleEvent.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/AbstractTaskLifecycleEvent.java
@@ -23,6 +23,7 @@
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
import edu.uci.ics.hyracks.control.cc.job.ActivityCluster;
+import edu.uci.ics.hyracks.control.cc.job.ActivityPlan;
import edu.uci.ics.hyracks.control.cc.job.JobRun;
import edu.uci.ics.hyracks.control.cc.job.Task;
import edu.uci.ics.hyracks.control.cc.job.TaskAttempt;
@@ -51,8 +52,8 @@
Map<ActivityId, ActivityCluster> activityClusterMap = run.getActivityClusterMap();
ActivityCluster ac = activityClusterMap.get(tid.getActivityId());
if (ac != null) {
- Map<ActivityId, Task[]> taskStateMap = ac.getPlan().getTaskMap();
- Task[] taskStates = taskStateMap.get(tid.getActivityId());
+ Map<ActivityId, ActivityPlan> taskStateMap = ac.getPlan().getActivityPlanMap();
+ Task[] taskStates = taskStateMap.get(tid.getActivityId()).getTasks();
if (taskStates != null && taskStates.length > tid.getPartition()) {
Task ts = taskStates[tid.getPartition()];
TaskCluster tc = ts.getTaskCluster();
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/GetJobActivityGraphJSONEvent.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/GetJobActivityGraphJSONEvent.java
new file mode 100644
index 0000000..14b56df
--- /dev/null
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/GetJobActivityGraphJSONEvent.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.cc.job.manager.events;
+
+import org.json.JSONObject;
+
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
+import edu.uci.ics.hyracks.control.cc.job.JobRun;
+import edu.uci.ics.hyracks.control.cc.jobqueue.SynchronizableEvent;
+
+public class GetJobActivityGraphJSONEvent extends SynchronizableEvent {
+ private final ClusterControllerService ccs;
+ private final JobId jobId;
+ private JSONObject json;
+
+ public GetJobActivityGraphJSONEvent(ClusterControllerService ccs, JobId jobId) {
+ this.ccs = ccs;
+ this.jobId = jobId;
+ }
+
+ @Override
+ protected void doRun() throws Exception {
+ JobRun run = ccs.getRunMap().get(jobId);
+ if (run == null) {
+ json = new JSONObject();
+ return;
+ }
+ json = run.getJobActivityGraph().toJSON();
+ }
+
+ public JSONObject getJSON() {
+ return json;
+ }
+}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/GetJobProfileJSONEvent.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/GetJobProfileJSONEvent.java
index 738d86e..4355962 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/GetJobProfileJSONEvent.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/GetJobProfileJSONEvent.java
@@ -24,7 +24,7 @@
public class GetJobProfileJSONEvent extends SynchronizableEvent {
private final ClusterControllerService ccs;
private final JobId jobId;
- private JSONObject profile;
+ private JSONObject json;
public GetJobProfileJSONEvent(ClusterControllerService ccs, JobId jobId) {
this.ccs = ccs;
@@ -33,16 +33,16 @@
@Override
protected void doRun() throws Exception {
- profile = new JSONObject();
+ json = new JSONObject();
JobRun jobRun = ccs.getRunMap().get(jobId);
if (jobRun == null) {
- profile = new JSONObject();
+ json = new JSONObject();
return;
}
- profile = jobRun.getJobProfile().toJSON();
+ json = jobRun.getJobProfile().toJSON();
}
- public JSONObject getProfile() {
- return profile;
+ public JSONObject getJSON() {
+ return json;
}
}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/GetJobSpecificationJSONEvent.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/GetJobSpecificationJSONEvent.java
index 884ddf2..db7e5a7 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/GetJobSpecificationJSONEvent.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/GetJobSpecificationJSONEvent.java
@@ -24,7 +24,7 @@
public class GetJobSpecificationJSONEvent extends SynchronizableEvent {
private final ClusterControllerService ccs;
private final JobId jobId;
- private JSONObject spec;
+ private JSONObject json;
public GetJobSpecificationJSONEvent(ClusterControllerService ccs, JobId jobId) {
this.ccs = ccs;
@@ -35,13 +35,13 @@
protected void doRun() throws Exception {
JobRun run = ccs.getRunMap().get(jobId);
if (run == null) {
- spec = new JSONObject();
+ json = new JSONObject();
return;
}
- spec = run.getJobActivityGraph().getJobSpecification().toJSON();
+ json = run.getJobActivityGraph().getJobSpecification().toJSON();
}
- public JSONObject getSpecification() {
- return spec;
+ public JSONObject getJSON() {
+ return json;
}
}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterGraphBuilder.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterGraphBuilder.java
index 81883d67..0e59c8df 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterGraphBuilder.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterGraphBuilder.java
@@ -30,6 +30,7 @@
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.api.util.Pair;
import edu.uci.ics.hyracks.control.cc.job.ActivityCluster;
+import edu.uci.ics.hyracks.control.cc.job.ActivityClusterId;
import edu.uci.ics.hyracks.control.cc.job.JobRun;
public class ActivityClusterGraphBuilder {
@@ -123,7 +124,9 @@
}
}
Set<ActivityCluster> roots = new HashSet<ActivityCluster>();
+ int idCounter = 0;
for (ActivityCluster s : stages) {
+ s.setActivityClusterId(new ActivityClusterId(idCounter++));
if (s.getDependents().isEmpty()) {
roots.add(s);
}
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterPlanner.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterPlanner.java
index 57df351..e5d9f64 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterPlanner.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterPlanner.java
@@ -41,9 +41,11 @@
import edu.uci.ics.hyracks.api.util.Pair;
import edu.uci.ics.hyracks.control.cc.job.ActivityCluster;
import edu.uci.ics.hyracks.control.cc.job.ActivityClusterPlan;
+import edu.uci.ics.hyracks.control.cc.job.ActivityPlan;
import edu.uci.ics.hyracks.control.cc.job.JobRun;
import edu.uci.ics.hyracks.control.cc.job.Task;
import edu.uci.ics.hyracks.control.cc.job.TaskCluster;
+import edu.uci.ics.hyracks.control.cc.job.TaskClusterId;
public class ActivityClusterPlanner {
private static final Logger LOGGER = Logger.getLogger(ActivityClusterPlanner.class.getName());
@@ -61,7 +63,7 @@
JobRun jobRun = scheduler.getJobRun();
Map<ActivityId, ActivityPartitionDetails> pcMap = computePartitionCounts(ac);
- Map<ActivityId, Task[]> taskMap = new HashMap<ActivityId, Task[]>();
+ Map<ActivityId, ActivityPlan> activityPlanMap = new HashMap<ActivityId, ActivityPlan>();
Set<ActivityId> activities = ac.getActivities();
Map<TaskId, Set<TaskId>> taskClusterMap = new HashMap<TaskId, Set<TaskId>>();
@@ -72,15 +74,16 @@
getDependencyActivityIds(depAnIds, anId);
ActivityPartitionDetails apd = pcMap.get(anId);
Task[] tasks = new Task[apd.getPartitionCount()];
+ ActivityPlan activityPlan = new ActivityPlan(apd);
for (int i = 0; i < tasks.length; ++i) {
TaskId tid = new TaskId(anId, i);
- tasks[i] = new Task(tid, apd);
+ tasks[i] = new Task(tid, activityPlan);
for (ActivityId danId : depAnIds) {
ActivityCluster dAC = jobRun.getActivityClusterMap().get(danId);
ActivityClusterPlan dACP = dAC.getPlan();
assert dACP != null : "IllegalStateEncountered: Dependent AC is being planned without a plan for dependency AC: Encountered no plan for ActivityID "
+ danId;
- Task[] dATasks = dACP.getTaskMap().get(danId);
+ Task[] dATasks = dACP.getActivityPlanMap().get(danId).getTasks();
assert dATasks != null : "IllegalStateEncountered: Dependent AC is being planned without a plan for dependency AC: Encountered no plan for ActivityID "
+ danId;
assert dATasks.length == tasks.length : "Dependency activity partitioned differently from dependent: "
@@ -94,24 +97,25 @@
cluster.add(tid);
taskClusterMap.put(tid, cluster);
}
- taskMap.put(anId, tasks);
+ activityPlan.setTasks(tasks);
+ activityPlanMap.put(anId, activityPlan);
}
- Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies = assignConnectorPolicy(ac, taskMap, pcMap);
+ Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies = assignConnectorPolicy(ac, activityPlanMap);
scheduler.getJobRun().getConnectorPolicyMap().putAll(connectorPolicies);
Map<TaskId, List<Pair<TaskId, ConnectorDescriptorId>>> taskConnectivity = new HashMap<TaskId, List<Pair<TaskId, ConnectorDescriptorId>>>();
JobActivityGraph jag = jobRun.getJobActivityGraph();
BitSet targetBitmap = new BitSet();
for (ActivityId ac1 : activities) {
- Task[] ac1TaskStates = taskMap.get(ac1);
+ Task[] ac1TaskStates = activityPlanMap.get(ac1).getTasks();
int nProducers = ac1TaskStates.length;
List<IConnectorDescriptor> outputConns = jag.getActivityOutputConnectorDescriptors(ac1);
if (outputConns != null) {
for (IConnectorDescriptor c : outputConns) {
ConnectorDescriptorId cdId = c.getConnectorId();
ActivityId ac2 = jag.getConsumerActivity(cdId);
- Task[] ac2TaskStates = taskMap.get(ac2);
+ Task[] ac2TaskStates = activityPlanMap.get(ac2).getTasks();
int nConsumers = ac2TaskStates.length;
for (int i = 0; i < nProducers; ++i) {
c.indicateTargetPartitions(nProducers, nConsumers, i, targetBitmap);
@@ -164,15 +168,17 @@
Set<Set<TaskId>> clusters = new HashSet<Set<TaskId>>(taskClusterMap.values());
Set<TaskCluster> tcSet = new HashSet<TaskCluster>();
+ int counter = 0;
for (Set<TaskId> cluster : clusters) {
Set<Task> taskStates = new HashSet<Task>();
for (TaskId tid : cluster) {
- taskStates.add(taskMap.get(tid.getActivityId())[tid.getPartition()]);
+ taskStates.add(activityPlanMap.get(tid.getActivityId()).getTasks()[tid.getPartition()]);
}
- TaskCluster tc = new TaskCluster(ac, taskStates.toArray(new Task[taskStates.size()]));
+ TaskCluster tc = new TaskCluster(new TaskClusterId(ac.getActivityClusterId(), counter++), ac,
+ taskStates.toArray(new Task[taskStates.size()]));
tcSet.add(tc);
for (TaskId tid : cluster) {
- taskMap.get(tid.getActivityId())[tid.getPartition()].setTaskCluster(tc);
+ activityPlanMap.get(tid.getActivityId()).getTasks()[tid.getPartition()].setTaskCluster(tc);
}
}
TaskCluster[] taskClusters = tcSet.toArray(new TaskCluster[tcSet.size()]);
@@ -184,7 +190,7 @@
List<Pair<TaskId, ConnectorDescriptorId>> cInfoList = taskConnectivity.get(tid);
if (cInfoList != null) {
for (Pair<TaskId, ConnectorDescriptorId> p : cInfoList) {
- Task targetTS = taskMap.get(p.first.getActivityId())[p.first.getPartition()];
+ Task targetTS = activityPlanMap.get(p.first.getActivityId()).getTasks()[p.first.getPartition()];
TaskCluster targetTC = targetTS.getTaskCluster();
if (targetTC != tc) {
ConnectorDescriptorId cdId = p.second;
@@ -213,13 +219,13 @@
}
}
- ac.setPlan(new ActivityClusterPlan(taskClusters, taskMap));
+ ac.setPlan(new ActivityClusterPlan(taskClusters, activityPlanMap));
}
private TaskCluster getTaskCluster(TaskId tid) {
ActivityCluster ac = scheduler.getJobRun().getActivityClusterMap().get(tid.getActivityId());
ActivityClusterPlan acp = ac.getPlan();
- Task[] tasks = acp.getTaskMap().get(tid.getActivityId());
+ Task[] tasks = acp.getActivityPlanMap().get(tid.getActivityId()).getTasks();
Task task = tasks[tid.getPartition()];
assert task.getTaskId().equals(tid);
return task.getTaskCluster();
@@ -234,20 +240,20 @@
}
private Map<ConnectorDescriptorId, IConnectorPolicy> assignConnectorPolicy(ActivityCluster ac,
- Map<ActivityId, Task[]> taskMap, Map<ActivityId, ActivityPartitionDetails> pcMap) {
+ Map<ActivityId, ActivityPlan> taskMap) {
JobActivityGraph jag = scheduler.getJobRun().getJobActivityGraph();
Map<ConnectorDescriptorId, IConnectorPolicy> cPolicyMap = new HashMap<ConnectorDescriptorId, IConnectorPolicy>();
Set<ActivityId> activities = ac.getActivities();
BitSet targetBitmap = new BitSet();
for (ActivityId a1 : activities) {
- Task[] ac1TaskStates = taskMap.get(a1);
+ Task[] ac1TaskStates = taskMap.get(a1).getTasks();
int nProducers = ac1TaskStates.length;
List<IConnectorDescriptor> outputConns = jag.getActivityOutputConnectorDescriptors(a1);
if (outputConns != null) {
for (IConnectorDescriptor c : outputConns) {
ConnectorDescriptorId cdId = c.getConnectorId();
ActivityId a2 = jag.getConsumerActivity(cdId);
- Task[] ac2TaskStates = taskMap.get(a2);
+ Task[] ac2TaskStates = taskMap.get(a2).getTasks();
int nConsumers = ac2TaskStates.length;
int[] fanouts = new int[nProducers];
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
index 356c96bb..2fc9624 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
@@ -392,7 +392,7 @@
tads = new ArrayList<TaskAttemptDescriptor>();
taskAttemptMap.put(nodeId, tads);
}
- ActivityPartitionDetails apd = ts.getActivityPartitionDetails();
+ ActivityPartitionDetails apd = ts.getActivityPlan().getActivityPartitionDetails();
tads.add(new TaskAttemptDescriptor(taskAttempt.getTaskAttemptId(), apd.getPartitionCount(), apd
.getInputPartitionCounts(), apd.getOutputPartitionCounts()));
}
@@ -405,7 +405,7 @@
private static String findLocationOfBlocker(JobRun jobRun, JobActivityGraph jag, TaskId tid) {
ActivityId blockerAID = tid.getActivityId();
ActivityCluster blockerAC = jobRun.getActivityClusterMap().get(blockerAID);
- Task[] blockerTasks = blockerAC.getPlan().getTaskMap().get(blockerAID);
+ Task[] blockerTasks = blockerAC.getPlan().getActivityPlanMap().get(blockerAID).getTasks();
List<TaskClusterAttempt> tcAttempts = blockerTasks[tid.getPartition()].getTaskCluster().getAttempts();
if (tcAttempts == null || tcAttempts.isEmpty()) {
return null;
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/web/RESTAPIFunction.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/web/RESTAPIFunction.java
index 0d4dc77..edb4e93 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/web/RESTAPIFunction.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/web/RESTAPIFunction.java
@@ -18,6 +18,7 @@
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
+import edu.uci.ics.hyracks.control.cc.job.manager.events.GetJobActivityGraphJSONEvent;
import edu.uci.ics.hyracks.control.cc.job.manager.events.GetJobProfileJSONEvent;
import edu.uci.ics.hyracks.control.cc.job.manager.events.GetJobSpecificationJSONEvent;
import edu.uci.ics.hyracks.control.cc.job.manager.events.GetJobSummariesJSONEvent;
@@ -48,14 +49,18 @@
case 2: {
JobId jobId = JobId.parse(arguments[0]);
- if ("spec".equalsIgnoreCase(arguments[1])) {
+ if ("job-specification".equalsIgnoreCase(arguments[1])) {
GetJobSpecificationJSONEvent gjse = new GetJobSpecificationJSONEvent(ccs, jobId);
ccs.getJobQueue().scheduleAndSync(gjse);
- result.put("result", gjse.getSpecification());
+ result.put("result", gjse.getJSON());
+ } else if ("job-activity-graph".equalsIgnoreCase(arguments[1])) {
+ GetJobActivityGraphJSONEvent gjage = new GetJobActivityGraphJSONEvent(ccs, jobId);
+ ccs.getJobQueue().scheduleAndSync(gjage);
+ result.put("result", gjage.getJSON());
} else if ("profile".equalsIgnoreCase(arguments[1])) {
GetJobProfileJSONEvent gjpe = new GetJobProfileJSONEvent(ccs, jobId);
ccs.getJobQueue().scheduleAndSync(gjpe);
- result.put("result", gjpe.getProfile());
+ result.put("result", gjpe.getJSON());
}
break;
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/web/WebServer.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/web/WebServer.java
index 806a14e..efbc8f7 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/web/WebServer.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/web/WebServer.java
@@ -46,7 +46,7 @@
}
private void addHandlers() {
- ContextHandler handler = new ContextHandler("/state");
+ ContextHandler handler = new ContextHandler("/rest");
RoutingHandler rh = new RoutingHandler();
rh.addHandler("jobs", new JSONOutputRequestHandler(new RESTAPIFunction(ccs)));
handler.setHandler(rh);