Merge branch 'master' into yingyi/fullstack_beta_fix
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
index 5976760..5294428 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
@@ -19,6 +19,7 @@
 import java.net.InetSocketAddress;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.Timer;
@@ -117,6 +118,8 @@
 
     private final Map<JobId, JobRun> runMapArchive;
 
+    private final Map<JobId, List<Exception>> runMapHistory;
+
     private final WorkQueue workQueue;
 
     private final ExecutorService executor;
@@ -156,6 +159,15 @@
                 return size() > ccConfig.jobHistorySize;
             }
         };
+        runMapHistory = new LinkedHashMap<JobId, List<Exception>>() {
+            private static final long serialVersionUID = 1L;
+            /** history size + 1 is for the case when history size = 0 */
+            private int allowedSize = 100 * (ccConfig.jobHistorySize + 1);
+
+            protected boolean removeEldestEntry(Map.Entry<JobId, List<Exception>> eldest) {
+                return size() > allowedSize;
+            }
+        };
         workQueue = new WorkQueue();
         this.timer = new Timer(true);
         final ClusterTopology topology = computeClusterTopology(ccConfig);
@@ -252,6 +264,10 @@
         return runMapArchive;
     }
 
+    public Map<JobId, List<Exception>> getRunHistory() {
+        return runMapHistory;
+    }
+
     public Map<String, Set<String>> getIpAddressNodeNameMap() {
         return ipAddressNodeNameMap;
     }
@@ -554,4 +570,4 @@
     public synchronized void removeDeploymentRun(DeploymentId deploymentKey) {
         deploymentRunMap.remove(deploymentKey);
     }
-}
\ No newline at end of file
+}
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java
index 3a6dc26..692e3d7 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java
@@ -50,8 +50,6 @@
 
     private final JobId jobId;
 
-    private final IActivityClusterGraphGenerator acgg;
-
     private final ActivityClusterGraph acg;
 
     private final JobScheduler scheduler;
@@ -88,7 +86,6 @@
             IActivityClusterGraphGenerator acgg, EnumSet<JobFlag> jobFlags) {
         this.deploymentId = deploymentId;
         this.jobId = jobId;
-        this.acgg = acgg;
         this.acg = acgg.initialize();
         this.scheduler = new JobScheduler(ccs, this, acgg.getConstraints());
         this.jobFlags = jobFlags;
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobCleanupWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobCleanupWork.java
index 7954c7c..ab218cc 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobCleanupWork.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobCleanupWork.java
@@ -81,6 +81,7 @@
             run.setStatus(run.getPendingStatus(), run.getPendingExceptions());
             ccs.getActiveRunMap().remove(jobId);
             ccs.getRunMapArchive().put(jobId, run);
+            ccs.getRunHistory().put(jobId, run.getExceptions());
             try {
                 ccs.getJobLogFile().log(createJobLogObject(run));
             } catch (Exception e) {
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobletCleanupNotificationWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobletCleanupNotificationWork.java
index 65e1519..63e62a0 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobletCleanupNotificationWork.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobletCleanupNotificationWork.java
@@ -70,6 +70,7 @@
             run.setStatus(run.getPendingStatus(), run.getPendingExceptions());
             ccs.getActiveRunMap().remove(jobId);
             ccs.getRunMapArchive().put(jobId, run);
+            ccs.getRunHistory().put(jobId, run.getExceptions());
             try {
                 ccs.getJobLogFile().log(createJobLogObject(run));
             } catch (Exception e) {
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/WaitForJobCompletionWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/WaitForJobCompletionWork.java
index 6cfe025..8efea17 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/WaitForJobCompletionWork.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/WaitForJobCompletionWork.java
@@ -14,6 +14,8 @@
  */
 package edu.uci.ics.hyracks.control.cc.work;
 
+import java.util.List;
+
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
 import edu.uci.ics.hyracks.control.cc.job.IJobStatusConditionVariable;
@@ -48,17 +50,31 @@
             });
         } else {
             final IJobStatusConditionVariable cArchivedVar = ccs.getRunMapArchive().get(jobId);
-            ccs.getExecutor().execute(new Runnable() {
-                @Override
-                public void run() {
-                    try {
-                        cArchivedVar.waitForCompletion();
-                        callback.setValue(null);
-                    } catch (Exception e) {
-                        callback.setException(e);
+            if (cArchivedVar != null) {
+                ccs.getExecutor().execute(new Runnable() {
+                    @Override
+                    public void run() {
+                        try {
+                            cArchivedVar.waitForCompletion();
+                            callback.setValue(null);
+                        } catch (Exception e) {
+                            callback.setException(e);
+                        }
                     }
-                }
-            });
+                });
+            } else {
+                final List<Exception> exceptions = ccs.getRunHistory().get(jobId);
+                ccs.getExecutor().execute(new Runnable() {
+                    @Override
+                    public void run() {
+                        callback.setValue(null);
+                        if (exceptions != null && exceptions.size() > 0) {
+                            /** only report the first exception because IResultCallback will only throw one exception anyway */
+                            callback.setException(exceptions.get(0));
+                        }
+                    }
+                });
+            }
         }
     }
 }
\ No newline at end of file