add vertex checkpointing support
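
Checkpoint the vertex state to HDFS at user-chosen supersteps so that a
failed run can restart from the last snapshot instead of from scratch.
The patch adds an ICheckpointHook extension point (the default hook never
checkpoints), internal sequence-file based vertex input/output formats for
writing and reloading snapshots, and the driver/jobgen plumbing that runs
the checkpointing and checkpoint-loading jobs.

A minimal sketch of how a user opts in; FixedIntervalCheckpointHook is
illustrative and not part of this patch:

    import edu.uci.ics.pregelix.api.job.ICheckpointHook;
    import edu.uci.ics.pregelix.api.job.PregelixJob;

    /** illustrative hook: checkpoint after every 5th superstep */
    public class FixedIntervalCheckpointHook implements ICheckpointHook {
        @Override
        public boolean checkpoint(int superstep) {
            return superstep > 0 && superstep % 5 == 0;
        }

        /** wiring the hook into a job */
        public static void main(String[] args) throws Exception {
            PregelixJob job = new PregelixJob("example");
            job.setCheckpointHook(FixedIntervalCheckpointHook.class);
        }
    }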
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/internal/InternalVertexInputFormat.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/internal/InternalVertexInputFormat.java
index 503b521..358433b 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/internal/InternalVertexInputFormat.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/internal/InternalVertexInputFormat.java
@@ -21,6 +21,7 @@
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
@@ -45,28 +46,31 @@
}
@Override
- public VertexReader<I, V, E, M> createVertexReader(InputSplit split, TaskAttemptContext context) throws IOException {
+ public VertexReader<I, V, E, M> createVertexReader(final InputSplit split, final TaskAttemptContext context)
+ throws IOException {
return new VertexReader<I, V, E, M>() {
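+            /** the actual record reading is delegated to a SequenceFile record reader */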
+ RecordReader recordReader = sequenceInputFormat.createRecordReader(split, context);
@Override
public void initialize(InputSplit inputSplit, TaskAttemptContext context) throws IOException,
InterruptedException {
-
+ recordReader.initialize(inputSplit, context);
}
@Override
public boolean nextVertex() throws IOException, InterruptedException {
- return false;
+ return recordReader.nextKeyValue();
}
+ @SuppressWarnings("unchecked")
@Override
public Vertex<I, V, E, M> getCurrentVertex() throws IOException, InterruptedException {
- return null;
+ return (Vertex<I, V, E, M>) recordReader.getCurrentValue();
}
@Override
public void close() throws IOException {
-
+ recordReader.close();
}
@Override
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/internal/InternalVertexOutputFormat.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/internal/InternalVertexOutputFormat.java
index 0581658..8c31a68 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/internal/InternalVertexOutputFormat.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/internal/InternalVertexOutputFormat.java
@@ -16,11 +16,14 @@
import java.io.IOException;
+import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import edu.uci.ics.pregelix.api.graph.Vertex;
import edu.uci.ics.pregelix.api.io.VertexOutputFormat;
@@ -32,11 +35,14 @@
@SuppressWarnings("rawtypes")
public class InternalVertexOutputFormat<I extends WritableComparable, V extends Writable, E extends Writable> extends
VertexOutputFormat<I, V, E> {
+ protected SequenceFileOutputFormat<NullWritable, Vertex> sequenceOutputFormat = new SequenceFileOutputFormat<NullWritable, Vertex>();
@Override
- public VertexWriter<I, V, E> createVertexWriter(TaskAttemptContext context) throws IOException,
+ public VertexWriter<I, V, E> createVertexWriter(final TaskAttemptContext context) throws IOException,
InterruptedException {
return new VertexWriter<I, V, E>() {
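+            /** vertex writes are delegated to a SequenceFile record writer, keyed by NullWritable */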
+ private RecordWriter<NullWritable, Vertex> recordWriter = sequenceOutputFormat.getRecordWriter(context);
+ private NullWritable key = NullWritable.get();
@Override
public void initialize(TaskAttemptContext context) throws IOException, InterruptedException {
@@ -45,12 +51,12 @@
@Override
public void writeVertex(Vertex<I, V, E, ?> vertex) throws IOException, InterruptedException {
-
+ recordWriter.write(key, vertex);
}
@Override
public void close(TaskAttemptContext context) throws IOException, InterruptedException {
-
+ recordWriter.close(context);
}
};
@@ -68,19 +74,19 @@
@Override
public void abortTask(TaskAttemptContext arg0) throws IOException {
// TODO Auto-generated method stub
-
+
}
@Override
public void cleanupJob(JobContext arg0) throws IOException {
// TODO Auto-generated method stub
-
+
}
@Override
public void commitTask(TaskAttemptContext arg0) throws IOException {
// TODO Auto-generated method stub
-
+
}
@Override
@@ -90,12 +96,12 @@
@Override
public void setupJob(JobContext arg0) throws IOException {
-
+
}
@Override
public void setupTask(TaskAttemptContext arg0) throws IOException {
-
+
}
};
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/ICheckpointHook.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/ICheckpointHook.java
new file mode 100644
index 0000000..9d6eb5a
--- /dev/null
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/ICheckpointHook.java
@@ -0,0 +1,24 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.api.job;
+
+/**
+ * A hook that lets users decide, after each superstep, whether the runtime
+ * should take a checkpoint of the vertex state.
+ * 
+ * @author yingyib
+ */
+public interface ICheckpointHook {
+
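+    /**
+     * @param superstep
+     *            the superstep number that just completed
+     * @return true if a checkpoint should be taken after this superstep
+     */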
+ public boolean checkpoint(int superstep);
+
+}
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
index d2d5cf0..1e0d87a 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
@@ -74,6 +74,18 @@
public static final String FRAME_SIZE = "pregelix.framesize";
/** update intensive */
public static final String UPDATE_INTENSIVE = "pregelix.updateIntensive";
+    /** the checkpoint hook class */
+ public static final String CKP_CLASS = "pregelix.checkpointHook";
+
+ /**
+ * Construct a Pregelix job from an existing configuration
+ *
+ * @param conf
+ * @throws IOException
+ */
+ public PregelixJob(Configuration conf) throws IOException {
+ super(conf);
+ }
/**
* Constructor that will instantiate the configuration
@@ -202,6 +214,15 @@
getConfiguration().setBoolean(UPDATE_INTENSIVE, variableSizedUpdateHeavyFlag);
}
+ /**
+     * Users can provide an ICheckpointHook implementation to specify when a checkpoint should be taken
+     *
+     * @param ckpClass
+     *            the user-defined checkpoint hook class
+ */
+    final public void setCheckpointHook(Class<? extends ICheckpointHook> ckpClass) {
+ getConfiguration().setClass(CKP_CLASS, ckpClass, ICheckpointHook.class);
+ }
+
@Override
public String toString() {
return getJobName();
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
index ff4ee91..51a9ce3 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
@@ -29,6 +29,7 @@
import edu.uci.ics.pregelix.api.io.VertexInputFormat;
import edu.uci.ics.pregelix.api.io.VertexOutputFormat;
import edu.uci.ics.pregelix.api.io.WritableSizable;
+import edu.uci.ics.pregelix.api.job.ICheckpointHook;
import edu.uci.ics.pregelix.api.job.PregelixJob;
/**
@@ -461,6 +462,24 @@
}
/**
+ * Create a checkpoint hook
+ *
+ * @param conf
+ * Configuration to check
+     * @return Instantiated user checkpoint hook
+ */
+ public static ICheckpointHook createCheckpointHook(Configuration conf) {
+ Class<? extends ICheckpointHook> ckpClass = getCheckpointHookClass(conf);
+ try {
+ return ckpClass.newInstance();
+ } catch (InstantiationException e) {
+            throw new IllegalArgumentException("createCheckpointHook: Failed to instantiate", e);
+        } catch (IllegalAccessException e) {
+            throw new IllegalArgumentException("createCheckpointHook: Illegally accessed", e);
+ }
+ }
+
+ /**
* Get the user's subclassed vertex partitioner class.
*
* @param conf
@@ -475,6 +494,20 @@
}
/**
+ * Get the user's subclassed checkpoint hook class.
+ *
+ * @param conf
+ * Configuration to check
+     * @return The user-defined checkpoint hook class
+ */
+ @SuppressWarnings("unchecked")
+ public static <V extends ICheckpointHook> Class<V> getCheckpointHookClass(Configuration conf) {
+ if (conf == null)
+ conf = defaultConf;
+ return (Class<V>) conf.getClass(PregelixJob.CKP_CLASS, DefaultCheckpointHook.class, ICheckpointHook.class);
+ }
+
+ /**
* Get the job configuration parameter whether the vertex states will increase dynamically
*
* @param conf
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/DefaultCheckpointHook.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/DefaultCheckpointHook.java
new file mode 100644
index 0000000..1f21c2f
--- /dev/null
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/DefaultCheckpointHook.java
@@ -0,0 +1,29 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.api.util;
+
+import edu.uci.ics.pregelix.api.job.ICheckpointHook;
+
+/**
+ * The default checkpoint hook, which never triggers a checkpoint.
+ * 
+ * @author yingyib
+ */
+public class DefaultCheckpointHook implements ICheckpointHook {
+
+ @Override
+ public boolean checkpoint(int superstep) {
+ return false;
+ }
+
+}
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/ResetableByteArrayInputStream.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/ResetableByteArrayInputStream.java
index f9cad9f..c7febc1 100755
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/ResetableByteArrayInputStream.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/ResetableByteArrayInputStream.java
@@ -15,11 +15,8 @@
package edu.uci.ics.pregelix.api.util;
import java.io.InputStream;
-import java.util.logging.Level;
-import java.util.logging.Logger;
public class ResetableByteArrayInputStream extends InputStream {
- private static final Logger LOGGER = Logger.getLogger(ResetableByteArrayInputStream.class.getName());
private byte[] data;
private int position;
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/base/IJobGen.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/base/IJobGen.java
index 2d58902..156c78c 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/base/IJobGen.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/base/IJobGen.java
@@ -26,4 +26,10 @@
public JobSpecification generateJob(int iteration) throws HyracksException;
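+    /** generate a job that scans the vertex index and writes all vertices to HDFS as a checkpoint */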
+ public JobSpecification generateCheckpointing() throws HyracksException;
+
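+    /** generate a job that reloads the vertices of the latest checkpoint from HDFS into the index */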
+ public JobSpecification generateLoadingCheckpoint() throws HyracksException;
+
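+    /** generate a job that clears the cached iteration state of this job on every node */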
+ public JobSpecification generateClearState() throws HyracksException;
+
}
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
index 1b4f226..6433f7b 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
@@ -29,6 +29,7 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
@@ -39,6 +40,7 @@
import edu.uci.ics.hyracks.api.job.JobFlag;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.pregelix.api.job.ICheckpointHook;
import edu.uci.ics.pregelix.api.job.PregelixJob;
import edu.uci.ics.pregelix.api.util.BspUtils;
import edu.uci.ics.pregelix.core.base.IDriver;
@@ -93,25 +95,29 @@
DeploymentId deploymentId = prepareJobs(ipAddress, port);
LOG.info("job started");
- int lastSnapshotJobIndex = 0;
- int lastSnapshotSuperstep = 0;
+ IntWritable lastSnapshotJobIndex = new IntWritable(0);
+ IntWritable lastSnapshotSuperstep = new IntWritable(0);
boolean failed = false;
int retryCount = 0;
int maxRetryCount = 3;
do {
try {
- for (int i = lastSnapshotJobIndex; i < jobs.size(); i++) {
+ for (int i = lastSnapshotJobIndex.get(); i < jobs.size(); i++) {
lastJob = currentJob;
currentJob = jobs.get(i);
/** add hadoop configurations */
addHadoopConfiguration(currentJob, ipAddress, port);
+ ICheckpointHook ckpHook = BspUtils.createCheckpointHook(currentJob.getConfiguration());
/** load the data */
if (i == 0 || compatible(lastJob, currentJob)) {
if (i != 0) {
finishJobs(jobGen, deploymentId);
+ /** invalidate/clear checkpoint */
+ lastSnapshotJobIndex.set(0);
+ lastSnapshotSuperstep.set(0);
}
jobGen = selectJobGen(planChoice, currentJob);
loadData(currentJob, jobGen, deploymentId);
@@ -120,12 +126,15 @@
}
/** run loop-body jobs */
- runLoopBody(deploymentId, currentJob, jobGen, lastSnapshotSuperstep);
+ runLoopBody(deploymentId, currentJob, jobGen, i, lastSnapshotJobIndex, lastSnapshotSuperstep,
+ ckpHook);
runClearState(deploymentId, jobGen);
}
/** finish the jobs */
finishJobs(jobGen, deploymentId);
+ /** clear checkpoints if any */
+ jobGen.clearCheckpoints();
hcc.unDeployBinary(deploymentId);
} catch (IOException ioe) {
/** disk failures */
@@ -241,12 +250,13 @@
ClusterConfig.loadClusterConfig(ipAddress, port);
}
- private void runLoopBody(DeploymentId deploymentId, PregelixJob job, JobGen jobGen, int snapshotSuperstep)
- throws Exception {
- if (snapshotSuperstep > 0) {
- /** reload the snapshot */
+ private void runLoopBody(DeploymentId deploymentId, PregelixJob job, JobGen jobGen, int currentJobIndex,
+ IntWritable snapshotJobIndex, IntWritable snapshotSuperstep, ICheckpointHook ckpHook) throws Exception {
+ if (snapshotJobIndex.get() >= 0 && snapshotSuperstep.get() > 0) {
+ /** reload the checkpoint */
+ runLoadCheckpoint(deploymentId, jobGen);
}
- int i = snapshotSuperstep + 1;
+ int i = snapshotSuperstep.get() + 1;
boolean terminate = false;
long start, end, time;
do {
@@ -257,10 +267,33 @@
LOG.info(job + ": iteration " + i + " finished " + time + "ms");
terminate = IterationUtils.readTerminationState(job.getConfiguration(), jobGen.getJobId())
|| IterationUtils.readForceTerminationState(job.getConfiguration(), jobGen.getJobId());
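+                /** ask the user hook whether to checkpoint after superstep i; on success, remember the restart point */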
+ if (ckpHook.checkpoint(i)) {
+ runCheckpoint(deploymentId, jobGen);
+ snapshotSuperstep.set(i);
+ snapshotJobIndex.set(currentJobIndex);
+ }
i++;
} while (!terminate);
}
+    private void runCheckpoint(DeploymentId deploymentId, JobGen jobGen) throws Exception {
+        JobSpecification ckpJob = jobGen.generateCheckpointing();
+        execute(deploymentId, ckpJob);
+    }
+
+    private void runLoadCheckpoint(DeploymentId deploymentId, JobGen jobGen) throws Exception {
+        JobSpecification ckpJob = jobGen.generateLoadingCheckpoint();
+        execute(deploymentId, ckpJob);
+    }
+
private void runCreate(DeploymentId deploymentId, JobGen jobGen) throws Exception {
try {
JobSpecification treeCreateSpec = jobGen.generateCreatingJob();
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
index 931ecc3..7cb00a6 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
@@ -15,8 +15,14 @@
package edu.uci.ics.pregelix.core.jobgen;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
import java.io.DataOutput;
+import java.io.DataOutputStream;
import java.io.File;
+import java.io.IOException;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.List;
@@ -24,10 +30,15 @@
import java.util.logging.Logger;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
@@ -70,6 +81,8 @@
import edu.uci.ics.pregelix.api.graph.Vertex;
import edu.uci.ics.pregelix.api.graph.VertexPartitioner;
import edu.uci.ics.pregelix.api.io.VertexInputFormat;
+import edu.uci.ics.pregelix.api.io.internal.InternalVertexInputFormat;
+import edu.uci.ics.pregelix.api.io.internal.InternalVertexOutputFormat;
import edu.uci.ics.pregelix.api.job.PregelixJob;
import edu.uci.ics.pregelix.api.util.BspUtils;
import edu.uci.ics.pregelix.api.util.ReflectionUtils;
@@ -111,12 +124,19 @@
protected static final String SECONDARY_INDEX_ODD = "secondary1";
protected static final String SECONDARY_INDEX_EVEN = "secondary2";
+ private String checkpointPath;
+
public JobGen(PregelixJob job) {
+ init(job);
+ }
+
+ private void init(PregelixJob job) {
conf = job.getConfiguration();
pregelixJob = job;
initJobConfiguration();
job.setJobId(jobId);
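+        /** checkpoints of this job are written under a per-job path on the job's default file system */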
+ checkpointPath = "/tmp/ckpoint/" + jobId;
// set the frame size to be the one user specified if the user did
// specify.
int specifiedFrameSize = BspUtils.getFrameSize(job.getConfiguration());
@@ -130,10 +150,7 @@
}
public void reset(PregelixJob job) {
- conf = job.getConfiguration();
- pregelixJob = job;
- initJobConfiguration();
- job.setJobId(jobId);
+ init(job);
}
@SuppressWarnings({ "rawtypes", "unchecked" })
@@ -195,74 +212,6 @@
return spec;
}
- @SuppressWarnings({ "rawtypes", "unchecked" })
- @Override
- public JobSpecification generateLoadingJob() throws HyracksException {
- Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
- Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
- JobSpecification spec = new JobSpecification();
- IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
-
- /**
- * the graph file scan operator and use count constraint first, will use
- * absolute constraint later
- */
- VertexInputFormat inputFormat = BspUtils.createVertexInputFormat(conf);
- List<InputSplit> splits = new ArrayList<InputSplit>();
- try {
- splits = inputFormat.getSplits(pregelixJob, fileSplitProvider.getFileSplits().length);
- LOGGER.info("number of splits: " + splits.size());
- for (InputSplit split : splits)
- LOGGER.info(split.toString());
- } catch (Exception e) {
- throw new HyracksDataException(e);
- }
- RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
- vertexIdClass.getName(), vertexClass.getName());
- IConfigurationFactory confFactory = new ConfigurationFactory(conf);
- String[] readSchedule = ClusterConfig.getHdfsScheduler().getLocationConstraints(splits);
- VertexFileScanOperatorDescriptor scanner = new VertexFileScanOperatorDescriptor(spec, recordDescriptor, splits,
- readSchedule, confFactory);
- ClusterConfig.setLocationConstraint(spec, scanner);
-
- /**
- * construct sort operator
- */
- int[] sortFields = new int[1];
- sortFields[0] = 0;
- INormalizedKeyComputerFactory nkmFactory = JobGenUtil.getINormalizedKeyComputerFactory(conf);
- IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
- comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(WritableComparator.get(vertexIdClass)
- .getClass());
- ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, sortFields,
- nkmFactory, comparatorFactories, recordDescriptor);
- ClusterConfig.setLocationConstraint(spec, sorter);
-
- /**
- * construct tree bulk load operator
- */
- int[] fieldPermutation = new int[2];
- fieldPermutation[0] = 0;
- fieldPermutation[1] = 1;
- ITypeTraits[] typeTraits = new ITypeTraits[2];
- typeTraits[0] = new TypeTraits(false);
- typeTraits[1] = new TypeTraits(false);
- TreeIndexBulkLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec,
- storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits, comparatorFactories,
- sortFields, fieldPermutation, DEFAULT_BTREE_FILL_FACTOR, true, 0, false,
- getIndexDataflowHelperFactory(), NoOpOperationCallbackFactory.INSTANCE);
- ClusterConfig.setLocationConstraint(spec, btreeBulkLoad);
-
- /**
- * connect operator descriptors
- */
- ITuplePartitionComputerFactory hashPartitionComputerFactory = getVertexPartitionComputerFactory();
- spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 0, sorter, 0);
- spec.connect(new OneToOneConnectorDescriptor(spec), sorter, 0, btreeBulkLoad, 0);
- spec.setFrameSize(frameSize);
- return spec;
- }
-
@Override
public JobSpecification generateJob(int iteration) throws HyracksException {
if (iteration <= 0)
@@ -406,8 +355,171 @@
return spec;
}
- @SuppressWarnings({ "rawtypes", "unchecked" })
public JobSpecification scanIndexWriteGraph() throws HyracksException {
+ JobSpecification spec = scanIndexWriteToHDFS(conf);
+ return spec;
+ }
+
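+    /** generate a job that scans the primary index and writes every vertex to the checkpoint path as a sequence file */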
+ @Override
+ public JobSpecification generateCheckpointing() throws HyracksException {
+ try {
+ PregelixJob tmpJob = this.createCloneJob("checkpointing for job " + jobId, pregelixJob);
+ tmpJob.setVertexOutputFormatClass(InternalVertexOutputFormat.class);
+ FileOutputFormat.setOutputPath(tmpJob, new Path(checkpointPath));
+ tmpJob.setOutputKeyClass(NullWritable.class);
+ tmpJob.setOutputValueClass(BspUtils.getVertexClass(tmpJob.getConfiguration()));
+ JobSpecification spec = scanIndexWriteToHDFS(tmpJob.getConfiguration());
+ return spec;
+ } catch (Exception e) {
+ throw new HyracksException(e);
+ }
+ }
+
+ @Override
+ public JobSpecification generateLoadingJob() throws HyracksException {
+ JobSpecification spec = loadHDFSData(conf);
+ return spec;
+ }
+
+ public void clearCheckpoints() throws IOException {
+ FileSystem dfs = FileSystem.get(conf);
+ dfs.delete(new Path(checkpointPath), true);
+ }
+
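+    /** generate a job that reads the checkpointed sequence files back and bulk-loads them into the primary index */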
+ @Override
+ public JobSpecification generateLoadingCheckpoint() throws HyracksException {
+ try {
+            PregelixJob tmpJob = this.createCloneJob("loading checkpoint for job " + jobId, pregelixJob);
+ tmpJob.setVertexInputFormatClass(InternalVertexInputFormat.class);
+ FileInputFormat.setInputPaths(tmpJob, new Path(checkpointPath));
+ JobSpecification spec = loadHDFSData(tmpJob.getConfiguration());
+ return spec;
+ } catch (Exception e) {
+ throw new HyracksException(e);
+ }
+ }
+
+ /***
+ * generate a "clear state" job
+ */
+ public JobSpecification generateClearState() throws HyracksException {
+ JobSpecification spec = new JobSpecification();
+ ClearStateOperatorDescriptor clearState = new ClearStateOperatorDescriptor(spec, jobId);
+ ClusterConfig.setLocationConstraint(spec, clearState);
+ spec.addRoot(clearState);
+ return spec;
+ }
+
+ /***
+     * drop the index with the given name
+ *
+ * @return JobSpecification
+ * @throws HyracksException
+ */
+ protected JobSpecification dropIndex(String indexName) throws HyracksException {
+ JobSpecification spec = new JobSpecification();
+
+ IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, indexName);
+ IndexDropOperatorDescriptor drop = new IndexDropOperatorDescriptor(spec, storageManagerInterface,
+ lcManagerProvider, fileSplitProvider, getIndexDataflowHelperFactory());
+
+ ClusterConfig.setLocationConstraint(spec, drop);
+ spec.addRoot(drop);
+ spec.setFrameSize(frameSize);
+ return spec;
+ }
+
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ protected ITuplePartitionComputerFactory getVertexPartitionComputerFactory() {
+ IConfigurationFactory confFactory = new ConfigurationFactory(conf);
+ Class<? extends VertexPartitioner> partitionerClazz = BspUtils.getVertexPartitionerClass(conf);
+ if (partitionerClazz != null) {
+ return new VertexPartitionComputerFactory(confFactory);
+ } else {
+ return new VertexIdPartitionComputerFactory(new WritableSerializerDeserializerFactory(
+ BspUtils.getVertexIndexClass(conf)));
+ }
+ }
+
+ protected IIndexDataflowHelperFactory getIndexDataflowHelperFactory() {
+ if (BspUtils.useLSM(conf)) {
+ return new LSMBTreeDataflowHelperFactory(new VirtualBufferCacheProvider(), new ConstantMergePolicyProvider(
+ 3), NoOpOperationTrackerProvider.INSTANCE, SynchronousSchedulerProvider.INSTANCE,
+ NoOpIOOperationCallback.INSTANCE, 0.01);
+ } else {
+ return new BTreeDataflowHelperFactory();
+ }
+ }
+
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ private JobSpecification loadHDFSData(Configuration conf) throws HyracksException, HyracksDataException {
+ Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
+ Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
+ JobSpecification spec = new JobSpecification();
+ IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
+
+ /**
+ * the graph file scan operator and use count constraint first, will use
+ * absolute constraint later
+ */
+ VertexInputFormat inputFormat = BspUtils.createVertexInputFormat(conf);
+ List<InputSplit> splits = new ArrayList<InputSplit>();
+ try {
+ splits = inputFormat.getSplits(pregelixJob, fileSplitProvider.getFileSplits().length);
+ LOGGER.info("number of splits: " + splits.size());
+ for (InputSplit split : splits)
+ LOGGER.info(split.toString());
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+ vertexIdClass.getName(), vertexClass.getName());
+ IConfigurationFactory confFactory = new ConfigurationFactory(conf);
+ String[] readSchedule = ClusterConfig.getHdfsScheduler().getLocationConstraints(splits);
+ VertexFileScanOperatorDescriptor scanner = new VertexFileScanOperatorDescriptor(spec, recordDescriptor, splits,
+ readSchedule, confFactory);
+ ClusterConfig.setLocationConstraint(spec, scanner);
+
+ /**
+ * construct sort operator
+ */
+ int[] sortFields = new int[1];
+ sortFields[0] = 0;
+ INormalizedKeyComputerFactory nkmFactory = JobGenUtil.getINormalizedKeyComputerFactory(conf);
+ IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
+ comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(WritableComparator.get(vertexIdClass)
+ .getClass());
+ ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, sortFields,
+ nkmFactory, comparatorFactories, recordDescriptor);
+ ClusterConfig.setLocationConstraint(spec, sorter);
+
+ /**
+ * construct tree bulk load operator
+ */
+ int[] fieldPermutation = new int[2];
+ fieldPermutation[0] = 0;
+ fieldPermutation[1] = 1;
+ ITypeTraits[] typeTraits = new ITypeTraits[2];
+ typeTraits[0] = new TypeTraits(false);
+ typeTraits[1] = new TypeTraits(false);
+ TreeIndexBulkLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec,
+ storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits, comparatorFactories,
+ sortFields, fieldPermutation, DEFAULT_BTREE_FILL_FACTOR, true, 0, false,
+ getIndexDataflowHelperFactory(), NoOpOperationCallbackFactory.INSTANCE);
+ ClusterConfig.setLocationConstraint(spec, btreeBulkLoad);
+
+ /**
+ * connect operator descriptors
+ */
+ ITuplePartitionComputerFactory hashPartitionComputerFactory = getVertexPartitionComputerFactory();
+ spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 0, sorter, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), sorter, 0, btreeBulkLoad, 0);
+ spec.setFrameSize(frameSize);
+ return spec;
+ }
+
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ private JobSpecification scanIndexWriteToHDFS(Configuration conf) throws HyracksDataException, HyracksException {
Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
JobSpecification spec = new JobSpecification();
@@ -464,56 +576,14 @@
return spec;
}
- /***
- * generate a "clear state" job
- */
- public JobSpecification generateClearState() throws HyracksException {
- JobSpecification spec = new JobSpecification();
- ClearStateOperatorDescriptor clearState = new ClearStateOperatorDescriptor(spec, jobId);
- ClusterConfig.setLocationConstraint(spec, clearState);
- spec.addRoot(clearState);
- return spec;
- }
-
- /***
- * drop the sindex
- *
- * @return JobSpecification
- * @throws HyracksException
- */
- protected JobSpecification dropIndex(String indexName) throws HyracksException {
- JobSpecification spec = new JobSpecification();
-
- IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, indexName);
- IndexDropOperatorDescriptor drop = new IndexDropOperatorDescriptor(spec, storageManagerInterface,
- lcManagerProvider, fileSplitProvider, getIndexDataflowHelperFactory());
-
- ClusterConfig.setLocationConstraint(spec, drop);
- spec.addRoot(drop);
- spec.setFrameSize(frameSize);
- return spec;
- }
-
- @SuppressWarnings({ "unchecked", "rawtypes" })
- protected ITuplePartitionComputerFactory getVertexPartitionComputerFactory() {
- IConfigurationFactory confFactory = new ConfigurationFactory(conf);
- Class<? extends VertexPartitioner> partitionerClazz = BspUtils.getVertexPartitionerClass(conf);
- if (partitionerClazz != null) {
- return new VertexPartitionComputerFactory(confFactory);
- } else {
- return new VertexIdPartitionComputerFactory(new WritableSerializerDeserializerFactory(
- BspUtils.getVertexIndexClass(conf)));
- }
- }
-
- protected IIndexDataflowHelperFactory getIndexDataflowHelperFactory() {
- if (BspUtils.useLSM(conf)) {
- return new LSMBTreeDataflowHelperFactory(new VirtualBufferCacheProvider(), new ConstantMergePolicyProvider(
- 3), NoOpOperationTrackerProvider.INSTANCE, SynchronousSchedulerProvider.INSTANCE,
- NoOpIOOperationCallback.INSTANCE, 0.01);
- } else {
- return new BTreeDataflowHelperFactory();
- }
+ private PregelixJob createCloneJob(String newJobName, PregelixJob oldJob) throws IOException {
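+        /** clone the job by writing its configuration to a byte array and reading it back into a fresh job */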
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ DataOutput dos = new DataOutputStream(bos);
+ oldJob.getConfiguration().write(dos);
+ PregelixJob newJob = new PregelixJob(newJobName);
+ DataInput dis = new DataInputStream(new ByteArrayInputStream(bos.toByteArray()));
+ newJob.getConfiguration().readFields(dis);
+ return newJob;
}
/** generate non-first iteration job */
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/util/IterationUtils.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/util/IterationUtils.java
index 75f8ed8..5844d09 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/util/IterationUtils.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/util/IterationUtils.java
@@ -52,6 +52,11 @@
RuntimeContext context = (RuntimeContext) appContext.getApplicationObject();
Map<StateKey, IStateObject> map = context.getAppStateStore();
IStateObject state = map.get(new StateKey(lastId, partition));
+ if (state == null) {
+            /** the preceding job may have been a checkpointing job, which produces no iteration state; look one more job back */
+ lastId = new JobId(currentId.getId() - 2);
+ state = map.get(new StateKey(lastId, partition));
+ }
return state;
}