Merge branch 'master' into zheilbron/hyracks_asterix_issue470
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
index 5976760..5294428 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
@@ -19,6 +19,7 @@
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.LinkedHashMap;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Timer;
@@ -117,6 +118,8 @@
private final Map<JobId, JobRun> runMapArchive;
+ private final Map<JobId, List<Exception>> runMapHistory;
+
private final WorkQueue workQueue;
private final ExecutorService executor;
@@ -156,6 +159,15 @@
return size() > ccConfig.jobHistorySize;
}
};
+ runMapHistory = new LinkedHashMap<JobId, List<Exception>>() {
+ private static final long serialVersionUID = 1L;
+ /** keeps up to 100 * (jobHistorySize + 1) entries; the + 1 leaves the bound at 100 instead of 0 when jobHistorySize = 0 */
+ private final int allowedSize = 100 * (ccConfig.jobHistorySize + 1);
+ @Override
+ protected boolean removeEldestEntry(Map.Entry<JobId, List<Exception>> eldest) {
+ return size() > allowedSize;
+ }
+ };
workQueue = new WorkQueue();
this.timer = new Timer(true);
final ClusterTopology topology = computeClusterTopology(ccConfig);
@@ -252,6 +264,10 @@
return runMapArchive;
}
+ public Map<JobId, List<Exception>> getRunHistory() {
+ return runMapHistory;
+ }
+
public Map<String, Set<String>> getIpAddressNodeNameMap() {
return ipAddressNodeNameMap;
}
@@ -554,4 +570,4 @@
public synchronized void removeDeploymentRun(DeploymentId deploymentKey) {
deploymentRunMap.remove(deploymentKey);
}
-}
\ No newline at end of file
+}
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobCleanupWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobCleanupWork.java
index 7954c7c..ab218cc 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobCleanupWork.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobCleanupWork.java
@@ -81,6 +81,7 @@
run.setStatus(run.getPendingStatus(), run.getPendingExceptions());
ccs.getActiveRunMap().remove(jobId);
ccs.getRunMapArchive().put(jobId, run);
+ ccs.getRunHistory().put(jobId, run.getExceptions());
try {
ccs.getJobLogFile().log(createJobLogObject(run));
} catch (Exception e) {
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobletCleanupNotificationWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobletCleanupNotificationWork.java
index 65e1519..63e62a0 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobletCleanupNotificationWork.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobletCleanupNotificationWork.java
@@ -70,6 +70,7 @@
run.setStatus(run.getPendingStatus(), run.getPendingExceptions());
ccs.getActiveRunMap().remove(jobId);
ccs.getRunMapArchive().put(jobId, run);
+ ccs.getRunHistory().put(jobId, run.getExceptions());
try {
ccs.getJobLogFile().log(createJobLogObject(run));
} catch (Exception e) {
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/WaitForJobCompletionWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/WaitForJobCompletionWork.java
index 6cfe025..8efea17 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/WaitForJobCompletionWork.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/WaitForJobCompletionWork.java
@@ -14,6 +14,8 @@
*/
package edu.uci.ics.hyracks.control.cc.work;
+import java.util.List;
+
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
import edu.uci.ics.hyracks.control.cc.job.IJobStatusConditionVariable;
@@ -48,17 +50,33 @@
});
} else {
final IJobStatusConditionVariable cArchivedVar = ccs.getRunMapArchive().get(jobId);
- ccs.getExecutor().execute(new Runnable() {
- @Override
- public void run() {
- try {
- cArchivedVar.waitForCompletion();
- callback.setValue(null);
- } catch (Exception e) {
- callback.setException(e);
+ if (cArchivedVar != null) {
+ ccs.getExecutor().execute(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ cArchivedVar.waitForCompletion();
+ callback.setValue(null);
+ } catch (Exception e) {
+ callback.setException(e);
+ }
}
- }
- });
+ });
+ } else {
+ final List<Exception> exceptions = ccs.getRunHistory().get(jobId);
+ ccs.getExecutor().execute(new Runnable() {
+ @Override
+ public void run() {
+ if (exceptions != null && !exceptions.isEmpty()) {
+ /** only report the first exception because IResultCallback will only throw one exception anyway */
+ callback.setException(exceptions.get(0));
+ } else {
+ /** no exception recorded for this job id: report plain completion */
+ callback.setValue(null);
+ }
+ }
+ });
+ }
}
}
}
\ No newline at end of file