fix the checkpoint writing for vertex checkpointing and message checkpointing
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
index d68ad2c..30e16b5 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
@@ -549,4 +549,26 @@
public static String getGlobalAggregateSpillingDirName(Configuration conf, long superStep) {
return "/tmp/pregelix/agg/" + conf.get(PregelixJob.JOB_ID) + "/" + superStep;
}
+
+ /**
+ * Get the path for vertex checkpointing
+ *
+ * @param conf
+ * @param lastSuperStep
+ * @return the path for vertex checkpointing
+ */
+ public static String getVertexCheckpointPath(Configuration conf, long lastSuperStep) {
+ return "/tmp/ckpoint/" + BspUtils.getJobId(conf) + "/vertex/" + lastSuperStep;
+ }
+
+ /**
+ * Get the path for message checkpointing
+ *
+ * @param conf
+ * @param lastSuperStep
+ * @return the path for message checkpointing
+ */
+ public static String getMessageCheckpointPath(Configuration conf, long lastSuperStep) {
+ return "/tmp/ckpoint/" + BspUtils.getJobId(conf) + "/message/" + lastSuperStep;
+ }
}
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
index 7bd2cf8..3a0f20d7e 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
@@ -272,8 +272,8 @@
|| IterationUtils.readForceTerminationState(job.getConfiguration(), jobGen.getJobId());
if (ckpHook.checkpoint(i)) {
runCheckpoint(deploymentId, jobGen, i);
- snapshotSuperstep.set(i);
snapshotJobIndex.set(currentJobIndex);
+ snapshotSuperstep.set(i);
}
i++;
} while (!terminate);
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
index 36723a6..13fd8be 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
@@ -137,8 +137,6 @@
protected static final String SECONDARY_INDEX_ODD = "secondary1";
protected static final String SECONDARY_INDEX_EVEN = "secondary2";
- private String vertexCheckpointPath;
-
public JobGen(PregelixJob job) {
init(job);
}
@@ -148,10 +146,7 @@
pregelixJob = job;
initJobConfiguration();
job.setJobId(jobId);
-
- vertexCheckpointPath = "/tmp/ckpoint/" + jobId + "/vertex";
- // set the frame size to be the one user specified if the user did
- // specify.
+ // set the frame size to be the one user specified if the user did specify.
int specifiedFrameSize = BspUtils.getFrameSize(job.getConfiguration());
if (specifiedFrameSize > 0) {
frameSize = specifiedFrameSize;
@@ -376,13 +371,16 @@
@Override
public JobSpecification[] generateCheckpointing(int lastSuccessfulIteration) throws HyracksException {
try {
-
PregelixJob tmpJob = this.createCloneJob("Vertex checkpointing for job " + jobId, pregelixJob);
tmpJob.setVertexOutputFormatClass(InternalVertexOutputFormat.class);
- FileOutputFormat.setOutputPath(tmpJob, new Path(vertexCheckpointPath + "/" + lastSuccessfulIteration));
+ FileOutputFormat.setOutputPath(tmpJob,
+ new Path(BspUtils.getVertexCheckpointPath(conf, lastSuccessfulIteration)));
tmpJob.setOutputKeyClass(NullWritable.class);
tmpJob.setOutputValueClass(BspUtils.getVertexClass(tmpJob.getConfiguration()));
+ FileSystem dfs = FileSystem.get(tmpJob.getConfiguration());
+ dfs.delete(new Path(BspUtils.getVertexCheckpointPath(conf, lastSuccessfulIteration)), true);
JobSpecification vertexCkpSpec = scanIndexWriteToHDFS(tmpJob.getConfiguration());
+ dfs.delete(new Path(BspUtils.getMessageCheckpointPath(conf, lastSuccessfulIteration)), true);
JobSpecification[] stateCkpSpecs = generateStateCheckpointing(lastSuccessfulIteration);
JobSpecification[] specs = new JobSpecification[1 + stateCkpSpecs.length];
specs[0] = vertexCkpSpec;
@@ -412,7 +410,10 @@
try {
PregelixJob tmpJob = this.createCloneJob("Vertex checkpoint loading for job " + jobId, pregelixJob);
tmpJob.setVertexInputFormatClass(InternalVertexInputFormat.class);
- FileInputFormat.setInputPaths(tmpJob, new Path(vertexCheckpointPath + "/" + lastCheckpointedIteration));
+ FileInputFormat.setInputPaths(tmpJob,
+ new Path(BspUtils.getVertexCheckpointPath(conf, lastCheckpointedIteration)));
+ FileSystem dfs = FileSystem.get(conf);
+ dfs.delete(new Path(BspUtils.getVertexCheckpointPath(conf, lastCheckpointedIteration)), true);
JobSpecification vertexLoadSpec = loadHDFSData(tmpJob.getConfiguration());
JobSpecification[] stateLoadSpecs = generateStateCheckpointLoading(lastCheckpointedIteration, tmpJob);
JobSpecification[] specs = new JobSpecification[1 + stateLoadSpecs.length];
@@ -640,7 +641,7 @@
MaterializingReadOperatorDescriptor materializeRead = new MaterializingReadOperatorDescriptor(spec, rdFinal);
ClusterConfig.setLocationConstraint(spec, materializeRead);
- String checkpointPath = "/tmp/ckpoint/" + jobId + "/message/" + lastSuccessfulIteration;
+ String checkpointPath = BspUtils.getMessageCheckpointPath(conf, lastSuccessfulIteration);;
PregelixJob tmpJob = createCloneJob("State checkpointing for job " + jobId, pregelixJob);
tmpJob.setVertexOutputFormatClass(InternalVertexOutputFormat.class);
FileOutputFormat.setOutputPath(tmpJob, new Path(checkpointPath));
@@ -660,7 +661,7 @@
@SuppressWarnings({ "unchecked", "rawtypes" })
protected JobSpecification[] generateStateCheckpointLoading(int lastCheckpointedIteration, PregelixJob job)
throws HyracksException {
- String checkpointPath = "/tmp/ckpoint/" + jobId + "/message/" + lastCheckpointedIteration;
+ String checkpointPath = BspUtils.getMessageCheckpointPath(conf, lastCheckpointedIteration);
PregelixJob tmpJob = createCloneJob("State checkpoint loading for job " + jobId, job);
tmpJob.setVertexInputFormatClass(InternalVertexInputFormat.class);
try {
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/HDFSFileWriteOperatorDescriptor.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/HDFSFileWriteOperatorDescriptor.java
index 9a21680..d3a9890 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/HDFSFileWriteOperatorDescriptor.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/HDFSFileWriteOperatorDescriptor.java
@@ -161,7 +161,7 @@
FileStatus[] tempPaths = dfs.listStatus(outputPath, new PathFilter() {
@Override
public boolean accept(Path dir) {
- return dir.getName().endsWith(TEMP_DIR);
+ return dir.getName().endsWith(TEMP_DIR) && dir.getName().indexOf(".crc") < 0;
}
});
Path tempDir = tempPaths[0].getPath();
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/JobConcatenationTest.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/JobConcatenationTest.java
index 5a485ba..dc7a28d 100644
--- a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/JobConcatenationTest.java
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/JobConcatenationTest.java
@@ -24,6 +24,7 @@
import org.junit.Test;
import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.pregelix.api.util.ConservativeCheckpointHook;
import edu.uci.ics.pregelix.core.driver.Driver;
import edu.uci.ics.pregelix.core.util.PregelixHyracksIntegrationUtil;
import edu.uci.ics.pregelix.example.PageRankVertex.SimplePageRankVertexOutputFormat;
@@ -55,7 +56,7 @@
job1.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
FileInputFormat.setInputPaths(job1, INPUTPATH);
job1.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 20);
- //job1.setCheckpointHook(ConservativeCheckpointHook.class);
+ job1.setCheckpointHook(ConservativeCheckpointHook.class);
PregelixJob job2 = new PregelixJob(PageRankVertex.class.getName());
job2.setVertexClass(PageRankVertex.class);
@@ -65,7 +66,7 @@
job2.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
FileOutputFormat.setOutputPath(job2, new Path(OUTPUTPAH));
job2.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 20);
- //job2.setCheckpointHook(ConservativeCheckpointHook.class);
+ job2.setCheckpointHook(ConservativeCheckpointHook.class);
jobs.add(job1);
jobs.add(job2);