implemented checkpoint read

Add the checkpoint loading (read) path and rework checkpoint generation:
generateCheckpointing()/generateLoadingCheckpoint() now take the iteration
number and return arrays of job specifications. JobGen checkpoints vertices to
/tmp/ckpoint/<jobId>/vertex and message state to /tmp/ckpoint/<jobId>/message;
JobGenInnerJoin additionally checkpoints and reloads the secondary BTree index
under /tmp/ckpoint/<jobId>/secondary. IKeyValueParser.parse() now receives the
input split name and ITupleWriterFactory.getTupleWriter() the partition info,
so checkpoints can be written and re-read as sequence files via the new
KeyValueParserFactory/KeyValueWriterFactory. A ConservativeCheckpointHook
(checkpoint every 5 supersteps) is added and enabled in the example test jobs.
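For reference, a job opts into the new periodic checkpointing through the hook
added in this change. The snippet below is a minimal sketch (the class name
CheckpointedJobExample is hypothetical; the vertex input/output format and path
setup that the example test jobs perform is elided):

    import java.io.IOException;

    import edu.uci.ics.pregelix.api.job.PregelixJob;
    import edu.uci.ics.pregelix.api.util.ConservativeCheckpointHook;
    import edu.uci.ics.pregelix.example.PageRankVertex;

    public class CheckpointedJobExample {
        public static void main(String[] args) throws IOException {
            PregelixJob job = new PregelixJob(PageRankVertex.class.getName());
            job.setVertexClass(PageRankVertex.class);
            // vertex input/output formats and HDFS paths would be configured here,
            // exactly as in the JobGenerator.java test changes below
            job.setCheckpointHook(ConservativeCheckpointHook.class); // checkpoint every 5 supersteps
        }
    }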
diff --git a/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/operator/filescan/HiveKeyValueParser.java b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/operator/filescan/HiveKeyValueParser.java
index 1bf4abe..833daf4 100644
--- a/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/operator/filescan/HiveKeyValueParser.java
+++ b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/operator/filescan/HiveKeyValueParser.java
@@ -166,7 +166,7 @@
}
@Override
- public void parse(K key, V value, IFrameWriter writer) throws HyracksDataException {
+ public void parse(K key, V value, IFrameWriter writer, String fileString) throws HyracksDataException {
try {
tb.reset();
if (parser != null) {
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/IKeyValueParser.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/IKeyValueParser.java
index 1852a6f..57c20e0 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/IKeyValueParser.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/IKeyValueParser.java
@@ -38,18 +38,13 @@
public void open(IFrameWriter writer) throws HyracksDataException;
/**
- * Parse a key-value pair returned by HDFS record reader to a tuple.
- * when the parsers' internal buffer is full, it can flush the buffer to the writer
- *
* @param key
- * The key returned from Hadoop's InputReader.
* @param value
- * The value returned from Hadoop's InputReader.
* @param writer
- * The hyracks writer for outputting data.
+ * @param fileString
* @throws HyracksDataException
*/
- public void parse(K key, V value, IFrameWriter writer) throws HyracksDataException;
+ public void parse(K key, V value, IFrameWriter writer, String fileString) throws HyracksDataException;
/**
* Flush the residual tuples in the internal buffer to the writer.
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/ITupleWriterFactory.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/ITupleWriterFactory.java
index 674873d..7e6e4dc 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/ITupleWriterFactory.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/ITupleWriterFactory.java
@@ -30,6 +30,6 @@
* the IHyracksTaskContext
* @return a tuple writer instance
*/
- public ITupleWriter getTupleWriter(IHyracksTaskContext ctx) throws HyracksDataException;
+ public ITupleWriter getTupleWriter(IHyracksTaskContext ctx, int partition, int nPartition) throws HyracksDataException;
}
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSReadOperatorDescriptor.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSReadOperatorDescriptor.java
index 2cff534..a45992c 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSReadOperatorDescriptor.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSReadOperatorDescriptor.java
@@ -128,7 +128,7 @@
Object key = reader.createKey();
Object value = reader.createValue();
while (reader.next(key, value) == true) {
- parser.parse(key, value, writer);
+ parser.parse(key, value, writer, inputSplits[i].toString());
}
}
}
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSWriteOperatorDescriptor.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSWriteOperatorDescriptor.java
index 432849b..4e48e9b 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSWriteOperatorDescriptor.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSWriteOperatorDescriptor.java
@@ -89,7 +89,7 @@
String outputDirPath = FileOutputFormat.getOutputPath(conf).toString();
String fileName = outputDirPath + File.separator + "part-" + partition;
- tupleWriter = tupleWriterFactory.getTupleWriter(ctx);
+ tupleWriter = tupleWriterFactory.getTupleWriter(ctx, partition, nPartitions);
try {
FileSystem dfs = FileSystem.get(conf);
dos = dfs.create(new Path(fileName), true);
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/lib/TextKeyValueParserFactory.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/lib/TextKeyValueParserFactory.java
index fbac95b..92cde9d 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/lib/TextKeyValueParserFactory.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/lib/TextKeyValueParserFactory.java
@@ -49,7 +49,8 @@
}
@Override
- public void parse(LongWritable key, Text value, IFrameWriter writer) throws HyracksDataException {
+ public void parse(LongWritable key, Text value, IFrameWriter writer, String fileString)
+ throws HyracksDataException {
tb.reset();
tb.addField(value.getBytes(), 0, value.getLength());
if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/lib/TextTupleWriterFactory.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/lib/TextTupleWriterFactory.java
index 92a427e..60be1f7 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/lib/TextTupleWriterFactory.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/lib/TextTupleWriterFactory.java
@@ -27,7 +27,7 @@
private static final long serialVersionUID = 1L;
@Override
- public ITupleWriter getTupleWriter(IHyracksTaskContext ctx) {
+ public ITupleWriter getTupleWriter(IHyracksTaskContext ctx, int partition, int nPartition) {
return new ITupleWriter() {
private byte newLine = "\n".getBytes()[0];
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSReadOperatorDescriptor.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSReadOperatorDescriptor.java
index 3f01d77..43ca4ac 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSReadOperatorDescriptor.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSReadOperatorDescriptor.java
@@ -144,7 +144,8 @@
RecordReader reader = inputFormat.createRecordReader(inputSplits.get(i), context);
reader.initialize(inputSplits.get(i), context);
while (reader.nextKeyValue() == true) {
- parser.parse(reader.getCurrentKey(), reader.getCurrentValue(), writer);
+ parser.parse(reader.getCurrentKey(), reader.getCurrentValue(), writer,
+ inputSplits.get(i).toString());
}
}
}
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSWriteOperatorDescriptor.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSWriteOperatorDescriptor.java
index 77b8c7e..068cdfc 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSWriteOperatorDescriptor.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSWriteOperatorDescriptor.java
@@ -88,7 +88,7 @@
String outputPath = FileOutputFormat.getOutputPath(conf).toString();
String fileName = outputPath + File.separator + "part-" + partition;
- tupleWriter = tupleWriterFactory.getTupleWriter(ctx);
+ tupleWriter = tupleWriterFactory.getTupleWriter(ctx, partition, nPartitions);
try {
FileSystem dfs = FileSystem.get(conf.getConfiguration());
dos = dfs.create(new Path(fileName), true);
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/internal/InternalVertexInputFormat.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/internal/InternalVertexInputFormat.java
index 358433b..22d3b27 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/internal/InternalVertexInputFormat.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/internal/InternalVertexInputFormat.java
@@ -37,7 +37,7 @@
public class InternalVertexInputFormat<I extends WritableComparable, V extends Writable, E extends Writable, M extends WritableSizable>
extends VertexInputFormat<I, V, E, M> {
/** Uses the SequenceFileInputFormat to do everything */
- protected SequenceFileInputFormat sequenceInputFormat = new SequenceFileInputFormat();
+ private SequenceFileInputFormat sequenceInputFormat = new SequenceFileInputFormat();
@SuppressWarnings("unchecked")
@Override
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/internal/InternalVertexOutputFormat.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/internal/InternalVertexOutputFormat.java
index 8c31a68..b603037 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/internal/InternalVertexOutputFormat.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/internal/InternalVertexOutputFormat.java
@@ -35,13 +35,13 @@
@SuppressWarnings("rawtypes")
public class InternalVertexOutputFormat<I extends WritableComparable, V extends Writable, E extends Writable> extends
VertexOutputFormat<I, V, E> {
- protected SequenceFileOutputFormat<NullWritable, Vertex> sequenceOutputFormat = new SequenceFileOutputFormat<NullWritable, Vertex>();
+ private SequenceFileOutputFormat sequenceOutputFormat = new SequenceFileOutputFormat();
@Override
public VertexWriter<I, V, E> createVertexWriter(final TaskAttemptContext context) throws IOException,
InterruptedException {
return new VertexWriter<I, V, E>() {
- private RecordWriter<NullWritable, Vertex> recordWriter = sequenceOutputFormat.getRecordWriter(context);
+ private RecordWriter recordWriter = sequenceOutputFormat.getRecordWriter(context);
private NullWritable key = NullWritable.get();
@Override
@@ -49,6 +49,7 @@
}
+ @SuppressWarnings("unchecked")
@Override
public void writeVertex(Vertex<I, V, E, ?> vertex) throws IOException, InterruptedException {
recordWriter.write(key, vertex);
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/ConservativeCheckpointHook.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/ConservativeCheckpointHook.java
new file mode 100644
index 0000000..6a4a660
--- /dev/null
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/ConservativeCheckpointHook.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.api.util;
+
+import edu.uci.ics.pregelix.api.job.ICheckpointHook;
+
+/**
+ * A conservative checkpoint hook that checkpoints every 5 supersteps.
+ *
+ * @author yingyib
+ */
+public class ConservativeCheckpointHook implements ICheckpointHook {
+
+ @Override
+ public boolean checkpoint(int superstep) {
+ if (superstep % 5 == 0) {
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+}
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/DefaultCheckpointHook.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/DefaultCheckpointHook.java
index 1f21c2f..c37c4ab 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/DefaultCheckpointHook.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/DefaultCheckpointHook.java
@@ -17,6 +17,8 @@
import edu.uci.ics.pregelix.api.job.ICheckpointHook;
/**
+ * The default checkpoint hook, which never checkpoints.
+ *
* @author yingyib
*/
public class DefaultCheckpointHook implements ICheckpointHook {
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/base/IJobGen.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/base/IJobGen.java
index 156c78c..6bb0dea 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/base/IJobGen.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/base/IJobGen.java
@@ -26,9 +26,9 @@
public JobSpecification generateJob(int iteration) throws HyracksException;
- public JobSpecification generateCheckpointing() throws HyracksException;
+ public JobSpecification[] generateCheckpointing(int lastSuccessfulIteration) throws HyracksException;
- public JobSpecification generateLoadingCheckpoint() throws HyracksException;
+ public JobSpecification[] generateLoadingCheckpoint(int lastCheckpointedIteration) throws HyracksException;
public JobSpecification generateClearState() throws HyracksException;
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
index 6433f7b..b4172d9 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
@@ -99,7 +99,7 @@
IntWritable lastSnapshotSuperstep = new IntWritable(0);
boolean failed = false;
int retryCount = 0;
- int maxRetryCount = 3;
+ int maxRetryCount = 1;
do {
try {
@@ -254,7 +254,7 @@
IntWritable snapshotJobIndex, IntWritable snapshotSuperstep, ICheckpointHook ckpHook) throws Exception {
if (snapshotJobIndex.get() >= 0 && snapshotSuperstep.get() > 0) {
/** reload the checkpoint */
- runLoadCheckpoint(deploymentId, jobGen);
+ runLoadCheckpoint(deploymentId, jobGen, snapshotSuperstep.get());
}
int i = snapshotSuperstep.get() + 1;
boolean terminate = false;
@@ -268,7 +268,7 @@
terminate = IterationUtils.readTerminationState(job.getConfiguration(), jobGen.getJobId())
|| IterationUtils.readForceTerminationState(job.getConfiguration(), jobGen.getJobId());
if (ckpHook.checkpoint(i)) {
- runCheckpoint(deploymentId, jobGen);
+ runCheckpoint(deploymentId, jobGen, i);
snapshotSuperstep.set(i);
snapshotJobIndex.set(currentJobIndex);
}
@@ -276,19 +276,20 @@
} while (!terminate);
}
- private void runCheckpoint(DeploymentId deploymentId, JobGen jobGen) throws Exception {
+ private void runCheckpoint(DeploymentId deploymentId, JobGen jobGen, int lastSuccessfulIteration) throws Exception {
try {
- JobSpecification ckpJob = jobGen.generateCheckpointing();
- execute(deploymentId, ckpJob);
+ JobSpecification[] ckpJobs = jobGen.generateCheckpointing(lastSuccessfulIteration);
+ runJobArray(deploymentId, ckpJobs);
} catch (Exception e) {
throw e;
}
}
- private void runLoadCheckpoint(DeploymentId deploymentId, JobGen jobGen) throws Exception {
+ private void runLoadCheckpoint(DeploymentId deploymentId, JobGen jobGen, int checkPointedIteration)
+ throws Exception {
try {
- JobSpecification ckpJob = jobGen.generateLoadingCheckpoint();
- execute(deploymentId, ckpJob);
+ JobSpecification[] ckpJobs = jobGen.generateLoadingCheckpoint(checkPointedIteration);
+ runJobArray(deploymentId, ckpJobs);
} catch (Exception e) {
throw e;
}
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
index 7cb00a6..8c36ea5 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
@@ -61,6 +61,10 @@
import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
import edu.uci.ics.hyracks.dataflow.std.misc.ConstantTupleSourceOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
+import edu.uci.ics.hyracks.hdfs.api.ITupleWriterFactory;
+import edu.uci.ics.hyracks.hdfs2.dataflow.ConfFactory;
+import edu.uci.ics.hyracks.hdfs2.dataflow.HDFSReadOperatorDescriptor;
+import edu.uci.ics.hyracks.hdfs2.dataflow.HDFSWriteOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeDataflowHelperFactory;
import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
@@ -78,6 +82,7 @@
import edu.uci.ics.hyracks.storage.common.file.TransientLocalResourceFactoryProvider;
import edu.uci.ics.pregelix.api.graph.GlobalAggregator;
import edu.uci.ics.pregelix.api.graph.MessageCombiner;
+import edu.uci.ics.pregelix.api.graph.MsgList;
import edu.uci.ics.pregelix.api.graph.Vertex;
import edu.uci.ics.pregelix.api.graph.VertexPartitioner;
import edu.uci.ics.pregelix.api.io.VertexInputFormat;
@@ -93,7 +98,13 @@
import edu.uci.ics.pregelix.core.runtime.touchpoint.WritableComparingBinaryComparatorFactory;
import edu.uci.ics.pregelix.core.util.DataflowUtils;
import edu.uci.ics.pregelix.dataflow.ClearStateOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.EmptySinkOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.EmptyTupleSourceOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.HDFSFileWriteOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.KeyValueParserFactory;
+import edu.uci.ics.pregelix.dataflow.KeyValueWriterFactory;
+import edu.uci.ics.pregelix.dataflow.MaterializingReadOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.MaterializingWriteOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.VertexFileScanOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.VertexWriteOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
@@ -124,7 +135,7 @@
protected static final String SECONDARY_INDEX_ODD = "secondary1";
protected static final String SECONDARY_INDEX_EVEN = "secondary2";
- private String checkpointPath;
+ private String vertexCheckpointPath;
public JobGen(PregelixJob job) {
init(job);
@@ -136,7 +147,7 @@
initJobConfiguration();
job.setJobId(jobId);
- checkpointPath = "/tmp/ckpoint/" + jobId;
+ vertexCheckpointPath = "/tmp/ckpoint/" + jobId + "/vertex";
// set the frame size to be the one user specified if the user did
// specify.
int specifiedFrameSize = BspUtils.getFrameSize(job.getConfiguration());
@@ -347,10 +358,11 @@
*/
int[] sortFields = new int[1];
sortFields[0] = 0;
- ITuplePartitionComputerFactory hashPartitionComputerFactory = getVertexPartitionComputerFactory();
+ //ITuplePartitionComputerFactory hashPartitionComputerFactory = getVertexPartitionComputerFactory();
spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource, 0, scanner, 0);
- spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, hashPartitionComputerFactory, sortFields,
- comparatorFactories), scanner, 0, writer, 0);
+ //spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, hashPartitionComputerFactory, sortFields,
+ // comparatorFactories), scanner, 0, writer, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), scanner, 0, writer, 0);
spec.setFrameSize(frameSize);
return spec;
}
@@ -361,15 +373,21 @@
}
@Override
- public JobSpecification generateCheckpointing() throws HyracksException {
+ public JobSpecification[] generateCheckpointing(int lastSuccessfulIteration) throws HyracksException {
try {
- PregelixJob tmpJob = this.createCloneJob("checkpointing for job " + jobId, pregelixJob);
+ PregelixJob tmpJob = this.createCloneJob("Vertex checkpointing for job " + jobId, pregelixJob);
tmpJob.setVertexOutputFormatClass(InternalVertexOutputFormat.class);
- FileOutputFormat.setOutputPath(tmpJob, new Path(checkpointPath));
+ FileOutputFormat.setOutputPath(tmpJob, new Path(vertexCheckpointPath));
tmpJob.setOutputKeyClass(NullWritable.class);
tmpJob.setOutputValueClass(BspUtils.getVertexClass(tmpJob.getConfiguration()));
- JobSpecification spec = scanIndexWriteToHDFS(tmpJob.getConfiguration());
- return spec;
+ JobSpecification vertexCkpSpec = scanIndexWriteToHDFS(tmpJob.getConfiguration());
+ JobSpecification[] stateCkpSpecs = generateStateCheckpointing(lastSuccessfulIteration);
+ JobSpecification[] specs = new JobSpecification[1 + stateCkpSpecs.length];
+ specs[0] = vertexCkpSpec;
+ for (int i = 1; i < specs.length; i++) {
+ specs[i] = stateCkpSpecs[i - 1];
+ }
+ return specs;
} catch (Exception e) {
throw new HyracksException(e);
}
@@ -383,17 +401,24 @@
public void clearCheckpoints() throws IOException {
FileSystem dfs = FileSystem.get(conf);
- dfs.delete(new Path(checkpointPath), true);
+ // clear the checkpoint directory
+ dfs.delete(new Path("/tmp/ckpoint/" + jobId), true);
}
@Override
- public JobSpecification generateLoadingCheckpoint() throws HyracksException {
+ public JobSpecification[] generateLoadingCheckpoint(int lastCheckpointedIteration) throws HyracksException {
try {
- PregelixJob tmpJob = this.createCloneJob("checkpointing for job " + jobId, pregelixJob);
+ PregelixJob tmpJob = this.createCloneJob("Vertex checkpoint loading for job " + jobId, pregelixJob);
tmpJob.setVertexInputFormatClass(InternalVertexInputFormat.class);
- FileInputFormat.setInputPaths(tmpJob, new Path(checkpointPath));
- JobSpecification spec = loadHDFSData(tmpJob.getConfiguration());
- return spec;
+ FileInputFormat.setInputPaths(tmpJob, new Path(vertexCheckpointPath));
+ JobSpecification vertexLoadSpec = loadHDFSData(tmpJob.getConfiguration());
+ JobSpecification[] stateLoadSpecs = generateStateCheckpointLoading(lastCheckpointedIteration, pregelixJob);
+ JobSpecification[] specs = new JobSpecification[1 + stateLoadSpecs.length];
+ specs[0] = vertexLoadSpec;
+ for (int i = 1; i < specs.length; i++) {
+ specs[i] = stateLoadSpecs[i - 1];
+ }
+ return specs;
} catch (Exception e) {
throw new HyracksException(e);
}
@@ -576,14 +601,115 @@
return spec;
}
- private PregelixJob createCloneJob(String newJobName, PregelixJob oldJob) throws IOException {
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
- DataOutput dos = new DataOutputStream(bos);
- oldJob.getConfiguration().write(dos);
- PregelixJob newJob = new PregelixJob(newJobName);
- DataInput dis = new DataInputStream(new ByteArrayInputStream(bos.toByteArray()));
- newJob.getConfiguration().readFields(dis);
- return newJob;
+ protected PregelixJob createCloneJob(String newJobName, PregelixJob oldJob) throws HyracksException {
+ try {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ DataOutput dos = new DataOutputStream(bos);
+ oldJob.getConfiguration().write(dos);
+ PregelixJob newJob = new PregelixJob(newJobName);
+ DataInput dis = new DataInputStream(new ByteArrayInputStream(bos.toByteArray()));
+ newJob.getConfiguration().readFields(dis);
+ return newJob;
+ } catch (IOException e) {
+ throw new HyracksException(e);
+ }
+ }
+
+ /** generate plan specific state checkpointing */
+ protected JobSpecification[] generateStateCheckpointing(int lastSuccessfulIteration) throws HyracksException {
+ Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
+ JobSpecification spec = new JobSpecification();
+
+ /**
+ * source aggregate
+ */
+ RecordDescriptor rdFinal = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
+ MsgList.class.getName());
+
+ /**
+ * construct empty tuple operator
+ */
+ EmptyTupleSourceOperatorDescriptor emptyTupleSource = new EmptyTupleSourceOperatorDescriptor(spec);
+ ClusterConfig.setLocationConstraint(spec, emptyTupleSource);
+
+ /**
+ * construct the materializing write operator
+ */
+ MaterializingReadOperatorDescriptor materializeRead = new MaterializingReadOperatorDescriptor(spec, rdFinal);
+ ClusterConfig.setLocationConstraint(spec, materializeRead);
+
+ String checkpointPath = "/tmp/ckpoint/" + jobId + "/message";
+ PregelixJob tmpJob = createCloneJob("State checkpointing for job " + jobId, pregelixJob);
+ tmpJob.setVertexOutputFormatClass(InternalVertexOutputFormat.class);
+ FileOutputFormat.setOutputPath(tmpJob, new Path(checkpointPath));
+ tmpJob.setOutputKeyClass(vertexIdClass);
+ tmpJob.setOutputValueClass(MsgList.class);
+
+ ITupleWriterFactory writerFactory = new KeyValueWriterFactory(new ConfFactory(tmpJob));
+ HDFSWriteOperatorDescriptor hdfsWriter = new HDFSWriteOperatorDescriptor(spec, tmpJob, writerFactory);
+
+ spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource, 0, materializeRead, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), materializeRead, 0, hdfsWriter, 0);
+ spec.setFrameSize(frameSize);
+ return new JobSpecification[] { spec };
+ }
+
+ /** load plan specific state checkpoints */
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ protected JobSpecification[] generateStateCheckpointLoading(int lastCheckpointedIteration, PregelixJob job)
+ throws HyracksException {
+ String checkpointPath = "/tmp/ckpoint/" + jobId + "/message";
+ PregelixJob tmpJob = createCloneJob("State checkpoint loading for job " + jobId, job);
+ tmpJob.setVertexInputFormatClass(InternalVertexInputFormat.class);
+ try {
+ FileInputFormat.setInputPaths(tmpJob, checkpointPath);
+ } catch (IOException e) {
+ throw new HyracksException(e);
+ }
+ Configuration conf = job.getConfiguration();
+ Class vertexIdClass = BspUtils.getVertexIndexClass(conf);
+ JobSpecification spec = new JobSpecification();
+
+ /***
+ * HDFS read operator
+ */
+ VertexInputFormat inputFormat = BspUtils.createVertexInputFormat(conf);
+ List<InputSplit> splits = new ArrayList<InputSplit>();
+ try {
+ splits = inputFormat.getSplits(tmpJob, ClusterConfig.getLocationConstraint().length);
+ LOGGER.info("number of splits: " + splits.size());
+ for (InputSplit split : splits)
+ LOGGER.info(split.toString());
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+ vertexIdClass.getName(), MsgList.class.getName());
+ String[] readSchedule = ClusterConfig.getHdfsScheduler().getLocationConstraints(splits);
+ HDFSReadOperatorDescriptor scanner = new HDFSReadOperatorDescriptor(spec, recordDescriptor, tmpJob, splits,
+ readSchedule, new KeyValueParserFactory());
+ ClusterConfig.setLocationConstraint(spec, scanner);
+
+ /**
+ * construct the materializing write operator
+ */
+ MaterializingWriteOperatorDescriptor materialize = new MaterializingWriteOperatorDescriptor(spec,
+ recordDescriptor);
+ ClusterConfig.setLocationConstraint(spec, materialize);
+
+ /** construct empty sink operator */
+ EmptySinkOperatorDescriptor emptySink = new EmptySinkOperatorDescriptor(spec);
+ ClusterConfig.setLocationConstraint(spec, emptySink);
+
+ /**
+ * connect operator descriptors
+ */
+ ITuplePartitionComputerFactory hashPartitionComputerFactory = getVertexPartitionComputerFactory();
+ spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 0,
+ materialize, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), materialize, 0, emptySink, 0);
+ spec.setFrameSize(frameSize);
+ return new JobSpecification[] { spec };
}
/** generate non-first iteration job */
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
index c144ddd..58384b2 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
@@ -15,28 +15,51 @@
package edu.uci.ics.pregelix.core.jobgen;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.logging.Logger;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.VLongWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningMergingConnectorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.misc.ConstantTupleSourceOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
+import edu.uci.ics.hyracks.hdfs2.dataflow.ConfFactory;
+import edu.uci.ics.hyracks.hdfs2.dataflow.HDFSReadOperatorDescriptor;
+import edu.uci.ics.hyracks.hdfs2.dataflow.HDFSWriteOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexInsertUpdateDeleteOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
import edu.uci.ics.pregelix.api.graph.MsgList;
+import edu.uci.ics.pregelix.api.io.VertexInputFormat;
+import edu.uci.ics.pregelix.api.io.internal.InternalVertexInputFormat;
+import edu.uci.ics.pregelix.api.io.internal.InternalVertexOutputFormat;
import edu.uci.ics.pregelix.api.job.PregelixJob;
import edu.uci.ics.pregelix.api.util.BspUtils;
import edu.uci.ics.pregelix.core.data.TypeTraits;
@@ -48,6 +71,8 @@
import edu.uci.ics.pregelix.dataflow.EmptySinkOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.EmptyTupleSourceOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.FinalAggregateOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.KeyValueParserFactory;
+import edu.uci.ics.pregelix.dataflow.KeyValueWriterFactory;
import edu.uci.ics.pregelix.dataflow.MaterializingReadOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.MaterializingWriteOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.TerminationStateWriterOperatorDescriptor;
@@ -69,6 +94,7 @@
import edu.uci.ics.pregelix.runtime.touchpoint.RuntimeHookFactory;
public class JobGenInnerJoin extends JobGen {
+ private static final Logger LOGGER = Logger.getLogger(JobGen.class.getName());
public JobGenInnerJoin(PregelixJob job) {
super(job);
@@ -496,6 +522,171 @@
return spec;
}
+ /** generate plan specific state checkpointing */
+ protected JobSpecification[] generateStateCheckpointing(int lastSuccessfulIteration) throws HyracksException {
+ JobSpecification[] msgCkpSpecs = super.generateStateCheckpointing(lastSuccessfulIteration);
+ PregelixJob tmpJob = this.createCloneJob("Vertex checkpointing for job " + jobId, pregelixJob);
+ tmpJob.setVertexOutputFormatClass(InternalVertexOutputFormat.class);
+
+ /** generate secondary index checkpoint */
+ String checkpointPath = "/tmp/ckpoint/" + jobId + "/secondary";
+ FileOutputFormat.setOutputPath(tmpJob, new Path(checkpointPath));
+ tmpJob.setOutputKeyClass(BspUtils.getVertexIndexClass(tmpJob.getConfiguration()));
+ tmpJob.setOutputValueClass(MsgList.class);
+ JobSpecification secondaryBTreeCkp = generateSecondaryBTreeCheckpoint(lastSuccessfulIteration, tmpJob);
+
+ JobSpecification[] specs = new JobSpecification[msgCkpSpecs.length + 1];
+ for (int i = 0; i < msgCkpSpecs.length; i++) {
+ specs[i] = msgCkpSpecs[i];
+ }
+ specs[specs.length - 1] = secondaryBTreeCkp;
+ return specs;
+ }
+
+ /**
+ * generate plan specific checkpoint loading
+ */
+ @Override
+ protected JobSpecification[] generateStateCheckpointLoading(int lastSuccessfulIteration, PregelixJob job)
+ throws HyracksException {
+ JobSpecification[] msgCkpSpecs = super.generateStateCheckpointLoading(lastSuccessfulIteration, job);
+ PregelixJob tmpJob = this.createCloneJob("Vertex checkpointing for job " + jobId, pregelixJob);
+ tmpJob.setVertexOutputFormatClass(InternalVertexOutputFormat.class);
+
+ /** generate secondary index checkpoint load */
+ JobSpecification secondaryBTreeCkpLoad = generateSecondaryBTreeCheckpointLoad(lastSuccessfulIteration, tmpJob);
+ JobSpecification[] specs = new JobSpecification[msgCkpSpecs.length + 1];
+ for (int i = 0; i < msgCkpSpecs.length; i++) {
+ specs[i] = msgCkpSpecs[i];
+ }
+ specs[specs.length - 1] = secondaryBTreeCkpLoad;
+ return specs;
+ }
+
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ private JobSpecification generateSecondaryBTreeCheckpointLoad(int lastSuccessfulIteration, PregelixJob job)
+ throws HyracksException {
+ Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(job.getConfiguration());
+ JobSpecification spec = new JobSpecification();
+
+ String checkpointPath = "/tmp/ckpoint/" + jobId + "/secondary";
+ PregelixJob tmpJob = createCloneJob("State checkpoint loading for job " + jobId, job);
+ tmpJob.setVertexInputFormatClass(InternalVertexInputFormat.class);
+ try {
+ FileInputFormat.setInputPaths(tmpJob, checkpointPath);
+ } catch (IOException e) {
+ throw new HyracksException(e);
+ }
+ Configuration conf = job.getConfiguration();
+
+ /***
+ * construct HDFS read operator
+ */
+ VertexInputFormat inputFormat = BspUtils.createVertexInputFormat(conf);
+ List<InputSplit> splits = new ArrayList<InputSplit>();
+ try {
+ splits = inputFormat.getSplits(tmpJob, ClusterConfig.getLocationConstraint().length);
+ LOGGER.info("number of splits: " + splits.size());
+ for (InputSplit split : splits)
+ LOGGER.info(split.toString());
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+ vertexIdClass.getName(), MsgList.class.getName());
+ String[] readSchedule = ClusterConfig.getHdfsScheduler().getLocationConstraints(splits);
+ HDFSReadOperatorDescriptor scanner = new HDFSReadOperatorDescriptor(spec, recordDescriptor, tmpJob, splits,
+ readSchedule, new KeyValueParserFactory());
+ ClusterConfig.setLocationConstraint(spec, scanner);
+
+ /**
+ * construct bulk-load index operator
+ */
+ ITypeTraits[] typeTraits = new ITypeTraits[2];
+ typeTraits[0] = new TypeTraits(false);
+ typeTraits[1] = new TypeTraits(false);
+ int fieldPermutation[] = new int[] { 0, 1 };
+ IBinaryComparatorFactory[] indexCmpFactories = new IBinaryComparatorFactory[1];
+ indexCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(lastSuccessfulIteration + 1, WritableComparator
+ .get(vertexIdClass).getClass());
+ String writeFile = lastSuccessfulIteration % 2 == 0 ? SECONDARY_INDEX_EVEN : SECONDARY_INDEX_ODD;
+ IFileSplitProvider secondaryFileSplitProviderWrite = ClusterConfig.getFileSplitProvider(jobId, writeFile);
+ TreeIndexBulkReLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkReLoadOperatorDescriptor(spec,
+ storageManagerInterface, lcManagerProvider, secondaryFileSplitProviderWrite, typeTraits,
+ indexCmpFactories, fieldPermutation, new int[] { 0 }, DEFAULT_BTREE_FILL_FACTOR,
+ getIndexDataflowHelperFactory());
+ ClusterConfig.setLocationConstraint(spec, btreeBulkLoad);
+
+ /**
+ * connect operator descriptors
+ */
+ ITuplePartitionComputerFactory hashPartitionComputerFactory = getVertexPartitionComputerFactory();
+ spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 0,
+ btreeBulkLoad, 0);
+ spec.setFrameSize(frameSize);
+
+ return spec;
+ }
+
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ private JobSpecification generateSecondaryBTreeCheckpoint(int lastSuccessfulIteration, PregelixJob job)
+ throws HyracksException {
+ Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(job.getConfiguration());
+ Class<? extends Writable> msgListClass = MsgList.class;
+ String readFile = lastSuccessfulIteration % 2 == 0 ? SECONDARY_INDEX_EVEN : SECONDARY_INDEX_ODD;
+ IFileSplitProvider secondaryFileSplitProviderRead = ClusterConfig.getFileSplitProvider(jobId, readFile);
+ JobSpecification spec = new JobSpecification();
+ /**
+ * construct empty tuple operator
+ */
+ ArrayTupleBuilder tb = new ArrayTupleBuilder(2);
+ DataOutput dos = tb.getDataOutput();
+ tb.reset();
+ UTF8StringSerializerDeserializer.INSTANCE.serialize("0", dos);
+ tb.addFieldEndOffset();
+ ISerializerDeserializer[] keyRecDescSers = { UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE };
+ RecordDescriptor keyRecDesc = new RecordDescriptor(keyRecDescSers);
+ ConstantTupleSourceOperatorDescriptor emptyTupleSource = new ConstantTupleSourceOperatorDescriptor(spec,
+ keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(), tb.getSize());
+ ClusterConfig.setLocationConstraint(spec, emptyTupleSource);
+
+ /**
+ * construct btree search operator
+ */
+ ConfFactory confFactory = new ConfFactory(job);
+ RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+ vertexIdClass.getName(), msgListClass.getName());
+ IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
+ comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(WritableComparator.get(vertexIdClass)
+ .getClass());
+
+ ITypeTraits[] typeTraits = new ITypeTraits[2];
+ typeTraits[0] = new TypeTraits(false);
+ typeTraits[1] = new TypeTraits(false);
+
+ BTreeSearchOperatorDescriptor scanner = new BTreeSearchOperatorDescriptor(spec, recordDescriptor,
+ storageManagerInterface, lcManagerProvider, secondaryFileSplitProviderRead, typeTraits,
+ comparatorFactories, null, null, null, true, true, getIndexDataflowHelperFactory(), false,
+ NoOpOperationCallbackFactory.INSTANCE);
+ ClusterConfig.setLocationConstraint(spec, scanner);
+
+ /**
+ * construct write file operator
+ */
+ HDFSWriteOperatorDescriptor writer = new HDFSWriteOperatorDescriptor(spec, job, new KeyValueWriterFactory(
+ confFactory));
+ ClusterConfig.setLocationConstraint(spec, writer);
+
+ /**
+ * connect operator descriptors
+ */
+ spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource, 0, scanner, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), scanner, 0, writer, 0);
+ spec.setFrameSize(frameSize);
+ return spec;
+ }
+
@Override
public JobSpecification[] generateCleanup() throws HyracksException {
JobSpecification[] cleanups = new JobSpecification[3];
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/clusterconfig/ClusterConfig.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/clusterconfig/ClusterConfig.java
index db18e53..ea6cc8a 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/clusterconfig/ClusterConfig.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/clusterconfig/ClusterConfig.java
@@ -228,4 +228,16 @@
public static Scheduler getHdfsScheduler() {
return hdfsScheduler;
}
+
+ public static String[] getLocationConstraint() throws HyracksException {
+ int count = 0;
+ String[] locations = new String[NCs.length * stores.length];
+ for (String nc : NCs) {
+ for (int i = 0; i < stores.length; i++) {
+ locations[count] = nc;
+ count++;
+ }
+ }
+ return locations;
+ }
}
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/KeyValueParserFactory.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/KeyValueParserFactory.java
new file mode 100644
index 0000000..a4a53e1
--- /dev/null
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/KeyValueParserFactory.java
@@ -0,0 +1,79 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.dataflow;
+
+import java.io.DataOutput;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.io.Writable;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.hdfs.api.IKeyValueParser;
+import edu.uci.ics.hyracks.hdfs.api.IKeyValueParserFactory;
+
+/**
+ * @author yingyib
+ */
+public class KeyValueParserFactory<K extends Writable, V extends Writable> implements IKeyValueParserFactory<K, V> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public IKeyValueParser<K, V> createKeyValueParser(IHyracksTaskContext ctx) throws HyracksDataException {
+ final ArrayTupleBuilder tb = new ArrayTupleBuilder(2);
+ final DataOutput dos = tb.getDataOutput();
+ final ByteBuffer buffer = ctx.allocateFrame();
+ final FrameTupleAppender appender = new FrameTupleAppender(ctx.getFrameSize());
+ appender.reset(buffer, true);
+
+ return new IKeyValueParser<K, V>() {
+
+ @Override
+ public void open(IFrameWriter writer) throws HyracksDataException {
+
+ }
+
+ @Override
+ public void parse(K key, V value, IFrameWriter writer, String fileString) throws HyracksDataException {
+ try {
+ tb.reset();
+ key.write(dos);
+ tb.addFieldEndOffset();
+ value.write(dos);
+ tb.addFieldEndOffset();
+ if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+ FrameUtils.flushFrame(buffer, writer);
+ appender.reset(buffer, true);
+ if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+ throw new HyracksDataException("tuple cannot be appended into the frame");
+ }
+ }
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ @Override
+ public void close(IFrameWriter writer) throws HyracksDataException {
+ FrameUtils.flushFrame(buffer, writer);
+ }
+
+ };
+ }
+}
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/KeyValueWriterFactory.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/KeyValueWriterFactory.java
new file mode 100644
index 0000000..fd407be
--- /dev/null
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/KeyValueWriterFactory.java
@@ -0,0 +1,106 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.dataflow;
+
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.dataflow.common.util.ReflectionUtils;
+import edu.uci.ics.hyracks.hdfs.ContextFactory;
+import edu.uci.ics.hyracks.hdfs.api.ITupleWriter;
+import edu.uci.ics.hyracks.hdfs.api.ITupleWriterFactory;
+import edu.uci.ics.hyracks.hdfs2.dataflow.ConfFactory;
+import edu.uci.ics.pregelix.api.util.ResetableByteArrayInputStream;
+
+/**
+ * @author yingyib
+ */
+@SuppressWarnings("rawtypes")
+public class KeyValueWriterFactory implements ITupleWriterFactory {
+ private static final long serialVersionUID = 1L;
+ private ConfFactory confFactory;
+
+ public KeyValueWriterFactory(ConfFactory confFactory) {
+ this.confFactory = confFactory;
+ }
+
+ @Override
+ public ITupleWriter getTupleWriter(IHyracksTaskContext ctx, final int partition, final int nPartition)
+ throws HyracksDataException {
+ return new ITupleWriter() {
+ private SequenceFileOutputFormat sequenceOutputFormat = new SequenceFileOutputFormat();
+ private Writable key;
+ private Writable value;
+ private ResetableByteArrayInputStream bis = new ResetableByteArrayInputStream();
+ private DataInput dis = new DataInputStream(bis);
+ private RecordWriter recordWriter;
+ private ContextFactory ctxFactory = new ContextFactory();
+ private TaskAttemptContext context;
+
+ @Override
+ public void open(DataOutput output) throws HyracksDataException {
+ try {
+ Job job = confFactory.getConf();
+ context = ctxFactory.createContext(job.getConfiguration(), partition);
+ recordWriter = sequenceOutputFormat.getRecordWriter(context);
+ Class<?> keyClass = context.getOutputKeyClass();
+ Class<?> valClass = context.getOutputValueClass();
+ key = (Writable) ReflectionUtils.createInstance(keyClass);
+ value = (Writable) ReflectionUtils.createInstance(valClass);
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void write(DataOutput output, ITupleReference tuple) throws HyracksDataException {
+ try {
+ byte[] data = tuple.getFieldData(0);
+ int fieldStart = tuple.getFieldStart(0);
+ bis.setByteArray(data, fieldStart);
+ key.readFields(dis);
+ data = tuple.getFieldData(1);
+ fieldStart = tuple.getFieldStart(1);
+ bis.setByteArray(data, fieldStart);
+ value.readFields(dis);
+ recordWriter.write(key, value);
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ @Override
+ public void close(DataOutput output) throws HyracksDataException {
+ try {
+ recordWriter.close(context);
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ };
+ }
+}
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/MaterializingWriteOperatorDescriptor.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/MaterializingWriteOperatorDescriptor.java
index 48ed806..00dcbd1 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/MaterializingWriteOperatorDescriptor.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/MaterializingWriteOperatorDescriptor.java
@@ -68,6 +68,8 @@
@Override
public void open() throws HyracksDataException {
+ /** remove last iteration's state */
+ IterationUtils.removeIterationState(ctx, partition);
state = new MaterializerTaskState(ctx.getJobletContext().getJobId(), new TaskId(getActivityId(),
partition));
INCApplicationContext appContext = ctx.getJobletContext().getApplicationContext();
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/util/IterationUtils.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/util/IterationUtils.java
index 5844d09..603a464 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/util/IterationUtils.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/util/IterationUtils.java
@@ -52,9 +52,9 @@
RuntimeContext context = (RuntimeContext) appContext.getApplicationObject();
Map<StateKey, IStateObject> map = context.getAppStateStore();
IStateObject state = map.get(new StateKey(lastId, partition));
- if (state == null) {
+ while (state == null) {
/** in case the last job is a checkpointing job */
- lastId = new JobId(currentId.getId() - 2);
+ lastId = new JobId(lastId.getId() - 1);
state = map.get(new StateKey(lastId, partition));
}
return state;
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobgen/JobGenerator.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobgen/JobGenerator.java
index cb7fd6d..f6857fe 100644
--- a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobgen/JobGenerator.java
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobgen/JobGenerator.java
@@ -23,6 +23,7 @@
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.pregelix.api.util.ConservativeCheckpointHook;
import edu.uci.ics.pregelix.api.util.DefaultVertexPartitioner;
import edu.uci.ics.pregelix.example.ConnectedComponentsVertex;
import edu.uci.ics.pregelix.example.ConnectedComponentsVertex.SimpleConnectedComponentsVertexOutputFormat;
@@ -76,6 +77,7 @@
FileInputFormat.setInputPaths(job, HDFS_INPUTPATH);
FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH));
job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 20);
+ job.setCheckpointHook(ConservativeCheckpointHook.class);
job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
}
@@ -90,6 +92,7 @@
FileInputFormat.setInputPaths(job, HDFS_INPUTPATH2);
FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH2));
job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 23);
+ job.setCheckpointHook(ConservativeCheckpointHook.class);
job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
}
@@ -116,6 +119,7 @@
FileInputFormat.setInputPaths(job, HDFS_INPUTPATH);
FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH));
job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 20);
+ job.setCheckpointHook(ConservativeCheckpointHook.class);
job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
}
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/test/FailureVertexTest.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/test/FailureVertexTest.java
index aa0dfdd..5a2636a 100644
--- a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/test/FailureVertexTest.java
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/test/FailureVertexTest.java
@@ -37,8 +37,8 @@
*/
public class FailureVertexTest {
- private static String HDFS_INPUTPATH2 = "data/webmapcomplex";
- private static String HDFS_OUTPUTPAH2 = "actual/resultcomplex";
+ private static String INPUT_PATH = "data/webmapcomplex";
+ private static String OUTPUT_PATH = "actual/resultcomplex";
@Test
public void test() throws Exception {
@@ -52,8 +52,8 @@
job.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
job.setDynamicVertexValueSize(true);
- FileInputFormat.setInputPaths(job, HDFS_INPUTPATH2);
- FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH2));
+ FileInputFormat.setInputPaths(job, INPUT_PATH);
+ FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));
job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 23);
Driver driver = new Driver(FailureVertex.class);
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/test/JobConcatenationTest.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/test/JobConcatenationTest.java
index 0d6f863..d2995f1 100644
--- a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/test/JobConcatenationTest.java
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/test/JobConcatenationTest.java
@@ -24,6 +24,7 @@
import org.junit.Test;
import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.pregelix.api.util.ConservativeCheckpointHook;
import edu.uci.ics.pregelix.core.driver.Driver;
import edu.uci.ics.pregelix.core.util.PregelixHyracksIntegrationUtil;
import edu.uci.ics.pregelix.example.PageRankVertex;
@@ -55,6 +56,7 @@
job1.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
FileInputFormat.setInputPaths(job1, INPUTPATH);
job1.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 20);
+ job1.setCheckpointHook(ConservativeCheckpointHook.class);
PregelixJob job2 = new PregelixJob(PageRankVertex.class.getName());
job2.setVertexClass(PageRankVertex.class);
@@ -64,6 +66,7 @@
job2.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
FileOutputFormat.setOutputPath(job2, new Path(OUTPUTPAH));
job2.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 20);
+ job2.setCheckpointHook(ConservativeCheckpointHook.class);
jobs.add(job1);
jobs.add(job2);
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/PageRankReal.xml b/pregelix/pregelix-example/src/test/resources/jobs/PageRankReal.xml
index 9e1e0b0..b50b02a 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/PageRankReal.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/PageRankReal.xml
@@ -84,6 +84,7 @@
<property><name>io.seqfile.compress.blocksize</name><value>1000000</value></property>
<property><name>mapred.queue.names</name><value>default</value></property>
<property><name>fs.har.impl</name><value>org.apache.hadoop.fs.HarFileSystem</value></property>
+<property><name>pregelix.checkpointHook</name><value>edu.uci.ics.pregelix.api.util.ConservativeCheckpointHook</value></property>
<property><name>io.mapfile.bloom.error.rate</name><value>0.005</value></property>
<property><name>mapred.job.tracker</name><value>local</value></property>
<property><name>io.skip.checksum.errors</name><value>false</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealComplex.xml b/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealComplex.xml
index c4366d7..217fbba 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealComplex.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealComplex.xml
@@ -84,6 +84,7 @@
<property><name>io.seqfile.compress.blocksize</name><value>1000000</value></property>
<property><name>mapred.queue.names</name><value>default</value></property>
<property><name>fs.har.impl</name><value>org.apache.hadoop.fs.HarFileSystem</value></property>
+<property><name>pregelix.checkpointHook</name><value>edu.uci.ics.pregelix.api.util.ConservativeCheckpointHook</value></property>
<property><name>io.mapfile.bloom.error.rate</name><value>0.005</value></property>
<property><name>mapred.job.tracker</name><value>local</value></property>
<property><name>io.skip.checksum.errors</name><value>false</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealNoCombiner.xml b/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealNoCombiner.xml
index ac0d508..636b055 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealNoCombiner.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealNoCombiner.xml
@@ -84,6 +84,7 @@
<property><name>io.seqfile.compress.blocksize</name><value>1000000</value></property>
<property><name>mapred.queue.names</name><value>default</value></property>
<property><name>fs.har.impl</name><value>org.apache.hadoop.fs.HarFileSystem</value></property>
+<property><name>pregelix.checkpointHook</name><value>edu.uci.ics.pregelix.api.util.ConservativeCheckpointHook</value></property>
<property><name>io.mapfile.bloom.error.rate</name><value>0.005</value></property>
<property><name>mapred.job.tracker</name><value>local</value></property>
<property><name>io.skip.checksum.errors</name><value>false</value></property>