Merge branch 'master' into yingyi/fullstack_fix
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
index 0b8346b..fd6360a 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
@@ -27,6 +27,9 @@
import java.util.logging.Level;
import java.util.logging.Logger;
+import org.json.JSONException;
+import org.json.JSONObject;
+
import edu.uci.ics.hyracks.api.constraints.Constraint;
import edu.uci.ics.hyracks.api.constraints.expressions.LValueConstraintExpression;
import edu.uci.ics.hyracks.api.constraints.expressions.PartitionLocationExpression;
@@ -45,6 +48,7 @@
import edu.uci.ics.hyracks.api.util.JavaSerializationUtils;
import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
import edu.uci.ics.hyracks.control.cc.NodeControllerState;
+import edu.uci.ics.hyracks.control.cc.application.CCApplicationContext;
import edu.uci.ics.hyracks.control.cc.job.ActivityClusterPlan;
import edu.uci.ics.hyracks.control.cc.job.JobRun;
import edu.uci.ics.hyracks.control.cc.job.Task;
@@ -458,13 +462,14 @@
private void abortJob(List<Exception> exceptions) {
Set<TaskCluster> inProgressTaskClustersCopy = new HashSet<TaskCluster>(inProgressTaskClusters);
for (TaskCluster tc : inProgressTaskClustersCopy) {
- abortTaskCluster(findLastTaskClusterAttempt(tc));
+ abortTaskCluster(findLastTaskClusterAttempt(tc), TaskClusterAttempt.TaskClusterStatus.ABORTED);
}
assert inProgressTaskClusters.isEmpty();
ccs.getWorkQueue().schedule(new JobCleanupWork(ccs, jobRun.getJobId(), JobStatus.FAILURE, exceptions));
}
- private void abortTaskCluster(TaskClusterAttempt tcAttempt) {
+ private void abortTaskCluster(TaskClusterAttempt tcAttempt,
+ TaskClusterAttempt.TaskClusterStatus failedOrAbortedStatus) {
LOGGER.fine("Aborting task cluster: " + tcAttempt.getAttempt());
Set<TaskAttemptId> abortTaskIds = new HashSet<TaskAttemptId>();
Map<String, List<TaskAttemptId>> abortTaskAttemptMap = new HashMap<String, List<TaskAttemptId>>();
@@ -477,11 +482,13 @@
ta.setStatus(TaskAttempt.TaskStatus.ABORTED, null);
ta.setEndTime(System.currentTimeMillis());
List<TaskAttemptId> abortTaskAttempts = abortTaskAttemptMap.get(ta.getNodeId());
- if (abortTaskAttempts == null) {
+ if (status == TaskAttempt.TaskStatus.RUNNING && abortTaskAttempts == null) {
abortTaskAttempts = new ArrayList<TaskAttemptId>();
abortTaskAttemptMap.put(ta.getNodeId(), abortTaskAttempts);
}
- abortTaskAttempts.add(taId);
+ if (status == TaskAttempt.TaskStatus.RUNNING) {
+ abortTaskAttempts.add(taId);
+ }
}
}
final JobId jobId = jobRun.getJobId();
@@ -505,6 +512,9 @@
PartitionMatchMaker pmm = jobRun.getPartitionMatchMaker();
pmm.removeUncommittedPartitions(tc.getProducedPartitions(), abortTaskIds);
pmm.removePartitionRequests(tc.getRequiredPartitions(), abortTaskIds);
+
+ tcAttempt.setStatus(failedOrAbortedStatus);
+ tcAttempt.setEndTime(System.currentTimeMillis());
}
private void abortDoomedTaskClusters() throws HyracksException {
@@ -519,9 +529,7 @@
for (TaskCluster tc : doomedTaskClusters) {
TaskClusterAttempt tca = findLastTaskClusterAttempt(tc);
if (tca != null) {
- abortTaskCluster(tca);
- tca.setEndTime(System.currentTimeMillis());
- tca.setStatus(TaskClusterAttempt.TaskClusterStatus.ABORTED);
+ abortTaskCluster(tca, TaskClusterAttempt.TaskClusterStatus.ABORTED);
}
}
}
@@ -608,9 +616,7 @@
if (lastAttempt != null && taId.getAttempt() == lastAttempt.getAttempt()) {
LOGGER.fine("Marking TaskAttempt " + ta.getTaskAttemptId() + " as failed");
ta.setStatus(TaskAttempt.TaskStatus.FAILED, exceptions);
- abortTaskCluster(lastAttempt);
- lastAttempt.setStatus(TaskClusterAttempt.TaskClusterStatus.FAILED);
- lastAttempt.setEndTime(System.currentTimeMillis());
+ abortTaskCluster(lastAttempt, TaskClusterAttempt.TaskClusterStatus.FAILED);
abortDoomedTaskClusters();
if (lastAttempt.getAttempt() >= jobRun.getActivityClusterGraph().getMaxReattempts()) {
abortJob(exceptions);
@@ -635,32 +641,41 @@
public void notifyNodeFailures(Set<String> deadNodes) {
try {
jobRun.getPartitionMatchMaker().notifyNodeFailures(deadNodes);
+ jobRun.getParticipatingNodeIds().removeAll(deadNodes);
+ jobRun.getCleanupPendingNodeIds().removeAll(deadNodes);
+ if (jobRun.getPendingStatus() != null && jobRun.getCleanupPendingNodeIds().isEmpty()) {
+ finishJob(jobRun);
+ return;
+ }
for (ActivityCluster ac : jobRun.getActivityClusterGraph().getActivityClusterMap().values()) {
- TaskCluster[] taskClusters = getActivityClusterPlan(ac).getTaskClusters();
- if (taskClusters != null) {
- for (TaskCluster tc : taskClusters) {
- TaskClusterAttempt lastTaskClusterAttempt = findLastTaskClusterAttempt(tc);
- if (lastTaskClusterAttempt != null
- && (lastTaskClusterAttempt.getStatus() == TaskClusterAttempt.TaskClusterStatus.COMPLETED || lastTaskClusterAttempt
- .getStatus() == TaskClusterAttempt.TaskClusterStatus.RUNNING)) {
- boolean abort = false;
- for (TaskAttempt ta : lastTaskClusterAttempt.getTaskAttempts().values()) {
- assert (ta.getStatus() == TaskAttempt.TaskStatus.COMPLETED || ta.getStatus() == TaskAttempt.TaskStatus.RUNNING);
- if (deadNodes.contains(ta.getNodeId())) {
- ta.setStatus(
- TaskAttempt.TaskStatus.FAILED,
- Collections.singletonList(new Exception("Node " + ta.getNodeId()
- + " failed")));
- ta.setEndTime(System.currentTimeMillis());
- abort = true;
+ if (isPlanned(ac)) {
+ TaskCluster[] taskClusters = getActivityClusterPlan(ac).getTaskClusters();
+ if (taskClusters != null) {
+ for (TaskCluster tc : taskClusters) {
+ TaskClusterAttempt lastTaskClusterAttempt = findLastTaskClusterAttempt(tc);
+ if (lastTaskClusterAttempt != null
+ && (lastTaskClusterAttempt.getStatus() == TaskClusterAttempt.TaskClusterStatus.COMPLETED || lastTaskClusterAttempt
+ .getStatus() == TaskClusterAttempt.TaskClusterStatus.RUNNING)) {
+ boolean abort = false;
+ for (TaskAttempt ta : lastTaskClusterAttempt.getTaskAttempts().values()) {
+ assert (ta.getStatus() == TaskAttempt.TaskStatus.COMPLETED || ta.getStatus() == TaskAttempt.TaskStatus.RUNNING);
+ if (deadNodes.contains(ta.getNodeId())) {
+ ta.setStatus(
+ TaskAttempt.TaskStatus.FAILED,
+ Collections.singletonList(new Exception("Node " + ta.getNodeId()
+ + " failed")));
+ ta.setEndTime(System.currentTimeMillis());
+ abort = true;
+ }
+ }
+ if (abort) {
+ abortTaskCluster(lastTaskClusterAttempt,
+ TaskClusterAttempt.TaskClusterStatus.ABORTED);
}
}
- if (abort) {
- abortTaskCluster(lastTaskClusterAttempt);
- }
}
+ abortDoomedTaskClusters();
}
- abortDoomedTaskClusters();
}
}
startRunnableActivityClusters();
@@ -668,4 +683,37 @@
abortJob(Collections.singletonList(e));
}
}
+
+ private void finishJob(final JobRun run) {
+ JobId jobId = run.getJobId();
+ CCApplicationContext appCtx = ccs.getApplicationContext();
+ if (appCtx != null) {
+ try {
+ appCtx.notifyJobFinish(jobId);
+ } catch (HyracksException e) {
+ e.printStackTrace();
+ }
+ }
+ run.setStatus(run.getPendingStatus(), run.getPendingExceptions());
+ ccs.getActiveRunMap().remove(jobId);
+ ccs.getRunMapArchive().put(jobId, run);
+ ccs.getRunHistory().put(jobId, run.getExceptions());
+ try {
+ ccs.getJobLogFile().log(createJobLogObject(run));
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private JSONObject createJobLogObject(final JobRun run) {
+ JSONObject jobLogObject = new JSONObject();
+ try {
+ ActivityClusterGraph acg = run.getActivityClusterGraph();
+ jobLogObject.put("activity-cluster-graph", acg.toJSON());
+ jobLogObject.put("job-run", run.toJSON());
+ } catch (JSONException e) {
+ throw new RuntimeException(e);
+ }
+ return jobLogObject;
+ }
}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobCleanupWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobCleanupWork.java
index 6e8ddf0..22c3348 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobCleanupWork.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobCleanupWork.java
@@ -14,6 +14,7 @@
*/
package edu.uci.ics.hyracks.control.cc.work;
+import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.logging.Logger;
@@ -48,11 +49,16 @@
@Override
public void run() {
+ LOGGER.warning("Cleanup for JobRun with id: " + jobId);
final JobRun run = ccs.getActiveRunMap().get(jobId);
if (run == null) {
LOGGER.warning("Unable to find JobRun with id: " + jobId);
return;
}
+ if (run.getPendingStatus() != null && run.getCleanupPendingNodeIds().isEmpty()) {
+ finishJob(run);
+ return;
+ }
if (run.getPendingStatus() != null) {
LOGGER.warning("Ignoring duplicate cleanup for JobRun with id: " + jobId);
return;
@@ -63,33 +69,47 @@
run.setPendingStatus(status, exceptions);
}
if (targetNodes != null && !targetNodes.isEmpty()) {
+ Set<String> toDelete = new HashSet<String>();
for (String n : targetNodes) {
NodeControllerState ncs = ccs.getNodeMap().get(n);
try {
- ncs.getNodeController().cleanUpJoblet(jobId, status);
+ if (ncs == null) {
+ toDelete.add(n);
+ } else {
+ ncs.getNodeController().cleanUpJoblet(jobId, status);
+ }
} catch (Exception e) {
e.printStackTrace();
}
}
+ targetNodes.removeAll(toDelete);
+ run.getCleanupPendingNodeIds().removeAll(toDelete);
+ if (run.getCleanupPendingNodeIds().isEmpty()) {
+ finishJob(run);
+ }
} else {
- CCApplicationContext appCtx = ccs.getApplicationContext();
- if (appCtx != null) {
- try {
- appCtx.notifyJobFinish(jobId);
- } catch (HyracksException e) {
- e.printStackTrace();
- }
- }
- run.setStatus(run.getPendingStatus(), run.getPendingExceptions());
- ccs.getActiveRunMap().remove(jobId);
- ccs.getRunMapArchive().put(jobId, run);
- ccs.getRunHistory().put(jobId, run.getExceptions());
+ finishJob(run);
+ }
+ }
+
+ private void finishJob(final JobRun run) {
+ CCApplicationContext appCtx = ccs.getApplicationContext();
+ if (appCtx != null) {
try {
- ccs.getJobLogFile().log(createJobLogObject(run));
- } catch (Exception e) {
- throw new RuntimeException(e);
+ appCtx.notifyJobFinish(jobId);
+ } catch (HyracksException e) {
+ e.printStackTrace();
}
}
+ run.setStatus(run.getPendingStatus(), run.getPendingExceptions());
+ ccs.getActiveRunMap().remove(jobId);
+ ccs.getRunMapArchive().put(jobId, run);
+ ccs.getRunHistory().put(jobId, run.getExceptions());
+ try {
+ ccs.getJobLogFile().log(createJobLogObject(run));
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
}
private JSONObject createJobLogObject(final JobRun run) {
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
index d68ad2c..be419c2 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
@@ -549,4 +549,38 @@
public static String getGlobalAggregateSpillingDirName(Configuration conf, long superStep) {
return "/tmp/pregelix/agg/" + conf.get(PregelixJob.JOB_ID) + "/" + superStep;
}
+
+ /**
+ * Get the path for vertex checkpointing
+ *
+ * @param conf
+ * @param lastSuperStep
+ * @return the path for vertex checkpointing
+ */
+ public static String getVertexCheckpointPath(Configuration conf, long lastSuperStep) {
+ return "/tmp/ckpoint/" + BspUtils.getJobId(conf) + "/vertex/" + lastSuperStep;
+ }
+
+ /**
+ * Get the path for message checkpointing
+ *
+ * @param conf
+ * @param lastSuperStep
+ * @return the path for message checkpointing
+ */
+ public static String getMessageCheckpointPath(Configuration conf, long lastSuperStep) {
+ String path = "/tmp/ckpoint/" + BspUtils.getJobId(conf) + "/message/" + lastSuperStep;
+ return path;
+ }
+
+ /**
+ * Get the path for message checkpointing
+ *
+ * @param conf
+ * @param lastSuperStep
+ * @return the path for message checkpointing
+ */
+ public static String getSecondaryIndexCheckpointPath(Configuration conf, long lastSuperStep) {
+ return "/tmp/ckpoint/" + BspUtils.getJobId(conf) + "/secondaryindex/" + lastSuperStep;
+ }
}
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
index 7bd2cf8..61c3653 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
@@ -50,6 +50,7 @@
import edu.uci.ics.pregelix.core.jobgen.JobGenOuterJoinSingleSort;
import edu.uci.ics.pregelix.core.jobgen.JobGenOuterJoinSort;
import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
+import edu.uci.ics.pregelix.core.util.ExceptionUtilities;
import edu.uci.ics.pregelix.dataflow.util.IterationUtils;
@SuppressWarnings("rawtypes")
@@ -99,7 +100,8 @@
IntWritable lastSnapshotSuperstep = new IntWritable(0);
boolean failed = false;
int retryCount = 0;
- int maxRetryCount = 1;
+ int maxRetryCount = 3;
+ jobGen = selectJobGen(planChoice, currentJob);
do {
try {
@@ -112,14 +114,14 @@
ICheckpointHook ckpHook = BspUtils.createCheckpointHook(currentJob.getConfiguration());
/** load the data */
- if (i == 0 || compatible(lastJob, currentJob)) {
+ if ((i == 0 || compatible(lastJob, currentJob)) && !failed) {
if (i != 0) {
finishJobs(jobGen, deploymentId);
/** invalidate/clear checkpoint */
lastSnapshotJobIndex.set(0);
lastSnapshotSuperstep.set(0);
}
- jobGen = selectJobGen(planChoice, currentJob);
+ jobGen.reset(currentJob);
loadData(currentJob, jobGen, deploymentId);
} else {
jobGen.reset(currentJob);
@@ -137,12 +139,16 @@
/** clear checkpoints if any */
jobGen.clearCheckpoints();
hcc.unDeployBinary(deploymentId);
- } catch (IOException ioe) {
+ } catch (Exception e1) {
/** disk failures */
//restart from snapshot
- failed = true;
- retryCount++;
- throw new HyracksException(ioe);
+ /** node failures */
+ if (ExceptionUtilities.recoverable(e1)) {
+ failed = true;
+ retryCount++;
+ } else {
+ throw e1;
+ }
}
} while (failed && retryCount < maxRetryCount);
LOG.info("job finished");
@@ -256,8 +262,13 @@
throws Exception {
if (doRecovery) {
/** reload the checkpoint */
- runLoadCheckpoint(deploymentId, jobGen, snapshotSuperstep.get());
-
+ if (snapshotSuperstep.get() > 0) {
+ runClearState(deploymentId, jobGen);
+ runLoadCheckpoint(deploymentId, jobGen, snapshotSuperstep.get());
+ } else {
+ runClearState(deploymentId, jobGen);
+ loadData(job, jobGen, deploymentId);
+ }
}
int i = doRecovery ? snapshotSuperstep.get() + 1 : 1;
boolean terminate = false;
@@ -272,8 +283,8 @@
|| IterationUtils.readForceTerminationState(job.getConfiguration(), jobGen.getJobId());
if (ckpHook.checkpoint(i)) {
runCheckpoint(deploymentId, jobGen, i);
- snapshotSuperstep.set(i);
snapshotJobIndex.set(currentJobIndex);
+ snapshotSuperstep.set(i);
}
i++;
} while (!terminate);
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
index 36723a6..91d7e22 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
@@ -137,8 +137,6 @@
protected static final String SECONDARY_INDEX_ODD = "secondary1";
protected static final String SECONDARY_INDEX_EVEN = "secondary2";
- private String vertexCheckpointPath;
-
public JobGen(PregelixJob job) {
init(job);
}
@@ -148,10 +146,7 @@
pregelixJob = job;
initJobConfiguration();
job.setJobId(jobId);
-
- vertexCheckpointPath = "/tmp/ckpoint/" + jobId + "/vertex";
- // set the frame size to be the one user specified if the user did
- // specify.
+ // set the frame size to be the one user specified if the user did specify.
int specifiedFrameSize = BspUtils.getFrameSize(job.getConfiguration());
if (specifiedFrameSize > 0) {
frameSize = specifiedFrameSize;
@@ -376,15 +371,20 @@
@Override
public JobSpecification[] generateCheckpointing(int lastSuccessfulIteration) throws HyracksException {
try {
-
PregelixJob tmpJob = this.createCloneJob("Vertex checkpointing for job " + jobId, pregelixJob);
tmpJob.setVertexOutputFormatClass(InternalVertexOutputFormat.class);
- FileOutputFormat.setOutputPath(tmpJob, new Path(vertexCheckpointPath + "/" + lastSuccessfulIteration));
+ FileOutputFormat.setOutputPath(tmpJob,
+ new Path(BspUtils.getVertexCheckpointPath(conf, lastSuccessfulIteration)));
tmpJob.setOutputKeyClass(NullWritable.class);
tmpJob.setOutputValueClass(BspUtils.getVertexClass(tmpJob.getConfiguration()));
+ FileSystem dfs = FileSystem.get(tmpJob.getConfiguration());
+ dfs.delete(new Path(BspUtils.getVertexCheckpointPath(conf, lastSuccessfulIteration)), true);
JobSpecification vertexCkpSpec = scanIndexWriteToHDFS(tmpJob.getConfiguration());
+
+ dfs.delete(new Path(BspUtils.getMessageCheckpointPath(conf, lastSuccessfulIteration)), true);
JobSpecification[] stateCkpSpecs = generateStateCheckpointing(lastSuccessfulIteration);
JobSpecification[] specs = new JobSpecification[1 + stateCkpSpecs.length];
+
specs[0] = vertexCkpSpec;
for (int i = 1; i < specs.length; i++) {
specs[i] = stateCkpSpecs[i - 1];
@@ -397,7 +397,7 @@
@Override
public JobSpecification generateLoadingJob() throws HyracksException {
- JobSpecification spec = loadHDFSData(conf);
+ JobSpecification spec = loadHDFSData(pregelixJob);
return spec;
}
@@ -412,13 +412,22 @@
try {
PregelixJob tmpJob = this.createCloneJob("Vertex checkpoint loading for job " + jobId, pregelixJob);
tmpJob.setVertexInputFormatClass(InternalVertexInputFormat.class);
- FileInputFormat.setInputPaths(tmpJob, new Path(vertexCheckpointPath + "/" + lastCheckpointedIteration));
- JobSpecification vertexLoadSpec = loadHDFSData(tmpJob.getConfiguration());
+ FileInputFormat.setInputPaths(tmpJob,
+ new Path(BspUtils.getVertexCheckpointPath(conf, lastCheckpointedIteration)));
+ JobSpecification[] cleanVertices = generateCleanup();
+ JobSpecification createIndex = generateCreatingJob();
+ JobSpecification vertexLoadSpec = loadHDFSData(tmpJob);
JobSpecification[] stateLoadSpecs = generateStateCheckpointLoading(lastCheckpointedIteration, tmpJob);
- JobSpecification[] specs = new JobSpecification[1 + stateLoadSpecs.length];
- specs[0] = vertexLoadSpec;
- for (int i = 1; i < specs.length; i++) {
- specs[i] = stateLoadSpecs[i - 1];
+ JobSpecification[] specs = new JobSpecification[cleanVertices.length + 2 + stateLoadSpecs.length];
+
+ int i = 0;
+ for (; i < cleanVertices.length; i++) {
+ specs[i] = cleanVertices[i];
+ }
+ specs[i++] = createIndex;
+ specs[i++] = vertexLoadSpec;
+ for (; i < specs.length; i++) {
+ specs[i] = stateLoadSpecs[i - cleanVertices.length - 2];
}
return specs;
} catch (Exception e) {
@@ -479,7 +488,8 @@
}
@SuppressWarnings({ "rawtypes", "unchecked" })
- private JobSpecification loadHDFSData(Configuration conf) throws HyracksException, HyracksDataException {
+ private JobSpecification loadHDFSData(PregelixJob job) throws HyracksException, HyracksDataException {
+ Configuration conf = job.getConfiguration();
Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
JobSpecification spec = new JobSpecification();
@@ -492,7 +502,7 @@
VertexInputFormat inputFormat = BspUtils.createVertexInputFormat(conf);
List<InputSplit> splits = new ArrayList<InputSplit>();
try {
- splits = inputFormat.getSplits(pregelixJob, fileSplitProvider.getFileSplits().length);
+ splits = inputFormat.getSplits(job, fileSplitProvider.getFileSplits().length);
LOGGER.info("number of splits: " + splits.size());
for (InputSplit split : splits)
LOGGER.info(split.toString());
@@ -637,10 +647,11 @@
/**
* construct the materializing write operator
*/
- MaterializingReadOperatorDescriptor materializeRead = new MaterializingReadOperatorDescriptor(spec, rdFinal);
+ MaterializingReadOperatorDescriptor materializeRead = new MaterializingReadOperatorDescriptor(spec, rdFinal,
+ false);
ClusterConfig.setLocationConstraint(spec, materializeRead);
- String checkpointPath = "/tmp/ckpoint/" + jobId + "/message/" + lastSuccessfulIteration;
+ String checkpointPath = BspUtils.getMessageCheckpointPath(conf, lastSuccessfulIteration);;
PregelixJob tmpJob = createCloneJob("State checkpointing for job " + jobId, pregelixJob);
tmpJob.setVertexOutputFormatClass(InternalVertexOutputFormat.class);
FileOutputFormat.setOutputPath(tmpJob, new Path(checkpointPath));
@@ -660,7 +671,7 @@
@SuppressWarnings({ "unchecked", "rawtypes" })
protected JobSpecification[] generateStateCheckpointLoading(int lastCheckpointedIteration, PregelixJob job)
throws HyracksException {
- String checkpointPath = "/tmp/ckpoint/" + jobId + "/message/" + lastCheckpointedIteration;
+ String checkpointPath = BspUtils.getMessageCheckpointPath(job.getConfiguration(), lastCheckpointedIteration);
PregelixJob tmpJob = createCloneJob("State checkpoint loading for job " + jobId, job);
tmpJob.setVertexInputFormatClass(InternalVertexInputFormat.class);
try {
@@ -668,7 +679,7 @@
} catch (IOException e) {
throw new HyracksException(e);
}
- Configuration conf = job.getConfiguration();
+ Configuration conf = tmpJob.getConfiguration();
Class vertexIdClass = BspUtils.getVertexIndexClass(conf);
JobSpecification spec = new JobSpecification();
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
index 41887c0..7c1df0ff 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
@@ -344,7 +344,8 @@
/**
* construct the materializing write operator
*/
- MaterializingReadOperatorDescriptor materializeRead = new MaterializingReadOperatorDescriptor(spec, rdFinal);
+ MaterializingReadOperatorDescriptor materializeRead = new MaterializingReadOperatorDescriptor(spec, rdFinal,
+ true);
ClusterConfig.setLocationConstraint(spec, materializeRead);
/**
@@ -529,7 +530,7 @@
tmpJob.setVertexOutputFormatClass(InternalVertexOutputFormat.class);
/** generate secondary index checkpoint */
- String checkpointPath = "/tmp/ckpoint/" + jobId + "/secondary/" + lastSuccessfulIteration;
+ String checkpointPath = BspUtils.getSecondaryIndexCheckpointPath(conf, lastSuccessfulIteration);
FileOutputFormat.setOutputPath(tmpJob, new Path(checkpointPath));
tmpJob.setOutputKeyClass(BspUtils.getVertexIndexClass(tmpJob.getConfiguration()));
tmpJob.setOutputValueClass(MsgList.class);
@@ -569,7 +570,7 @@
Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(job.getConfiguration());
JobSpecification spec = new JobSpecification();
- String checkpointPath = "/tmp/ckpoint/" + jobId + "/secondary/" + lastSuccessfulIteration;;
+ String checkpointPath = BspUtils.getSecondaryIndexCheckpointPath(conf, lastSuccessfulIteration);
PregelixJob tmpJob = createCloneJob("State checkpoint loading for job " + jobId, job);
tmpJob.setVertexInputFormatClass(InternalVertexInputFormat.class);
try {
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
index c29ea18..287b797 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
@@ -299,7 +299,8 @@
/**
* construct the materializing write operator
*/
- MaterializingReadOperatorDescriptor materializeRead = new MaterializingReadOperatorDescriptor(spec, rdFinal);
+ MaterializingReadOperatorDescriptor materializeRead = new MaterializingReadOperatorDescriptor(spec, rdFinal,
+ true);
ClusterConfig.setLocationConstraint(spec, materializeRead);
/**
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
index dc61971..3b3c9e7 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
@@ -288,7 +288,8 @@
/**
* construct the materializing write operator
*/
- MaterializingReadOperatorDescriptor materializeRead = new MaterializingReadOperatorDescriptor(spec, rdFinal);
+ MaterializingReadOperatorDescriptor materializeRead = new MaterializingReadOperatorDescriptor(spec, rdFinal,
+ true);
ClusterConfig.setLocationConstraint(spec, materializeRead);
/**
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
index 34f723f..e334095 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
@@ -302,7 +302,8 @@
/**
* construct the materializing write operator
*/
- MaterializingReadOperatorDescriptor materializeRead = new MaterializingReadOperatorDescriptor(spec, rdFinal);
+ MaterializingReadOperatorDescriptor materializeRead = new MaterializingReadOperatorDescriptor(spec, rdFinal,
+ true);
ClusterConfig.setLocationConstraint(spec, materializeRead);
/**
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/ExceptionUtilities.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/ExceptionUtilities.java
new file mode 100644
index 0000000..4b1bf94
--- /dev/null
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/ExceptionUtilities.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.core.util;
+
+/**
+ * The util to analysis exceptions
+ *
+ * @author yingyib
+ */
+public class ExceptionUtilities {
+
+ /**
+ * Check whether a exception is recoverable or not
+ *
+ * @param exception
+ * @return true or false
+ */
+ public static boolean recoverable(Exception exception) {
+ String message = exception.getMessage();
+ if (exception instanceof InterruptedException || (message.contains("Node") && message.contains("not live"))
+ || message.contains("Failure occurred on input")) {
+ return true;
+ }
+ Throwable cause = exception;
+ while ((cause = cause.getCause()) != null) {
+ if (cause instanceof InterruptedException) {
+ return true;
+ }
+ }
+ return false;
+ }
+}
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java
index e1795de..f4b26cb 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java
@@ -61,8 +61,8 @@
ccConfig.defaultMaxJobAttempts = 0;
ccConfig.jobHistorySize = 1;
ccConfig.profileDumpPeriod = -1;
- //ccConfig.heartbeatPeriod = 5000;
- //ccConfig.maxHeartbeatLapsePeriods = 1;
+ ccConfig.heartbeatPeriod = 5000;
+ ccConfig.maxHeartbeatLapsePeriods = 8;
// cluster controller
cc = new ClusterControllerService(ccConfig);
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorNodePushable.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorNodePushable.java
index 7221cb5..b22e468 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorNodePushable.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorNodePushable.java
@@ -230,8 +230,16 @@
@Override
public void fail() throws HyracksDataException {
- for (IFrameWriter writer : writers)
+ try {
+ cursor.close();
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ } finally {
+ treeIndexOpHelper.close();
+ }
+ for (IFrameWriter writer : writers) {
writer.fail();
+ }
}
@Override
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinFunctionUpdateOperatorNodePushable.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinFunctionUpdateOperatorNodePushable.java
index b21cd2a..0ecfd03 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinFunctionUpdateOperatorNodePushable.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinFunctionUpdateOperatorNodePushable.java
@@ -254,8 +254,16 @@
@Override
public void fail() throws HyracksDataException {
- for (IFrameWriter writer : writers)
+ try {
+ cursor.close();
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ } finally {
+ treeIndexOpHelper.close();
+ }
+ for (IFrameWriter writer : writers) {
writer.fail();
+ }
}
/** compare tuples */
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinOperatorNodePushable.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinOperatorNodePushable.java
index dd6ee3c..e64e9cc 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinOperatorNodePushable.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinOperatorNodePushable.java
@@ -245,6 +245,13 @@
@Override
public void fail() throws HyracksDataException {
+ try {
+ cursor.close();
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ } finally {
+ treeIndexOpHelper.close();
+ }
writer.fail();
}
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionFunctionUpdateOperatorNodePushable.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionFunctionUpdateOperatorNodePushable.java
index 3cebfb8..a9c787f 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionFunctionUpdateOperatorNodePushable.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionFunctionUpdateOperatorNodePushable.java
@@ -219,8 +219,16 @@
@Override
public void fail() throws HyracksDataException {
- for (IFrameWriter writer : writers)
+ try {
+ cursor.close();
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ } finally {
+ treeIndexOpHelper.close();
+ }
+ for (IFrameWriter writer : writers) {
writer.fail();
+ }
}
/** compare tuples */
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionOperatorNodePushable.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionOperatorNodePushable.java
index bbe2764..86a211f 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionOperatorNodePushable.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionOperatorNodePushable.java
@@ -205,6 +205,13 @@
@Override
public void fail() throws HyracksDataException {
+ try {
+ cursor.close();
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ } finally {
+ treeIndexOpHelper.close();
+ }
writer.fail();
}
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/TreeIndexBulkReLoadOperatorNodePushable.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/TreeIndexBulkReLoadOperatorNodePushable.java
index c4890e1..1f410c7 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/TreeIndexBulkReLoadOperatorNodePushable.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/TreeIndexBulkReLoadOperatorNodePushable.java
@@ -100,6 +100,12 @@
@Override
public void fail() throws HyracksDataException {
-
+ try {
+ bulkLoader.end();
+ } catch (IndexException e) {
+ throw new HyracksDataException(e);
+ } finally {
+ treeIndexOpHelper.close();
+ }
}
}
\ No newline at end of file
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/TreeSearchFunctionUpdateOperatorNodePushable.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/TreeSearchFunctionUpdateOperatorNodePushable.java
index bd85e3e..de87909 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/TreeSearchFunctionUpdateOperatorNodePushable.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/TreeSearchFunctionUpdateOperatorNodePushable.java
@@ -224,8 +224,16 @@
@Override
public void fail() throws HyracksDataException {
- for (IFrameWriter writer : writers)
+ try {
+ cursor.close();
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ } finally {
+ treeIndexHelper.close();
+ }
+ for (IFrameWriter writer : writers) {
writer.fail();
+ }
}
@Override
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/HDFSFileWriteOperatorDescriptor.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/HDFSFileWriteOperatorDescriptor.java
index 9a21680..d3a9890 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/HDFSFileWriteOperatorDescriptor.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/HDFSFileWriteOperatorDescriptor.java
@@ -161,7 +161,7 @@
FileStatus[] tempPaths = dfs.listStatus(outputPath, new PathFilter() {
@Override
public boolean accept(Path dir) {
- return dir.getName().endsWith(TEMP_DIR);
+ return dir.getName().endsWith(TEMP_DIR) && dir.getName().indexOf(".crc") < 0;
}
});
Path tempDir = tempPaths[0].getPath();
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/MaterializingReadOperatorDescriptor.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/MaterializingReadOperatorDescriptor.java
index ca8f190..b44b643 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/MaterializingReadOperatorDescriptor.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/MaterializingReadOperatorDescriptor.java
@@ -30,9 +30,12 @@
public class MaterializingReadOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
private static final long serialVersionUID = 1L;
+ private final boolean removeIterationState;
- public MaterializingReadOperatorDescriptor(JobSpecification spec, RecordDescriptor recordDescriptor) {
+ public MaterializingReadOperatorDescriptor(JobSpecification spec, RecordDescriptor recordDescriptor,
+ boolean removeIterationState) {
super(spec, 1, 1);
+ this.removeIterationState = removeIterationState;
recordDescriptors[0] = recordDescriptor;
}
@@ -73,7 +76,7 @@
@Override
public void fail() throws HyracksDataException {
-
+ writer.fail();
}
@Override
@@ -81,7 +84,9 @@
/**
* remove last iteration's state
*/
- IterationUtils.removeIterationState(ctx, partition);
+ if (removeIterationState) {
+ IterationUtils.removeIterationState(ctx, partition);
+ }
writer.close();
complete = true;
}
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/FailureRecoveryTest.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/FailureRecoveryTest.java
index dac087f..5294ace 100644
--- a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/FailureRecoveryTest.java
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/FailureRecoveryTest.java
@@ -37,7 +37,7 @@
public class FailureRecoveryTest {
private static String INPUTPATH = "data/webmap";
private static String OUTPUTPAH = "actual/result";
- private static String EXPECTEDPATH = "src/test/resources/expected/PageRankReal";
+ private static String EXPECTEDPATH = "src/test/resources/expected/PageRankReal2";
@Test
public void test() throws Exception {
@@ -57,29 +57,28 @@
testCluster.setUp();
Driver driver = new Driver(PageRankVertex.class);
- // Thread thread = new Thread(new Runnable() {
- //
- // @Override
- // public void run() {
- // try {
- // synchronized (this) {
- // this.wait(10000);
- // PregelixHyracksIntegrationUtil.showDownNC1();
- // }
- // } catch (Exception e) {
- // throw new IllegalStateException(e);
- // }
- // }
- //
- // });
- //thread.start();
+ Thread thread = new Thread(new Runnable() {
+
+ @Override
+ public void run() {
+ try {
+ synchronized (this) {
+ this.wait(15000);
+ PregelixHyracksIntegrationUtil.showDownNC1();
+ }
+ } catch (Exception e) {
+ throw new IllegalStateException(e);
+ }
+ }
+ });
+ thread.start();
driver.runJob(job, "127.0.0.1", PregelixHyracksIntegrationUtil.TEST_HYRACKS_CC_CLIENT_PORT);
TestUtils.compareWithResultDir(new File(EXPECTEDPATH), new File(OUTPUTPAH));
} catch (Exception e) {
+ PregelixHyracksIntegrationUtil.showDownNC2();
+ testCluster.cleanupHDFS();
throw e;
- } finally {
- testCluster.tearDown();
}
}
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/JobConcatenationTest.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/JobConcatenationTest.java
index 5a485ba..dc7a28d 100644
--- a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/JobConcatenationTest.java
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/JobConcatenationTest.java
@@ -24,6 +24,7 @@
import org.junit.Test;
import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.pregelix.api.util.ConservativeCheckpointHook;
import edu.uci.ics.pregelix.core.driver.Driver;
import edu.uci.ics.pregelix.core.util.PregelixHyracksIntegrationUtil;
import edu.uci.ics.pregelix.example.PageRankVertex.SimplePageRankVertexOutputFormat;
@@ -55,7 +56,7 @@
job1.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
FileInputFormat.setInputPaths(job1, INPUTPATH);
job1.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 20);
- //job1.setCheckpointHook(ConservativeCheckpointHook.class);
+ job1.setCheckpointHook(ConservativeCheckpointHook.class);
PregelixJob job2 = new PregelixJob(PageRankVertex.class.getName());
job2.setVertexClass(PageRankVertex.class);
@@ -65,7 +66,7 @@
job2.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
FileOutputFormat.setOutputPath(job2, new Path(OUTPUTPAH));
job2.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 20);
- //job2.setCheckpointHook(ConservativeCheckpointHook.class);
+ job2.setCheckpointHook(ConservativeCheckpointHook.class);
jobs.add(job1);
jobs.add(job2);
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/util/TestCluster.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/util/TestCluster.java
index 40ea690..660d9eb 100644
--- a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/util/TestCluster.java
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/util/TestCluster.java
@@ -126,7 +126,7 @@
/**
* cleanup hdfs cluster
*/
- private void cleanupHDFS() throws Exception {
+ public void cleanupHDFS() throws Exception {
dfsCluster.shutdown();
}