Added more logging. Added REST API to fetch job run state

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_dev_next@545 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java
index 6231080..798c8a8 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java
@@ -14,8 +14,11 @@
  */
 package edu.uci.ics.hyracks.control.cc.job;
 
+import java.io.ByteArrayOutputStream;
+import java.io.PrintWriter;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
@@ -249,6 +252,42 @@
                     }
                     c.put("required-partitions", reqdParts);
 
+                    JSONArray attempts = new JSONArray();
+                    List<TaskClusterAttempt> tcAttempts = tc.getAttempts();
+                    if (tcAttempts != null) {
+                        for (TaskClusterAttempt tca : tcAttempts) {
+                            JSONObject attempt = new JSONObject();
+                            attempt.put("attempt", tca.getAttempt());
+                            attempt.put("status", tca.getStatus());
+
+                            JSONArray taskAttempts = new JSONArray();
+                            for (TaskAttempt ta : tca.getTaskAttempts()) {
+                                JSONObject taskAttempt = new JSONObject();
+                                taskAttempt.put("task-attempt-id", ta.getTaskAttemptId());
+                                taskAttempt.put("status", ta.getStatus());
+                                taskAttempt.put("node-id", ta.getNodeId());
+                                Exception e = ta.getException();
+                                if (e != null) {
+                                    JSONObject ex = new JSONObject();
+                                    ex.put("exception-class", e.getClass().getName());
+                                    ex.put("exception-message", e.getMessage());
+                                    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+                                    PrintWriter pw = new PrintWriter(baos);
+                                    e.printStackTrace(pw);
+                                    pw.close();
+                                    ex.put("exception-stacktrace", new String(baos.toByteArray()));
+
+                                    taskAttempt.put("exception", ex);
+                                }
+                                taskAttempts.put(taskAttempt);
+                            }
+                            attempt.put("task-attempts", taskAttempts);
+
+                            attempts.put(attempt);
+                        }
+                    }
+                    c.put("attempts", attempts);
+
                     tClusters.put(c);
                 }
                 planJSON.put("task-clusters", tClusters);
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/GetJobRunJSONEvent.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/GetJobRunJSONEvent.java
new file mode 100644
index 0000000..3e19a70
--- /dev/null
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/GetJobRunJSONEvent.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.cc.job.manager.events;
+
+import org.json.JSONObject;
+
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
+import edu.uci.ics.hyracks.control.cc.job.JobRun;
+import edu.uci.ics.hyracks.control.cc.jobqueue.SynchronizableEvent;
+
+/** Job-queue event that stores the JSON form of a job's run, or an empty object if the job id is unknown. */
+public class GetJobRunJSONEvent extends SynchronizableEvent {
+    private final ClusterControllerService ccs;
+    private final JobId jobId;
+    // Filled in by doRun(); stays null until the event has executed.
+    private JSONObject json;
+
+    public GetJobRunJSONEvent(ClusterControllerService ccs, JobId jobId) {
+        this.ccs = ccs;
+        this.jobId = jobId;
+    }
+
+    // Resolves the JobRun through the controller's run map and serializes it.
+    @Override
+    protected void doRun() throws Exception {
+        JobRun jobRun = ccs.getRunMap().get(jobId);
+        json = (jobRun != null) ? jobRun.toJSON() : new JSONObject();
+    }
+
+    // Accessor for the result captured by doRun().
+    public JSONObject getJSON() {
+        return json;
+    }
+}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/partitions/PartitionMatchMaker.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/partitions/PartitionMatchMaker.java
index d2c14b2..afa1946 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/partitions/PartitionMatchMaker.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/partitions/PartitionMatchMaker.java
@@ -20,6 +20,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.logging.Logger;
 
 import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
 import edu.uci.ics.hyracks.api.partitions.PartitionId;
@@ -29,6 +30,8 @@
 import edu.uci.ics.hyracks.control.common.job.PartitionState;
 
 public class PartitionMatchMaker {
+    private static final Logger LOGGER = Logger.getLogger(PartitionMatchMaker.class.getName());
+
     private final Map<PartitionId, List<PartitionDescriptor>> partitionDescriptors;
 
     private final Map<PartitionId, List<PartitionRequest>> partitionRequests;
@@ -164,6 +167,7 @@
     }
 
     public void removeUncommittedPartitions(Set<PartitionId> partitionIds, final Set<TaskAttemptId> taIds) {
+        LOGGER.info("Removing uncommitted partitions: " + partitionIds);
         IEntryFilter<PartitionDescriptor> filter = new IEntryFilter<PartitionDescriptor>() {
             @Override
             public boolean matches(PartitionDescriptor o) {
@@ -182,6 +186,7 @@
     }
 
     public void removePartitionRequests(Set<PartitionId> partitionIds, final Set<TaskAttemptId> taIds) {
+        LOGGER.info("Removing partition requests: " + partitionIds);
         IEntryFilter<PartitionRequest> filter = new IEntryFilter<PartitionRequest>() {
             @Override
             public boolean matches(PartitionRequest o) {
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
index 2fc9624..616e8e7 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
@@ -466,22 +466,23 @@
     }
 
     private void abortTaskCluster(TaskClusterAttempt tcAttempt) {
+        LOGGER.info("Aborting task cluster: " + tcAttempt.getAttempt());
         Set<TaskAttemptId> abortTaskIds = new HashSet<TaskAttemptId>();
         Map<String, List<TaskAttemptId>> abortTaskAttemptMap = new HashMap<String, List<TaskAttemptId>>();
         for (TaskAttempt ta : tcAttempt.getTaskAttempts()) {
             TaskAttemptId taId = ta.getTaskAttemptId();
             abortTaskIds.add(taId);
-            if (ta.getStatus() == TaskAttempt.TaskStatus.RUNNING) {
-                ta.setStatus(TaskAttempt.TaskStatus.ABORTED, null);
-                List<TaskAttemptId> abortTaskAttempts = abortTaskAttemptMap.get(ta.getNodeId());
-                if (abortTaskAttempts == null) {
-                    abortTaskAttempts = new ArrayList<TaskAttemptId>();
-                    abortTaskAttemptMap.put(ta.getNodeId(), abortTaskAttempts);
-                }
-                abortTaskAttempts.add(taId);
+            LOGGER.info("Checking " + taId + ": " + ta.getStatus());
+            ta.setStatus(TaskAttempt.TaskStatus.ABORTED, null);
+            List<TaskAttemptId> abortTaskAttempts = abortTaskAttemptMap.get(ta.getNodeId());
+            if (abortTaskAttempts == null) {
+                abortTaskAttempts = new ArrayList<TaskAttemptId>();
+                abortTaskAttemptMap.put(ta.getNodeId(), abortTaskAttempts);
             }
+            abortTaskAttempts.add(taId);
         }
         final JobId jobId = jobRun.getJobId();
+        LOGGER.info("Abort map for job: " + jobId + ": " + abortTaskAttemptMap);
         for (Map.Entry<String, List<TaskAttemptId>> e : abortTaskAttemptMap.entrySet()) {
             final NodeControllerState node = ccs.getNodeMap().get(e.getKey());
             final List<TaskAttemptId> abortTaskAttempts = e.getValue();
@@ -517,8 +518,10 @@
 
         for (TaskCluster tc : doomedTaskClusters) {
             TaskClusterAttempt tca = findLastTaskClusterAttempt(tc);
-            abortTaskCluster(tca);
-            tca.setStatus(TaskClusterAttempt.TaskClusterStatus.ABORTED);
+            if (tca != null) {
+                abortTaskCluster(tca);
+                tca.setStatus(TaskClusterAttempt.TaskClusterStatus.ABORTED);
+            }
         }
     }
 
@@ -595,24 +598,21 @@
      */
     public void notifyTaskFailure(TaskAttempt ta, ActivityCluster ac, Exception exception) {
         try {
+            LOGGER.info("Received failure notification for TaskAttempt " + ta.getTaskAttemptId());
             TaskAttemptId taId = ta.getTaskAttemptId();
             TaskCluster tc = ta.getTaskState().getTaskCluster();
             TaskClusterAttempt lastAttempt = findLastTaskClusterAttempt(tc);
             if (lastAttempt != null && taId.getAttempt() == lastAttempt.getAttempt()) {
-                TaskAttempt.TaskStatus taStatus = ta.getStatus();
-                if (taStatus == TaskAttempt.TaskStatus.RUNNING) {
-                    ta.setStatus(TaskAttempt.TaskStatus.FAILED, exception);
-                    abortTaskCluster(lastAttempt);
-                    lastAttempt.setStatus(TaskClusterAttempt.TaskClusterStatus.FAILED);
-                    abortDoomedTaskClusters();
-                    if (lastAttempt.getAttempt() >= ac.getMaxTaskClusterAttempts()) {
-                        abortJob(null);
-                        return;
-                    }
-                    startRunnableActivityClusters();
-                } else {
-                    LOGGER.warning("Spurious task failure notification: " + taId + " Current state = " + taStatus);
+                LOGGER.info("Marking TaskAttempt " + ta.getTaskAttemptId() + " as failed");
+                ta.setStatus(TaskAttempt.TaskStatus.FAILED, exception);
+                abortTaskCluster(lastAttempt);
+                lastAttempt.setStatus(TaskClusterAttempt.TaskClusterStatus.FAILED);
+                abortDoomedTaskClusters();
+                if (lastAttempt.getAttempt() >= ac.getMaxTaskClusterAttempts()) {
+                    abortJob(null);
+                    return;
                 }
+                startRunnableActivityClusters();
             } else {
                 LOGGER.warning("Ignoring task failure notification: " + taId + " -- Current last attempt = "
                         + lastAttempt);
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/web/RESTAPIFunction.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/web/RESTAPIFunction.java
index edb4e93..a488d2f 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/web/RESTAPIFunction.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/web/RESTAPIFunction.java
@@ -19,7 +19,7 @@
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
 import edu.uci.ics.hyracks.control.cc.job.manager.events.GetJobActivityGraphJSONEvent;
-import edu.uci.ics.hyracks.control.cc.job.manager.events.GetJobProfileJSONEvent;
+import edu.uci.ics.hyracks.control.cc.job.manager.events.GetJobRunJSONEvent;
 import edu.uci.ics.hyracks.control.cc.job.manager.events.GetJobSpecificationJSONEvent;
 import edu.uci.ics.hyracks.control.cc.job.manager.events.GetJobSummariesJSONEvent;
 import edu.uci.ics.hyracks.control.cc.web.util.IJSONOutputFunction;
@@ -57,10 +57,10 @@
                     GetJobActivityGraphJSONEvent gjage = new GetJobActivityGraphJSONEvent(ccs, jobId);
                     ccs.getJobQueue().scheduleAndSync(gjage);
                     result.put("result", gjage.getJSON());
-                } else if ("profile".equalsIgnoreCase(arguments[1])) {
-                    GetJobProfileJSONEvent gjpe = new GetJobProfileJSONEvent(ccs, jobId);
-                    ccs.getJobQueue().scheduleAndSync(gjpe);
-                    result.put("result", gjpe.getJSON());
+                } else if ("job-run".equalsIgnoreCase(arguments[1])) {
+                    GetJobRunJSONEvent gjre = new GetJobRunJSONEvent(ccs, jobId);
+                    ccs.getJobQueue().scheduleAndSync(gjre);
+                    result.put("result", gjre.getJSON());
                 }
 
                 break;