clean up the checkpointing control
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
index e65e987..3e6e9a5 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
@@ -99,7 +99,7 @@
IntWritable lastSnapshotSuperstep = new IntWritable(0);
boolean failed = false;
int retryCount = 0;
- int maxRetryCount = 1;
+ int maxRetryCount = 3;
do {
try {
@@ -127,8 +127,9 @@
/** run loop-body jobs */
runLoopBody(deploymentId, currentJob, jobGen, i, lastSnapshotJobIndex, lastSnapshotSuperstep,
- ckpHook);
+ ckpHook, failed);
runClearState(deploymentId, jobGen);
+ failed = false;
}
/** finish the jobs */
@@ -252,12 +253,14 @@
}
private void runLoopBody(DeploymentId deploymentId, PregelixJob job, JobGen jobGen, int currentJobIndex,
- IntWritable snapshotJobIndex, IntWritable snapshotSuperstep, ICheckpointHook ckpHook) throws Exception {
- if (snapshotJobIndex.get() >= 0 && snapshotSuperstep.get() > 0) {
+ IntWritable snapshotJobIndex, IntWritable snapshotSuperstep, ICheckpointHook ckpHook, boolean doRecovery)
+ throws Exception {
+ if (doRecovery) {
/** reload the checkpoint */
runLoadCheckpoint(deploymentId, jobGen, snapshotSuperstep.get());
+
}
- int i = snapshotSuperstep.get() + 1;
+ int i = doRecovery ? snapshotSuperstep.get() + 1 : 1;
boolean terminate = false;
long start, end, time;
do {
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
index b6d2851..d37c6fd 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
@@ -411,7 +411,7 @@
tmpJob.setVertexInputFormatClass(InternalVertexInputFormat.class);
FileInputFormat.setInputPaths(tmpJob, new Path(vertexCheckpointPath));
JobSpecification vertexLoadSpec = loadHDFSData(tmpJob.getConfiguration());
- JobSpecification[] stateLoadSpecs = generateStateCheckpointLoading(lastCheckpointedIteration, pregelixJob);
+ JobSpecification[] stateLoadSpecs = generateStateCheckpointLoading(lastCheckpointedIteration, tmpJob);
JobSpecification[] specs = new JobSpecification[1 + stateLoadSpecs.length];
specs[0] = vertexLoadSpec;
for (int i = 1; i < specs.length; i++) {