Added more logging. Added REST API to fetch job run state
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_dev_next@545 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java
index 6231080..798c8a8 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java
@@ -14,8 +14,11 @@
*/
package edu.uci.ics.hyracks.control.cc.job;
+import java.io.ByteArrayOutputStream;
+import java.io.PrintWriter;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -249,6 +252,42 @@
}
c.put("required-partitions", reqdParts);
+ JSONArray attempts = new JSONArray();
+ List<TaskClusterAttempt> tcAttempts = tc.getAttempts();
+ if (tcAttempts != null) {
+ for (TaskClusterAttempt tca : tcAttempts) {
+ JSONObject attempt = new JSONObject();
+ attempt.put("attempt", tca.getAttempt());
+ attempt.put("status", tca.getStatus());
+
+ JSONArray taskAttempts = new JSONArray();
+ for (TaskAttempt ta : tca.getTaskAttempts()) {
+ JSONObject taskAttempt = new JSONObject();
+ taskAttempt.put("task-attempt-id", ta.getTaskAttemptId());
+ taskAttempt.put("status", ta.getStatus());
+ taskAttempt.put("node-id", ta.getNodeId());
+ Exception e = ta.getException();
+ if (e != null) {
+ JSONObject ex = new JSONObject();
+ ex.put("exception-class", e.getClass().getName());
+ ex.put("exception-message", e.getMessage());
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ PrintWriter pw = new PrintWriter(baos);
+ e.printStackTrace(pw);
+ pw.close();
+ ex.put("exception-stacktrace", new String(baos.toByteArray()));
+
+ taskAttempt.put("exception", ex);
+ }
+ taskAttempts.put(taskAttempt);
+ }
+ attempt.put("task-attempts", taskAttempts);
+
+ attempts.put(attempt);
+ }
+ }
+ c.put("attempts", attempts);
+
tClusters.put(c);
}
planJSON.put("task-clusters", tClusters);
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/GetJobRunJSONEvent.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/GetJobRunJSONEvent.java
new file mode 100644
index 0000000..3e19a70
--- /dev/null
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/GetJobRunJSONEvent.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.cc.job.manager.events;
+
+import org.json.JSONObject;
+
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
+import edu.uci.ics.hyracks.control.cc.job.JobRun;
+import edu.uci.ics.hyracks.control.cc.jobqueue.SynchronizableEvent;
+
+public class GetJobRunJSONEvent extends SynchronizableEvent {
+ private final ClusterControllerService ccs;
+ private final JobId jobId;
+ private JSONObject json;
+
+ public GetJobRunJSONEvent(ClusterControllerService ccs, JobId jobId) {
+ this.ccs = ccs;
+ this.jobId = jobId;
+ }
+
+ @Override
+ protected void doRun() throws Exception {
+ JobRun run = ccs.getRunMap().get(jobId);
+ if (run == null) {
+ json = new JSONObject();
+ return;
+ }
+ json = run.toJSON();
+ }
+
+ public JSONObject getJSON() {
+ return json;
+ }
+}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/partitions/PartitionMatchMaker.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/partitions/PartitionMatchMaker.java
index d2c14b2..afa1946 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/partitions/PartitionMatchMaker.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/partitions/PartitionMatchMaker.java
@@ -20,6 +20,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.logging.Logger;
import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
import edu.uci.ics.hyracks.api.partitions.PartitionId;
@@ -29,6 +30,8 @@
import edu.uci.ics.hyracks.control.common.job.PartitionState;
public class PartitionMatchMaker {
+ private static final Logger LOGGER = Logger.getLogger(PartitionMatchMaker.class.getName());
+
private final Map<PartitionId, List<PartitionDescriptor>> partitionDescriptors;
private final Map<PartitionId, List<PartitionRequest>> partitionRequests;
@@ -164,6 +167,7 @@
}
public void removeUncommittedPartitions(Set<PartitionId> partitionIds, final Set<TaskAttemptId> taIds) {
+ LOGGER.info("Removing uncommitted partitions: " + partitionIds);
IEntryFilter<PartitionDescriptor> filter = new IEntryFilter<PartitionDescriptor>() {
@Override
public boolean matches(PartitionDescriptor o) {
@@ -182,6 +186,7 @@
}
public void removePartitionRequests(Set<PartitionId> partitionIds, final Set<TaskAttemptId> taIds) {
+ LOGGER.info("Removing partition requests: " + partitionIds);
IEntryFilter<PartitionRequest> filter = new IEntryFilter<PartitionRequest>() {
@Override
public boolean matches(PartitionRequest o) {
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
index 2fc9624..616e8e7 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
@@ -466,22 +466,23 @@
}
private void abortTaskCluster(TaskClusterAttempt tcAttempt) {
+ LOGGER.info("Aborting task cluster: " + tcAttempt.getAttempt());
Set<TaskAttemptId> abortTaskIds = new HashSet<TaskAttemptId>();
Map<String, List<TaskAttemptId>> abortTaskAttemptMap = new HashMap<String, List<TaskAttemptId>>();
for (TaskAttempt ta : tcAttempt.getTaskAttempts()) {
TaskAttemptId taId = ta.getTaskAttemptId();
abortTaskIds.add(taId);
- if (ta.getStatus() == TaskAttempt.TaskStatus.RUNNING) {
- ta.setStatus(TaskAttempt.TaskStatus.ABORTED, null);
- List<TaskAttemptId> abortTaskAttempts = abortTaskAttemptMap.get(ta.getNodeId());
- if (abortTaskAttempts == null) {
- abortTaskAttempts = new ArrayList<TaskAttemptId>();
- abortTaskAttemptMap.put(ta.getNodeId(), abortTaskAttempts);
- }
- abortTaskAttempts.add(taId);
+ LOGGER.info("Checking " + taId + ": " + ta.getStatus());
+ ta.setStatus(TaskAttempt.TaskStatus.ABORTED, null);
+ List<TaskAttemptId> abortTaskAttempts = abortTaskAttemptMap.get(ta.getNodeId());
+ if (abortTaskAttempts == null) {
+ abortTaskAttempts = new ArrayList<TaskAttemptId>();
+ abortTaskAttemptMap.put(ta.getNodeId(), abortTaskAttempts);
}
+ abortTaskAttempts.add(taId);
}
final JobId jobId = jobRun.getJobId();
+ LOGGER.info("Abort map for job: " + jobId + ": " + abortTaskAttemptMap);
for (Map.Entry<String, List<TaskAttemptId>> e : abortTaskAttemptMap.entrySet()) {
final NodeControllerState node = ccs.getNodeMap().get(e.getKey());
final List<TaskAttemptId> abortTaskAttempts = e.getValue();
@@ -517,8 +518,10 @@
for (TaskCluster tc : doomedTaskClusters) {
TaskClusterAttempt tca = findLastTaskClusterAttempt(tc);
- abortTaskCluster(tca);
- tca.setStatus(TaskClusterAttempt.TaskClusterStatus.ABORTED);
+ if (tca != null) {
+ abortTaskCluster(tca);
+ tca.setStatus(TaskClusterAttempt.TaskClusterStatus.ABORTED);
+ }
}
}
@@ -595,24 +598,21 @@
*/
public void notifyTaskFailure(TaskAttempt ta, ActivityCluster ac, Exception exception) {
try {
+ LOGGER.info("Received failure notification for TaskAttempt " + ta.getTaskAttemptId());
TaskAttemptId taId = ta.getTaskAttemptId();
TaskCluster tc = ta.getTaskState().getTaskCluster();
TaskClusterAttempt lastAttempt = findLastTaskClusterAttempt(tc);
if (lastAttempt != null && taId.getAttempt() == lastAttempt.getAttempt()) {
- TaskAttempt.TaskStatus taStatus = ta.getStatus();
- if (taStatus == TaskAttempt.TaskStatus.RUNNING) {
- ta.setStatus(TaskAttempt.TaskStatus.FAILED, exception);
- abortTaskCluster(lastAttempt);
- lastAttempt.setStatus(TaskClusterAttempt.TaskClusterStatus.FAILED);
- abortDoomedTaskClusters();
- if (lastAttempt.getAttempt() >= ac.getMaxTaskClusterAttempts()) {
- abortJob(null);
- return;
- }
- startRunnableActivityClusters();
- } else {
- LOGGER.warning("Spurious task failure notification: " + taId + " Current state = " + taStatus);
+ LOGGER.info("Marking TaskAttempt " + ta.getTaskAttemptId() + " as failed");
+ ta.setStatus(TaskAttempt.TaskStatus.FAILED, exception);
+ abortTaskCluster(lastAttempt);
+ lastAttempt.setStatus(TaskClusterAttempt.TaskClusterStatus.FAILED);
+ abortDoomedTaskClusters();
+ if (lastAttempt.getAttempt() >= ac.getMaxTaskClusterAttempts()) {
+ abortJob(null);
+ return;
}
+ startRunnableActivityClusters();
} else {
LOGGER.warning("Ignoring task failure notification: " + taId + " -- Current last attempt = "
+ lastAttempt);
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/web/RESTAPIFunction.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/web/RESTAPIFunction.java
index edb4e93..a488d2f 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/web/RESTAPIFunction.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/web/RESTAPIFunction.java
@@ -19,7 +19,7 @@
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
import edu.uci.ics.hyracks.control.cc.job.manager.events.GetJobActivityGraphJSONEvent;
-import edu.uci.ics.hyracks.control.cc.job.manager.events.GetJobProfileJSONEvent;
+import edu.uci.ics.hyracks.control.cc.job.manager.events.GetJobRunJSONEvent;
import edu.uci.ics.hyracks.control.cc.job.manager.events.GetJobSpecificationJSONEvent;
import edu.uci.ics.hyracks.control.cc.job.manager.events.GetJobSummariesJSONEvent;
import edu.uci.ics.hyracks.control.cc.web.util.IJSONOutputFunction;
@@ -57,10 +57,10 @@
GetJobActivityGraphJSONEvent gjage = new GetJobActivityGraphJSONEvent(ccs, jobId);
ccs.getJobQueue().scheduleAndSync(gjage);
result.put("result", gjage.getJSON());
- } else if ("profile".equalsIgnoreCase(arguments[1])) {
- GetJobProfileJSONEvent gjpe = new GetJobProfileJSONEvent(ccs, jobId);
- ccs.getJobQueue().scheduleAndSync(gjpe);
- result.put("result", gjpe.getJSON());
+ } else if ("job-run".equalsIgnoreCase(arguments[1])) {
+ GetJobRunJSONEvent gjre = new GetJobRunJSONEvent(ccs, jobId);
+ ccs.getJobQueue().scheduleAndSync(gjre);
+ result.put("result", gjre.getJSON());
}
break;