added job info
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceFunctions.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceFunctions.java
index 0467eae..f886b53 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceFunctions.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceFunctions.java
@@ -31,6 +31,7 @@
         GET_CLUSTER_TOPOLOGY,
         CREATE_JOB,
         GET_JOB_STATUS,
+        GET_JOB_INFO,
         START_JOB,
         GET_DATASET_DIRECTORY_SERIVICE_INFO,
         GET_DATASET_RESULT_STATUS,
@@ -76,6 +77,25 @@
         }
     }
 
+    public static class GetJobInfoFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        private final JobId jobId;
+
+        public GetJobInfoFunction(JobId jobId) {
+            this.jobId = jobId;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.GET_JOB_INFO;
+        }
+
+        public JobId getJobId() {
+            return jobId;
+        }
+    }
+
     public static class StartJobFunction extends Function {
         private static final long serialVersionUID = 1L;
 
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
index 36ceb18..4dc4e3c 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
@@ -23,6 +23,7 @@
 import edu.uci.ics.hyracks.api.deployment.DeploymentId;
 import edu.uci.ics.hyracks.api.job.JobFlag;
 import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobInfo;
 import edu.uci.ics.hyracks.api.job.JobStatus;
 import edu.uci.ics.hyracks.api.topology.ClusterTopology;
 import edu.uci.ics.hyracks.ipc.api.IIPCHandle;
@@ -103,4 +104,11 @@
                 deploymentId);
         rpci.call(ipcHandle, dbf);
     }
+
+    @Override
+    public JobInfo getJobInfo(JobId jobId) throws Exception {
+        HyracksClientInterfaceFunctions.GetJobInfoFunction gjsf = new HyracksClientInterfaceFunctions.GetJobInfoFunction(
+                jobId);
+        return (JobInfo) rpci.call(ipcHandle, gjsf);
+    }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksConnection.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksConnection.java
index 6af8dd9..ce620e2 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksConnection.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksConnection.java
@@ -36,6 +36,7 @@
 import edu.uci.ics.hyracks.api.job.IActivityClusterGraphGeneratorFactory;
 import edu.uci.ics.hyracks.api.job.JobFlag;
 import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobInfo;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.api.job.JobStatus;
 import edu.uci.ics.hyracks.api.topology.ClusterTopology;
@@ -148,7 +149,7 @@
                 binaryURLs.add(new URL(url));
             }
         }
-        /**deploy the URLs to the CC and NCs*/
+        /** deploy the URLs to the CC and NCs */
         hci.deployBinary(binaryURLs, deploymentId);
         return deploymentId;
     }
@@ -176,4 +177,9 @@
             EnumSet<JobFlag> jobFlags) throws Exception {
         return hci.startJob(deploymentId, JavaSerializationUtils.serialize(acggf), jobFlags);
     }
+
+    @Override
+    public JobInfo getJobInfo(JobId jobId) throws Exception {
+        return hci.getJobInfo(jobId);
+    }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientConnection.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientConnection.java
index 41b07d7..0ed82de 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientConnection.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientConnection.java
@@ -23,6 +23,7 @@
 import edu.uci.ics.hyracks.api.job.IActivityClusterGraphGeneratorFactory;
 import edu.uci.ics.hyracks.api.job.JobFlag;
 import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobInfo;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.api.job.JobStatus;
 import edu.uci.ics.hyracks.api.topology.ClusterTopology;
@@ -44,6 +45,16 @@
     public JobStatus getJobStatus(JobId jobId) throws Exception;
 
     /**
+     * Gets detailed information about the specified Job.
+     * 
+     * @param jobId
+     *            JobId of the Job
+     * @return {@link JobStatus}
+     * @throws Exception
+     */
+    public JobInfo getJobInfo(JobId jobId) throws Exception;
+
+    /**
      * Start the specified Job.
      * 
      * @param appName
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientInterface.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientInterface.java
index aabc351..591f485 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientInterface.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientInterface.java
@@ -23,6 +23,7 @@
 import edu.uci.ics.hyracks.api.deployment.DeploymentId;
 import edu.uci.ics.hyracks.api.job.JobFlag;
 import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobInfo;
 import edu.uci.ics.hyracks.api.job.JobStatus;
 import edu.uci.ics.hyracks.api.topology.ClusterTopology;
 
@@ -46,4 +47,7 @@
     public void unDeployBinary(DeploymentId deploymentId) throws Exception;
 
     public JobId startJob(DeploymentId deploymentId, byte[] acggfBytes, EnumSet<JobFlag> jobFlags) throws Exception;
+
+    public JobInfo getJobInfo(JobId jobId) throws Exception;
+
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/impl/JobSpecificationActivityClusterGraphGeneratorFactory.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/impl/JobSpecificationActivityClusterGraphGeneratorFactory.java
index 3431c40..7b580af 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/impl/JobSpecificationActivityClusterGraphGeneratorFactory.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/impl/JobSpecificationActivityClusterGraphGeneratorFactory.java
@@ -29,6 +29,11 @@
     }
 
     @Override
+    public JobSpecification getJobSpecification() {
+        return spec;
+    }
+
+    @Override
     public IActivityClusterGraphGenerator createActivityClusterGraphGenerator(JobId jobId,
             final ICCApplicationContext ccAppCtx, EnumSet<JobFlag> jobFlags) throws HyracksException {
         final JobActivityGraphBuilder builder = new JobActivityGraphBuilder(spec, jobFlags);
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IActivityClusterGraphGeneratorFactory.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IActivityClusterGraphGeneratorFactory.java
index ae76455..b854102 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IActivityClusterGraphGeneratorFactory.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IActivityClusterGraphGeneratorFactory.java
@@ -23,4 +23,6 @@
 public interface IActivityClusterGraphGeneratorFactory extends Serializable {
     public IActivityClusterGraphGenerator createActivityClusterGraphGenerator(JobId jobId,
             ICCApplicationContext ccAppCtx, EnumSet<JobFlag> jobFlags) throws HyracksException;
+
+    public JobSpecification getJobSpecification();
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobInfo.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobInfo.java
new file mode 100644
index 0000000..812a32a
--- /dev/null
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobInfo.java
@@ -0,0 +1,72 @@
+package edu.uci.ics.hyracks.api.job;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
+
+public class JobInfo implements Serializable{
+
+    private final JobId jobId;
+
+    private JobStatus status;
+
+    private List<Exception> exceptions;
+
+    private JobStatus pendingStatus;
+
+    private List<Exception> pendingExceptions;
+
+    private Map<OperatorDescriptorId, List<String>> operatorLocations;
+
+    public JobInfo(JobId jobId, JobStatus jobStatus, Map<OperatorDescriptorId, List<String>> operatorLocations) {
+        this.jobId = jobId;
+        this.operatorLocations = operatorLocations;
+        this.status = status;
+    }
+
+    public JobStatus getStatus() {
+        return status;
+    }
+
+    public void setStatus(JobStatus status) {
+        this.status = status;
+    }
+
+    public List<Exception> getExceptions() {
+        return exceptions;
+    }
+
+    public void setExceptions(List<Exception> exceptions) {
+        this.exceptions = exceptions;
+    }
+
+    public JobStatus getPendingStatus() {
+        return pendingStatus;
+    }
+
+    public void setPendingStatus(JobStatus pendingStatus) {
+        this.pendingStatus = pendingStatus;
+    }
+
+    public List<Exception> getPendingExceptions() {
+        return pendingExceptions;
+    }
+
+    public void setPendingExceptions(List<Exception> pendingExceptions) {
+        this.pendingExceptions = pendingExceptions;
+    }
+
+    public Map<OperatorDescriptorId, List<String>> getOperatorLocations() {
+        return operatorLocations;
+    }
+
+    public void setOperatorLocations(Map<OperatorDescriptorId, List<String>> operatorLocations) {
+        this.operatorLocations = operatorLocations;
+    }
+
+    public JobId getJobId() {
+        return jobId;
+    }
+}
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
index 5976760..fb0adde 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
@@ -42,6 +42,7 @@
 import edu.uci.ics.hyracks.api.dataset.IDatasetDirectoryService;
 import edu.uci.ics.hyracks.api.deployment.DeploymentId;
 import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobInfo;
 import edu.uci.ics.hyracks.api.job.JobStatus;
 import edu.uci.ics.hyracks.api.topology.ClusterTopology;
 import edu.uci.ics.hyracks.api.topology.TopologyDefinitionParser;
@@ -54,6 +55,7 @@
 import edu.uci.ics.hyracks.control.cc.work.CliUnDeployBinaryWork;
 import edu.uci.ics.hyracks.control.cc.work.GetDatasetDirectoryServiceInfoWork;
 import edu.uci.ics.hyracks.control.cc.work.GetIpAddressNodeNameMapWork;
+import edu.uci.ics.hyracks.control.cc.work.GetJobInfoWork;
 import edu.uci.ics.hyracks.control.cc.work.GetJobStatusWork;
 import edu.uci.ics.hyracks.control.cc.work.GetNodeControllersInfoWork;
 import edu.uci.ics.hyracks.control.cc.work.GetResultPartitionLocationsWork;
@@ -332,6 +334,13 @@
                     return;
                 }
 
+                case GET_JOB_INFO: {
+                    HyracksClientInterfaceFunctions.GetJobInfoFunction gjsf = (HyracksClientInterfaceFunctions.GetJobInfoFunction) fn;
+                    workQueue.schedule(new GetJobInfoWork(ClusterControllerService.this, gjsf.getJobId(),
+                            new IPCResponder<JobInfo>(handle, mid)));
+                    return;
+                }
+
                 case START_JOB: {
                     HyracksClientInterfaceFunctions.StartJobFunction sjf = (HyracksClientInterfaceFunctions.StartJobFunction) fn;
                     JobId jobId = createJobId();
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java
index 3a6dc26..a51f556 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java
@@ -14,6 +14,7 @@
  */
 package edu.uci.ics.hyracks.control.cc.job;
 
+import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -27,6 +28,7 @@
 
 import edu.uci.ics.hyracks.api.dataflow.ActivityId;
 import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
+import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
 import edu.uci.ics.hyracks.api.dataflow.TaskId;
 import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
 import edu.uci.ics.hyracks.api.deployment.DeploymentId;
@@ -84,6 +86,8 @@
 
     private List<Exception> pendingExceptions;
 
+    private Map<OperatorDescriptorId, List<String>> operatorLocations;
+
     public JobRun(ClusterControllerService ccs, DeploymentId deploymentId, JobId jobId,
             IActivityClusterGraphGenerator acgg, EnumSet<JobFlag> jobFlags) {
         this.deploymentId = deploymentId;
@@ -98,6 +102,7 @@
         cleanupPendingNodeIds = new HashSet<String>();
         profile = new JobProfile(jobId);
         connectorPolicyMap = new HashMap<ConnectorDescriptorId, IConnectorPolicy>();
+        operatorLocations = new HashMap<OperatorDescriptorId, List<String>>();
     }
 
     public DeploymentId getDeploymentId() {
@@ -175,6 +180,15 @@
         this.endTime = endTime;
     }
 
+    public void registerOperatorLocation(OperatorDescriptorId op, String location) {
+        List<String> locations = operatorLocations.get(op);
+        if (locations == null) {
+            locations = new ArrayList<String>();
+            operatorLocations.put(op, locations);
+        }
+        locations.add(location);
+    }
+
     @Override
     public synchronized void waitForCompletion() throws Exception {
         while (status != JobStatus.TERMINATED && status != JobStatus.FAILURE) {
@@ -370,4 +384,8 @@
 
         return result;
     }
+
+    public Map<OperatorDescriptorId, List<String>> getOperatorLocations() {
+        return operatorLocations;
+    }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
index d09b641..1d17dc7 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
@@ -32,6 +32,7 @@
 import edu.uci.ics.hyracks.api.constraints.expressions.PartitionLocationExpression;
 import edu.uci.ics.hyracks.api.dataflow.ActivityId;
 import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
+import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
 import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
 import edu.uci.ics.hyracks.api.dataflow.TaskId;
 import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
@@ -88,6 +89,7 @@
 
     public void startJob() throws HyracksException {
         startRunnableActivityClusters();
+        ccs.getApplicationContext().notifyJobStart(jobRun.getJobId());
     }
 
     private void findRunnableTaskClusterRoots(Set<TaskCluster> frontier, Collection<ActivityCluster> roots)
@@ -322,6 +324,8 @@
                 tads = new ArrayList<TaskAttemptDescriptor>();
                 taskAttemptMap.put(nodeId, tads);
             }
+            OperatorDescriptorId opId = tid.getActivityId().getOperatorDescriptorId();
+            jobRun.registerOperatorLocation(opId, nodeId);
             ActivityPartitionDetails apd = ts.getActivityPlan().getActivityPartitionDetails();
             TaskAttemptDescriptor tad = new TaskAttemptDescriptor(taskAttempt.getTaskAttemptId(),
                     apd.getPartitionCount(), apd.getInputPartitionCounts(), apd.getOutputPartitionCounts());
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetJobInfoWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetJobInfoWork.java
new file mode 100644
index 0000000..c2e08f2
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetJobInfoWork.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.cc.work;
+
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobInfo;
+import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
+import edu.uci.ics.hyracks.control.cc.job.JobRun;
+import edu.uci.ics.hyracks.control.common.work.IResultCallback;
+import edu.uci.ics.hyracks.control.common.work.SynchronizableWork;
+
+public class GetJobInfoWork extends SynchronizableWork {
+    private final ClusterControllerService ccs;
+    private final JobId jobId;
+    private final IResultCallback<JobInfo> callback;
+
+    public GetJobInfoWork(ClusterControllerService ccs, JobId jobId, IResultCallback<JobInfo> callback) {
+        this.ccs = ccs;
+        this.jobId = jobId;
+        this.callback = callback;
+    }
+
+    @Override
+    protected void doRun() throws Exception {
+        try {
+            JobRun run = ccs.getActiveRunMap().get(jobId);
+            if (run == null) {
+                run = ccs.getRunMapArchive().get(jobId);
+            }
+            JobInfo info = run == null ? null
+                    : new JobInfo(run.getJobId(), run.getStatus(), run.getOperatorLocations());
+            callback.setValue(info);
+        } catch (Exception e) {
+            callback.setException(e);
+        }
+    }
+}
\ No newline at end of file