fix the bug of naive and log algorithm
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/client/Client.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/client/Client.java
index 60342a7..108d130 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/client/Client.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/client/Client.java
@@ -10,6 +10,7 @@
import org.kohsuke.args4j.CmdLineParser;
import org.kohsuke.args4j.Option;
+import edu.uci.ics.genomix.pregelix.format.NaiveAlgorithmForPathMergeInputFormat;
import edu.uci.ics.genomix.pregelix.operator.LogAlgorithmForPathMergeVertex;
import edu.uci.ics.genomix.pregelix.operator.NaiveAlgorithmForPathMergeVertex;
import edu.uci.ics.pregelix.api.job.PregelixJob;
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/LogAlgorithmForPathMergeInputFormat.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/LogAlgorithmForPathMergeInputFormat.java
new file mode 100644
index 0000000..7898d35
--- /dev/null
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/LogAlgorithmForPathMergeInputFormat.java
@@ -0,0 +1,79 @@
+package edu.uci.ics.genomix.pregelix.format;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import edu.uci.ics.genomix.pregelix.api.io.binary.BinaryVertexInputFormat;
+import edu.uci.ics.genomix.pregelix.io.LogAlgorithmMessageWritable;
+import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
+import edu.uci.ics.genomix.pregelix.type.State;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.KmerCountValue;
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.io.VertexReader;
+import edu.uci.ics.pregelix.api.util.BspUtils;
+
+public class LogAlgorithmForPathMergeInputFormat extends
+ BinaryVertexInputFormat<KmerBytesWritable, ValueStateWritable, NullWritable, LogAlgorithmMessageWritable>{
+ /**
+ * Format INPUT
+ */
+ @SuppressWarnings("unchecked")
+ @Override
+ public VertexReader<KmerBytesWritable, ValueStateWritable, NullWritable, LogAlgorithmMessageWritable> createVertexReader(
+ InputSplit split, TaskAttemptContext context) throws IOException {
+ return new BinaryLoadGraphReader(binaryInputFormat.createRecordReader(split, context));
+ }
+
+ @SuppressWarnings("rawtypes")
+ class BinaryLoadGraphReader extends
+ BinaryVertexReader<KmerBytesWritable, ValueStateWritable, NullWritable, LogAlgorithmMessageWritable> {
+ private Vertex vertex;
+ private KmerBytesWritable vertexId = null;
+ private ValueStateWritable vertexValue = new ValueStateWritable();
+
+ public BinaryLoadGraphReader(RecordReader<KmerBytesWritable,KmerCountValue> recordReader) {
+ super(recordReader);
+ }
+
+ @Override
+ public boolean nextVertex() throws IOException, InterruptedException {
+ return getRecordReader().nextKeyValue();
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public Vertex<KmerBytesWritable, ValueStateWritable, NullWritable, LogAlgorithmMessageWritable> getCurrentVertex() throws IOException,
+ InterruptedException {
+ if (vertex == null)
+ vertex = (Vertex) BspUtils.createVertex(getContext().getConfiguration());
+
+ vertex.getMsgList().clear();
+ vertex.getEdges().clear();
+
+ if(getRecordReader() != null){
+ /**
+ * set the src vertex id
+ */
+ if(vertexId == null)
+ vertexId = new KmerBytesWritable(getRecordReader().getCurrentKey().getKmerLength());
+ vertexId.set(getRecordReader().getCurrentKey());
+ vertex.setVertexId(vertexId);
+ /**
+ * set the vertex value
+ */
+ KmerCountValue kmerCountValue = getRecordReader().getCurrentValue();
+ vertexValue.setAdjMap(kmerCountValue.getAdjBitMap());
+ vertexValue.setState(State.NON_VERTEX);
+ vertex.setVertexValue(vertexValue);
+ }
+
+ return vertex;
+ }
+ }
+
+}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/LogAlgorithmForPathMergeOutputFormat.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/LogAlgorithmForPathMergeOutputFormat.java
new file mode 100644
index 0000000..12c638e
--- /dev/null
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/LogAlgorithmForPathMergeOutputFormat.java
@@ -0,0 +1,48 @@
+package edu.uci.ics.genomix.pregelix.format;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import edu.uci.ics.genomix.pregelix.api.io.binary.BinaryVertexOutputFormat;
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.io.VertexWriter;
+import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
+import edu.uci.ics.genomix.pregelix.type.State;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
+
+public class LogAlgorithmForPathMergeOutputFormat extends
+ BinaryVertexOutputFormat<KmerBytesWritable, ValueStateWritable, NullWritable> {
+
+
+ @Override
+ public VertexWriter<KmerBytesWritable, ValueStateWritable, NullWritable> createVertexWriter(TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ @SuppressWarnings("unchecked")
+ RecordWriter<KmerBytesWritable, ValueStateWritable> recordWriter = binaryOutputFormat.getRecordWriter(context);
+ return new BinaryLoadGraphVertexWriter(recordWriter);
+ }
+
+ /**
+ * Simple VertexWriter that supports {@link BinaryLoadGraphVertex}
+ */
+ public static class BinaryLoadGraphVertexWriter extends
+ BinaryVertexWriter<KmerBytesWritable, ValueStateWritable, NullWritable> {
+
+ public BinaryLoadGraphVertexWriter(RecordWriter<KmerBytesWritable, ValueStateWritable> lineRecordWriter) {
+ super(lineRecordWriter);
+ }
+
+ @Override
+ public void writeVertex(Vertex<KmerBytesWritable, ValueStateWritable, NullWritable, ?> vertex) throws IOException,
+ InterruptedException {
+ if(vertex.getVertexValue().getState() != State.FINAL_DELETE
+ && vertex.getVertexValue().getState() != State.END_VERTEX
+ && vertex.getVertexValue().getState() != State.TODELETE
+ && vertex.getVertexValue().getState() != State.KILL_SELF)
+ getRecordWriter().write(vertex.getVertexId(),vertex.getVertexValue());
+ }
+ }
+}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/NaiveAlgorithmForPathMergeInputFormat.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/NaiveAlgorithmForPathMergeInputFormat.java
new file mode 100644
index 0000000..ca134c0
--- /dev/null
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/NaiveAlgorithmForPathMergeInputFormat.java
@@ -0,0 +1,78 @@
+package edu.uci.ics.genomix.pregelix.format;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.KmerCountValue;
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.io.VertexReader;
+import edu.uci.ics.pregelix.api.util.BspUtils;
+import edu.uci.ics.genomix.pregelix.io.NaiveAlgorithmMessageWritable;
+import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
+import edu.uci.ics.genomix.pregelix.api.io.binary.BinaryVertexInputFormat;
+import edu.uci.ics.genomix.pregelix.api.io.binary.BinaryVertexInputFormat.BinaryVertexReader;
+
+public class NaiveAlgorithmForPathMergeInputFormat extends
+ BinaryVertexInputFormat<KmerBytesWritable, ValueStateWritable, NullWritable, NaiveAlgorithmMessageWritable>{
+ /**
+ * Format INPUT
+ */
+ @SuppressWarnings("unchecked")
+ @Override
+ public VertexReader<KmerBytesWritable, ValueStateWritable, NullWritable, NaiveAlgorithmMessageWritable> createVertexReader(
+ InputSplit split, TaskAttemptContext context) throws IOException {
+ return new BinaryLoadGraphReader(binaryInputFormat.createRecordReader(split, context));
+ }
+}
+
+@SuppressWarnings("rawtypes")
+class BinaryLoadGraphReader extends
+ BinaryVertexReader<KmerBytesWritable, ValueStateWritable, NullWritable, NaiveAlgorithmMessageWritable> {
+ private Vertex vertex;
+ private KmerBytesWritable vertexId = null;
+ private ValueStateWritable vertexValue = new ValueStateWritable();
+
+ public BinaryLoadGraphReader(RecordReader<KmerBytesWritable, KmerCountValue> recordReader) {
+ super(recordReader);
+ }
+
+ @Override
+ public boolean nextVertex() throws IOException, InterruptedException {
+ return getRecordReader().nextKeyValue();
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public Vertex<KmerBytesWritable, ValueStateWritable, NullWritable, NaiveAlgorithmMessageWritable> getCurrentVertex() throws IOException,
+ InterruptedException {
+ if (vertex == null)
+ vertex = (Vertex) BspUtils.createVertex(getContext().getConfiguration());
+
+ vertex.getMsgList().clear();
+ vertex.getEdges().clear();
+
+ vertex.reset();
+ if(getRecordReader() != null){
+ /**
+ * set the src vertex id
+ */
+ if(vertexId == null)
+ vertexId = new KmerBytesWritable(getRecordReader().getCurrentKey().getKmerLength());
+ vertexId.set(getRecordReader().getCurrentKey());
+ vertex.setVertexId(vertexId);
+ /**
+ * set the vertex value
+ */
+ KmerCountValue kmerCountValue = getRecordReader().getCurrentValue();
+ vertexValue.setAdjMap(kmerCountValue.getAdjBitMap());
+ vertex.setVertexValue(vertexValue);
+ }
+
+ return vertex;
+ }
+}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/NaiveAlgorithmForPathMergeOutputFormat.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/NaiveAlgorithmForPathMergeOutputFormat.java
new file mode 100644
index 0000000..983112b
--- /dev/null
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/NaiveAlgorithmForPathMergeOutputFormat.java
@@ -0,0 +1,41 @@
+package edu.uci.ics.genomix.pregelix.format;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import edu.uci.ics.genomix.pregelix.api.io.binary.BinaryVertexOutputFormat;
+import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.io.VertexWriter;
+
+public class NaiveAlgorithmForPathMergeOutputFormat extends
+ BinaryVertexOutputFormat<KmerBytesWritable, ValueStateWritable, NullWritable> {
+
+ @Override
+ public VertexWriter<KmerBytesWritable, ValueStateWritable, NullWritable> createVertexWriter(TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ @SuppressWarnings("unchecked")
+ RecordWriter<KmerBytesWritable, ValueStateWritable> recordWriter = binaryOutputFormat.getRecordWriter(context);
+ return new BinaryLoadGraphVertexWriter(recordWriter);
+ }
+
+ /**
+ * Simple VertexWriter that supports {@link BinaryLoadGraphVertex}
+ */
+ public static class BinaryLoadGraphVertexWriter extends
+ BinaryVertexWriter<KmerBytesWritable, ValueStateWritable, NullWritable> {
+ public BinaryLoadGraphVertexWriter(RecordWriter<KmerBytesWritable, ValueStateWritable> lineRecordWriter) {
+ super(lineRecordWriter);
+ }
+
+ @Override
+ public void writeVertex(Vertex<KmerBytesWritable, ValueStateWritable, NullWritable, ?> vertex) throws IOException,
+ InterruptedException {
+ getRecordWriter().write(vertex.getVertexId(),vertex.getVertexValue());
+ }
+ }
+}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/NaiveAlgorithmMessageWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/NaiveAlgorithmMessageWritable.java
new file mode 100644
index 0000000..55a626d
--- /dev/null
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/NaiveAlgorithmMessageWritable.java
@@ -0,0 +1,125 @@
+package edu.uci.ics.genomix.pregelix.io;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.WritableComparable;
+
+import edu.uci.ics.genomix.pregelix.operator.NaiveAlgorithmForPathMergeVertex;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.VKmerBytesWritable;
+
+public class NaiveAlgorithmMessageWritable implements WritableComparable<NaiveAlgorithmMessageWritable>{
+ /**
+ * sourceVertexId stores source vertexId when headVertex sends the message
+ * stores neighber vertexValue when pathVertex sends the message
+ * chainVertexId stores the chains of connected DNA
+ * file stores the point to the file that stores the chains of connected DNA
+ */
+ private KmerBytesWritable sourceVertexId;
+ private VKmerBytesWritable chainVertexId;
+ private KmerBytesWritable headVertexId;
+ private byte adjMap;
+ private boolean isRear;
+
+ public NaiveAlgorithmMessageWritable(){
+ sourceVertexId = new VKmerBytesWritable(NaiveAlgorithmForPathMergeVertex.kmerSize);
+ chainVertexId = new VKmerBytesWritable(NaiveAlgorithmForPathMergeVertex.kmerSize);
+ headVertexId = new VKmerBytesWritable(NaiveAlgorithmForPathMergeVertex.kmerSize);
+ }
+
+ public void set(KmerBytesWritable sourceVertex, VKmerBytesWritable chainVertex, KmerBytesWritable headVertex , byte adjMap, boolean isRear){
+ this.sourceVertexId.set(sourceVertex);
+ this.chainVertexId.set(chainVertex);
+ this.headVertexId.set(headVertex);
+ this.adjMap = adjMap;
+ this.isRear = isRear;
+ }
+
+ public KmerBytesWritable getSourceVertexId() {
+ return sourceVertexId;
+ }
+
+ public void setSourceVertexId(KmerBytesWritable source) {
+ this.sourceVertexId.set(source);
+ }
+
+ public byte getAdjMap() {
+ return adjMap;
+ }
+
+ public void setAdjMap(byte adjMap) {
+ this.adjMap = adjMap;
+ }
+
+ public void setChainVertexId(VKmerBytesWritable chainVertex) {
+ this.chainVertexId.set(chainVertex);
+ }
+
+ public VKmerBytesWritable getChainVertexId() {
+ return chainVertexId;
+ }
+
+ public boolean isRear() {
+ return isRear;
+ }
+
+ public void setRear(boolean isRear) {
+ this.isRear = isRear;
+ }
+
+ public int getLengthOfChain() {
+ return this.chainVertexId.getKmerLength();
+ }
+
+
+ public KmerBytesWritable getHeadVertexId() {
+ return headVertexId;
+ }
+
+ public void setHeadVertexId(KmerBytesWritable headVertexId) {
+ this.headVertexId.set(headVertexId);
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ sourceVertexId.write(out);
+ headVertexId.write(out);
+ chainVertexId.write(out);
+ out.write(adjMap);
+ out.writeBoolean(isRear);
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ sourceVertexId.readFields(in);
+ headVertexId.readFields(in);
+ chainVertexId.readFields(in);
+ adjMap = in.readByte();
+ isRear = in.readBoolean();
+ }
+
+ @Override
+ public int hashCode() {
+ return chainVertexId.hashCode();
+ }
+ @Override
+ public boolean equals(Object o) {
+ if (o instanceof NaiveAlgorithmMessageWritable) {
+ NaiveAlgorithmMessageWritable tp = (NaiveAlgorithmMessageWritable) o;
+ return chainVertexId.equals( tp.chainVertexId);
+ }
+ return false;
+ }
+ @Override
+ public String toString() {
+ return chainVertexId.toString();
+ }
+
+ @Override
+ public int compareTo(NaiveAlgorithmMessageWritable tp) {
+ return chainVertexId.compareTo(tp.chainVertexId);
+ }
+
+}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/LoadGraphVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/LoadGraphVertex.java
new file mode 100644
index 0000000..6fef3a6
--- /dev/null
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/LoadGraphVertex.java
@@ -0,0 +1,70 @@
+package edu.uci.ics.genomix.pregelix.operator;
+
+import java.util.Iterator;
+
+import org.apache.hadoop.io.NullWritable;
+
+import edu.uci.ics.genomix.pregelix.client.Client;
+import edu.uci.ics.genomix.pregelix.format.NaiveAlgorithmForPathMergeInputFormat;
+import edu.uci.ics.genomix.pregelix.format.NaiveAlgorithmForPathMergeOutputFormat;
+import edu.uci.ics.genomix.pregelix.io.NaiveAlgorithmMessageWritable;
+import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.job.PregelixJob;
+
+/*
+ * vertexId: BytesWritable
+ * vertexValue: ByteWritable
+ * edgeValue: NullWritable
+ * message: NaiveAlgorithmMessageWritable
+ *
+ * DNA:
+ * A: 00
+ * C: 01
+ * G: 10
+ * T: 11
+ *
+ * succeed node
+ * A 00000001 1
+ * G 00000010 2
+ * C 00000100 4
+ * T 00001000 8
+ * precursor node
+ * A 00010000 16
+ * G 00100000 32
+ * C 01000000 64
+ * T 10000000 128
+ *
+ * For example, ONE LINE in input file: 00,01,10 0001,0010,
+ * That means that vertexId is ACG, its succeed node is A and its precursor node is C.
+ * The succeed node and precursor node will be stored in vertexValue and we don't use edgeValue.
+ * The details about message are in edu.uci.ics.pregelix.example.io.MessageWritable.
+ */
+public class LoadGraphVertex extends Vertex<KmerBytesWritable, ValueStateWritable, NullWritable, NaiveAlgorithmMessageWritable>{
+
+ /**
+ * For test, just output original file
+ */
+ @Override
+ public void compute(Iterator<NaiveAlgorithmMessageWritable> msgIterator) {
+ voteToHalt();
+ }
+
+ /**
+ * @param args
+ */
+ public static void main(String[] args) throws Exception {
+ PregelixJob job = new PregelixJob(LoadGraphVertex.class.getSimpleName());
+ job.setVertexClass(LoadGraphVertex.class);
+ /**
+ * BinaryInput and BinaryOutput
+ */
+ job.setVertexInputFormatClass(NaiveAlgorithmForPathMergeInputFormat.class);
+ job.setVertexOutputFormatClass(NaiveAlgorithmForPathMergeOutputFormat.class);
+ job.setDynamicVertexValueSize(true);
+ job.setOutputKeyClass(KmerBytesWritable.class);
+ job.setOutputValueClass(ValueStateWritable.class);
+ Client.run(args, job);
+ }
+}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/LogAlgorithmForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/LogAlgorithmForPathMergeVertex.java
new file mode 100644
index 0000000..60f40dc
--- /dev/null
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/LogAlgorithmForPathMergeVertex.java
@@ -0,0 +1,366 @@
+package edu.uci.ics.genomix.pregelix.operator;
+
+import java.util.Iterator;
+
+import org.apache.hadoop.io.NullWritable;
+
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.genomix.pregelix.client.Client;
+import edu.uci.ics.genomix.pregelix.format.LogAlgorithmForPathMergeInputFormat;
+import edu.uci.ics.genomix.pregelix.format.LogAlgorithmForPathMergeOutputFormat;
+import edu.uci.ics.genomix.pregelix.io.LogAlgorithmMessageWritable;
+import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
+import edu.uci.ics.genomix.pregelix.type.Message;
+import edu.uci.ics.genomix.pregelix.type.State;
+import edu.uci.ics.genomix.pregelix.util.GraphVertexOperation;
+import edu.uci.ics.genomix.type.GeneCode;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.VKmerBytesWritable;
+import edu.uci.ics.genomix.type.VKmerBytesWritableFactory;
+
+/*
+ * vertexId: BytesWritable
+ * vertexValue: ValueStateWritable
+ * edgeValue: NullWritable
+ * message: LogAlgorithmMessageWritable
+ *
+ * DNA:
+ * A: 00
+ * C: 01
+ * G: 10
+ * T: 11
+ *
+ * succeed node
+ * A 00000001 1
+ * G 00000010 2
+ * C 00000100 4
+ * T 00001000 8
+ * precursor node
+ * A 00010000 16
+ * G 00100000 32
+ * C 01000000 64
+ * T 10000000 128
+ *
+ * For example, ONE LINE in input file: 00,01,10 0001,0010,
+ * That means that vertexId is ACG, its succeed node is A and its precursor node is C.
+ * The succeed node and precursor node will be stored in vertexValue and we don't use edgeValue.
+ * The details about message are in edu.uci.ics.pregelix.example.io.MessageWritable.
+ */
+public class LogAlgorithmForPathMergeVertex extends Vertex<KmerBytesWritable, ValueStateWritable, NullWritable, LogAlgorithmMessageWritable>{
+
+ public static final String KMER_SIZE = "LogAlgorithmForPathMergeVertex.kmerSize";
+ public static final String ITERATIONS = "NaiveAlgorithmForPathMergeVertex.iteration";
+ public static int kmerSize = -1;
+ private int maxIteration = -1;
+
+ private ValueStateWritable vertexVal = new ValueStateWritable();
+
+ private LogAlgorithmMessageWritable msg = new LogAlgorithmMessageWritable();
+
+ private VKmerBytesWritableFactory kmerFactory = new VKmerBytesWritableFactory(1);
+ private VKmerBytesWritable vertexId = new VKmerBytesWritable(1);
+ private VKmerBytesWritable destVertexId = new VKmerBytesWritable(1);
+ private VKmerBytesWritable chainVertexId = new VKmerBytesWritable(1);
+ private VKmerBytesWritable lastKmer = new VKmerBytesWritable(1);
+ /**
+ * initiate kmerSize, maxIteration
+ */
+ public void initVertex(){
+ if(kmerSize == -1)
+ kmerSize = getContext().getConfiguration().getInt(KMER_SIZE, 5);
+ if (maxIteration < 0)
+ maxIteration = getContext().getConfiguration().getInt(ITERATIONS, 100);
+ vertexId.set(getVertexId());
+ vertexVal = getVertexValue();
+ }
+ /**
+ * get destination vertex
+ */
+ public VKmerBytesWritable getNextDestVertexId(VKmerBytesWritable vertexId, byte geneCode){
+ return kmerFactory.shiftKmerWithNextCode(vertexId, geneCode);
+ }
+
+ public VKmerBytesWritable getPreDestVertexId(VKmerBytesWritable vertexId, byte geneCode){
+ return kmerFactory.shiftKmerWithPreCode(vertexId, geneCode);
+ }
+
+ public VKmerBytesWritable getNextDestVertexIdFromBitmap(VKmerBytesWritable chainVertexId, byte adjMap){
+ return getDestVertexIdFromChain(chainVertexId, adjMap);//GeneCode.getGeneCodeFromBitMap((byte)(adjMap & 0x0F)
+ }
+
+ public VKmerBytesWritable getDestVertexIdFromChain(VKmerBytesWritable chainVertexId, byte adjMap){
+ lastKmer.set(kmerFactory.getLastKmerFromChain(kmerSize, chainVertexId));
+ return getNextDestVertexId(lastKmer, GeneCode.getGeneCodeFromBitMap((byte)(adjMap & 0x0F)));
+ }
+ /**
+ * head send message to all next nodes
+ */
+ public void sendMsgToAllNextNodes(VKmerBytesWritable vertexId, byte adjMap){
+ for(byte x = GeneCode.A; x<= GeneCode.T ; x++){
+ if((adjMap & (1 << x)) != 0){
+ destVertexId.set(getNextDestVertexId(vertexId, x));
+ sendMsg(destVertexId, msg);
+ }
+ }
+ }
+ /**
+ * head send message to all previous nodes
+ */
+ public void sendMsgToAllPreviousNodes(VKmerBytesWritable vertexId, byte adjMap){
+ for(byte x = GeneCode.A; x<= GeneCode.T ; x++){
+ if(((adjMap >> 4) & (1 << x)) != 0){
+ destVertexId.set(getPreDestVertexId(vertexId, x));
+ sendMsg(destVertexId, msg);
+ }
+ }
+ }
+
+ /**
+ * set vertex state
+ */
+ public void setState(){
+ if(msg.getMessage() == Message.START &&
+ (vertexVal.getState() == State.MID_VERTEX || vertexVal.getState() == State.END_VERTEX)){
+ vertexVal.setState(State.START_VERTEX);
+ setVertexValue(vertexVal);
+ }
+ else if(msg.getMessage() == Message.END && vertexVal.getState() == State.MID_VERTEX){
+ vertexVal.setState(State.END_VERTEX);
+ setVertexValue(vertexVal);
+ voteToHalt();
+ }
+ else
+ voteToHalt();
+ }
+ /**
+ * send start message to next node
+ */
+ public void sendStartMsgToNextNode(){
+ msg.setMessage(Message.START);
+ msg.setSourceVertexId(vertexId);
+ sendMsg(destVertexId, msg);
+ voteToHalt();
+ }
+ /**
+ * send end message to next node
+ */
+ public void sendEndMsgToNextNode(){
+ msg.setMessage(Message.END);
+ msg.setSourceVertexId(vertexId);
+ sendMsg(destVertexId, msg);
+ voteToHalt();
+ }
+ /**
+ * send non message to next node
+ */
+ public void sendNonMsgToNextNode(){
+ msg.setMessage(Message.NON);
+ msg.setSourceVertexId(vertexId);
+ sendMsg(destVertexId, msg);
+ }
+ /**
+ * head send message to path
+ */
+ public void sendMsgToPathVertex(VKmerBytesWritable chainVertexId, byte adjMap){
+ if(GeneCode.getGeneCodeFromBitMap((byte)(vertexVal.getAdjMap() & 0x0F)) == -1) //|| lastKmer == null
+ voteToHalt();
+ else{
+ destVertexId.set(getNextDestVertexIdFromBitmap(chainVertexId, adjMap));
+ if(vertexVal.getState() == State.START_VERTEX){
+ sendStartMsgToNextNode();
+ }
+ else if(vertexVal.getState() != State.END_VERTEX && vertexVal.getState() != State.FINAL_DELETE){
+ sendEndMsgToNextNode();
+ }
+ }
+ }
+ /**
+ * path send message to head
+ */
+ public void responseMsgToHeadVertex(){
+ if(vertexVal.getLengthOfMergeChain() == -1){
+ vertexVal.setMergeChain(vertexId);
+ setVertexValue(vertexVal);
+ }
+ msg.set(msg.getSourceVertexId(), vertexVal.getMergeChain(), vertexVal.getAdjMap(), msg.getMessage(), vertexVal.getState());
+ setMessageType(msg.getMessage());
+ destVertexId.set(msg.getSourceVertexId());
+ sendMsg(destVertexId,msg);
+ }
+ /**
+ * set message type
+ */
+ public void setMessageType(int message){
+ //kill Message because it has been merged by the head
+ if(vertexVal.getState() == State.END_VERTEX || vertexVal.getState() == State.FINAL_DELETE){
+ msg.setMessage(Message.END);
+ vertexVal.setState(State.FINAL_DELETE);
+ setVertexValue(vertexVal);
+ //deleteVertex(getVertexId());
+ }
+ else
+ msg.setMessage(Message.NON);
+
+ if(message == Message.START){
+ vertexVal.setState(State.TODELETE);
+ setVertexValue(vertexVal);
+ }
+ }
+ /**
+ * set vertexValue's state chainVertexId, value
+ */
+ public void setVertexValueAttributes(){
+ if(msg.getMessage() == Message.END){
+ if(vertexVal.getState() != State.START_VERTEX)
+ vertexVal.setState(State.END_VERTEX);
+ else
+ vertexVal.setState(State.FINAL_VERTEX);
+ }
+
+ if(getSuperstep() == 5)
+ chainVertexId.set(vertexId);
+ else
+ chainVertexId.set(vertexVal.getMergeChain());
+ lastKmer.set(kmerFactory.getLastKmerFromChain(msg.getLengthOfChain() - kmerSize + 1, msg.getChainVertexId()));
+ chainVertexId.set(kmerFactory.mergeTwoKmer(chainVertexId, lastKmer));
+ vertexVal.setMergeChain(chainVertexId);
+
+ byte tmpVertexValue = GraphVertexOperation.updateRightNeighber(getVertexValue().getAdjMap(), msg.getAdjMap());
+ vertexVal.setAdjMap(tmpVertexValue);
+ }
+ /**
+ * send message to self
+ */
+ public void sendMsgToSelf(){
+ if(msg.getMessage() != Message.END){
+ setVertexValue(vertexVal);
+ msg.reset(); //reset
+ msg.setAdjMap(vertexVal.getAdjMap());
+ sendMsg(vertexId,msg);
+ }
+ }
+ /**
+ * start sending message
+ */
+ public void startSendMsg(){
+ if(GraphVertexOperation.isHeadVertex(vertexVal.getAdjMap())){
+ msg.set(vertexId, chainVertexId, (byte)0, Message.START, State.NON_VERTEX); //msg.set(null, (byte)0, chainVertexId, Message.START, State.NON_VERTEX);
+ sendMsgToAllNextNodes(vertexId, vertexVal.getAdjMap());
+ voteToHalt();
+ }
+ if(GraphVertexOperation.isRearVertex(vertexVal.getAdjMap())){
+ msg.set(vertexId, chainVertexId, (byte)0, Message.END, State.NON_VERTEX);
+ sendMsgToAllPreviousNodes(vertexId, vertexVal.getAdjMap());
+ voteToHalt();
+ }
+ if(GraphVertexOperation.isPathVertex(vertexVal.getAdjMap())){
+ vertexVal.setState(State.MID_VERTEX);
+ setVertexValue(vertexVal);
+ }
+ }
+ /**
+ * initiate head, rear and path node
+ */
+ public void initState(Iterator<LogAlgorithmMessageWritable> msgIterator){
+ while(msgIterator.hasNext()){
+ if(!GraphVertexOperation.isPathVertex(vertexVal.getAdjMap())){
+ msgIterator.next();
+ voteToHalt();
+ }
+ else{
+ msg = msgIterator.next();
+ setState();
+ }
+ }
+ }
+ /**
+ * head send message to path
+ */
+ public void sendMsgToPathVertex(Iterator<LogAlgorithmMessageWritable> msgIterator){
+ if(getSuperstep() == 3){
+ msg.reset();
+ sendMsgToPathVertex(vertexId, vertexVal.getAdjMap());
+ }
+ else{
+ if(msgIterator.hasNext()){
+ msg = msgIterator.next();
+ sendMsgToPathVertex(vertexVal.getMergeChain(), msg.getAdjMap());
+ }
+ }
+ }
+ /**
+ * path response message to head
+ */
+ public void responseMsgToHeadVertex(Iterator<LogAlgorithmMessageWritable> msgIterator){
+ if(msgIterator.hasNext()){
+ msg = msgIterator.next();
+ responseMsgToHeadVertex();
+ //voteToHalt();
+ }
+ else{
+ if(getVertexValue().getState() != State.START_VERTEX
+ && getVertexValue().getState() != State.END_VERTEX && getVertexValue().getState() != State.FINAL_DELETE){
+ //vertexVal.setState(State.KILL_SELF);
+ //setVertexValue(vertexVal);
+ //voteToHalt();
+ deleteVertex(getVertexId());//killSelf because it doesn't receive any message
+ }
+ }
+ }
+ /**
+ * merge chainVertex and store in vertexVal.chainVertexId
+ */
+ public void mergeChainVertex(Iterator<LogAlgorithmMessageWritable> msgIterator){
+ if(msgIterator.hasNext()){
+ msg = msgIterator.next();
+ setVertexValueAttributes();
+ sendMsgToSelf();
+ }
+ if(vertexVal.getState() == State.END_VERTEX || vertexVal.getState() == State.FINAL_DELETE){
+ voteToHalt();
+ }
+ if(vertexVal.getState() == State.FINAL_VERTEX){
+ //String source = vertexVal.getMergeChain().toString();
+ voteToHalt();
+ }
+ }
+ @Override
+ public void compute(Iterator<LogAlgorithmMessageWritable> msgIterator) {
+ initVertex();
+ if (getSuperstep() == 1)
+ startSendMsg();
+ else if(getSuperstep() == 2)
+ initState(msgIterator);
+ else if(getSuperstep()%3 == 0 && getSuperstep() <= maxIteration){
+ sendMsgToPathVertex(msgIterator);
+ }
+ else if(getSuperstep()%3 == 1 && getSuperstep() <= maxIteration){
+ responseMsgToHeadVertex(msgIterator);
+ }
+ else if(getSuperstep()%3 == 2 && getSuperstep() <= maxIteration){
+ if(vertexVal.getState() == State.TODELETE){ //|| vertexVal.getState() == State.KILL_SELF)
+ deleteVertex(getVertexId()); //killSelf voteToHalt()
+ }
+ else{
+ mergeChainVertex(msgIterator);
+ }
+ }
+ }
+ /**
+ * @param args
+ */
+ public static void main(String[] args) throws Exception {
+ PregelixJob job = new PregelixJob(LogAlgorithmForPathMergeVertex.class.getSimpleName());
+ job.setVertexClass(LogAlgorithmForPathMergeVertex.class);
+ /**
+ * BinaryInput and BinaryOutput~/
+ */
+ job.setVertexInputFormatClass(LogAlgorithmForPathMergeInputFormat.class);
+ job.setVertexOutputFormatClass(LogAlgorithmForPathMergeOutputFormat.class);
+ job.setOutputKeyClass(KmerBytesWritable.class);
+ job.setOutputValueClass(ValueStateWritable.class);
+ job.setDynamicVertexValueSize(true);
+ Client.run(args, job);
+ }
+}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/NaiveAlgorithmForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/NaiveAlgorithmForPathMergeVertex.java
new file mode 100644
index 0000000..9d6c9ff
--- /dev/null
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/NaiveAlgorithmForPathMergeVertex.java
@@ -0,0 +1,207 @@
+package edu.uci.ics.genomix.pregelix.operator;
+
+import java.util.Iterator;
+
+import org.apache.hadoop.io.NullWritable;
+
+import edu.uci.ics.genomix.type.GeneCode;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.VKmerBytesWritable;
+import edu.uci.ics.genomix.type.VKmerBytesWritableFactory;
+
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.genomix.pregelix.client.Client;
+import edu.uci.ics.genomix.pregelix.format.NaiveAlgorithmForPathMergeInputFormat;
+import edu.uci.ics.genomix.pregelix.format.NaiveAlgorithmForPathMergeOutputFormat;
+import edu.uci.ics.genomix.pregelix.io.NaiveAlgorithmMessageWritable;
+import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
+import edu.uci.ics.genomix.pregelix.type.State;
+import edu.uci.ics.genomix.pregelix.util.GraphVertexOperation;
+
+/*
+ * vertexId: BytesWritable
+ * vertexValue: ByteWritable
+ * edgeValue: NullWritable
+ * message: NaiveAlgorithmMessageWritable
+ *
+ * DNA:
+ * A: 00
+ * C: 01
+ * G: 10
+ * T: 11
+ *
+ * succeed node
+ * A 00000001 1
+ * G 00000010 2
+ * C 00000100 4
+ * T 00001000 8
+ * precursor node
+ * A 00010000 16
+ * G 00100000 32
+ * C 01000000 64
+ * T 10000000 128
+ *
+ * For example, ONE LINE in input file: 00,01,10 0001,0010,
+ * That means that vertexId is ACG, its succeed node is A and its precursor node is C.
+ * The succeed node and precursor node will be stored in vertexValue and we don't use edgeValue.
+ * The details about message are in edu.uci.ics.pregelix.example.io.MessageWritable.
+ */
+/**
+ * Naive Algorithm for path merge graph
+ */
+public class NaiveAlgorithmForPathMergeVertex extends Vertex<KmerBytesWritable, ValueStateWritable, NullWritable, NaiveAlgorithmMessageWritable>{
+
+ public static final String KMER_SIZE = "NaiveAlgorithmForPathMergeVertex.kmerSize";
+ public static final String ITERATIONS = "NaiveAlgorithmForPathMergeVertex.iteration";
+ public static int kmerSize = -1;
+ private int maxIteration = -1;
+
+ private ValueStateWritable vertexVal = new ValueStateWritable();
+
+ private NaiveAlgorithmMessageWritable msg = new NaiveAlgorithmMessageWritable();
+
+ private VKmerBytesWritableFactory kmerFactory = new VKmerBytesWritableFactory(1);
+ private VKmerBytesWritable vertexId = new VKmerBytesWritable(1);
+ private VKmerBytesWritable destVertexId = new VKmerBytesWritable(1);
+ private VKmerBytesWritable chainVertexId = new VKmerBytesWritable(1);
+ private VKmerBytesWritable lastKmer = new VKmerBytesWritable(1);
+
+ /**
+ * initiate kmerSize, maxIteration
+ */
+ public void initVertex(){
+ if(kmerSize == -1)
+ kmerSize = getContext().getConfiguration().getInt(KMER_SIZE, 5);
+ if (maxIteration < 0)
+ maxIteration = getContext().getConfiguration().getInt(ITERATIONS, 100);
+ vertexId.set(getVertexId());
+ vertexVal = getVertexValue();
+ }
+ public void findDestination(){
+ destVertexId.set(msg.getSourceVertexId());
+ }
+ /**
+ * get destination vertex
+ */
+ public VKmerBytesWritable getDestVertexId(VKmerBytesWritable vertexId, byte geneCode){
+ return kmerFactory.shiftKmerWithNextCode(vertexId, geneCode);
+ }
+
+ public VKmerBytesWritable getDestVertexIdFromChain(VKmerBytesWritable chainVertexId, byte adjMap){
+ lastKmer.set(kmerFactory.getLastKmerFromChain(kmerSize, chainVertexId));
+ return getDestVertexId(lastKmer, GeneCode.getGeneCodeFromBitMap((byte)(adjMap & 0x0F)));
+ }
+ /**
+ * head send message to all next nodes
+ */
+ public void sendMsgToAllNextNodes(VKmerBytesWritable vertexId, byte adjMap){
+ for(byte x = GeneCode.A; x<= GeneCode.T ; x++){
+ if((adjMap & (1 << x)) != 0){
+ destVertexId.set(getDestVertexId(vertexId, x));
+ sendMsg(destVertexId, msg);
+ }
+ }
+ }
+ /**
+ * initiate chain vertex
+ */
+ public void initChainVertex(){
+ if(!msg.isRear()){
+ findDestination();
+ if(GraphVertexOperation.isPathVertex(vertexVal.getAdjMap())){
+ chainVertexId.set(vertexId);
+ msg.set(vertexId, chainVertexId, vertexId, vertexVal.getAdjMap(), false);
+ sendMsg(destVertexId,msg);
+ }else if(GraphVertexOperation.isRearVertex(vertexVal.getAdjMap()))
+ voteToHalt();
+ }
+ }
+ /**
+ * head node sends message to path node
+ */
+ public void sendMsgToPathVertex(){
+ if(!msg.isRear()){
+ destVertexId.set(getDestVertexIdFromChain(msg.getChainVertexId(), msg.getAdjMap()));
+ }else{
+ destVertexId.set(msg.getHeadVertexId());
+ }
+ msg.set(vertexId, msg.getChainVertexId(), msg.getHeadVertexId(), (byte)0, msg.isRear());
+ sendMsg(destVertexId,msg);
+ }
+ /**
+ * path node sends message back to head node
+ */
+ public void responseMsgToHeadVertex(){
+ if(!msg.isRear()){
+ findDestination();
+ if(GraphVertexOperation.isPathVertex(vertexVal.getAdjMap())){
+ chainVertexId = kmerFactory.mergeKmerWithNextCode(msg.getChainVertexId(),
+ vertexId.getGeneCodeAtPosition(kmerSize - 1));
+ deleteVertex(getVertexId());
+ msg.set(vertexId, chainVertexId, msg.getHeadVertexId(), vertexVal.getAdjMap(), false);
+ sendMsg(destVertexId,msg);
+ }
+ else if(GraphVertexOperation.isRearVertex(vertexVal.getAdjMap())){
+ msg.set(vertexId, msg.getChainVertexId(), msg.getHeadVertexId(), (byte)0, true);
+ sendMsg(destVertexId,msg);
+ }
+ }else{// is Rear
+ chainVertexId.set(msg.getSourceVertexId());
+ vertexVal.set(GraphVertexOperation.updateRightNeighberByVertexId(vertexVal.getAdjMap(), chainVertexId, kmerSize),
+ State.START_VERTEX, msg.getChainVertexId());
+ setVertexValue(vertexVal);
+ //String source = msg.getChainVertexId().toString();
+ //System.out.print("");
+ }
+ }
+
+ @Override
+ public void compute(Iterator<NaiveAlgorithmMessageWritable> msgIterator) {
+ initVertex();
+ if (getSuperstep() == 1) {
+ if(GraphVertexOperation.isHeadVertex(vertexVal.getAdjMap())){
+ msg.set(vertexId, chainVertexId, vertexId, (byte)0, false);
+ sendMsgToAllNextNodes(vertexId, vertexVal.getAdjMap());
+ }
+ }
+ else if(getSuperstep() == 2){
+ if(msgIterator.hasNext()){
+ msg = msgIterator.next();
+ initChainVertex();
+ }
+ }
+ //head node sends message to path node
+ else if(getSuperstep()%2 == 1 && getSuperstep() <= maxIteration){
+ while (msgIterator.hasNext()){
+ msg = msgIterator.next();
+ sendMsgToPathVertex();
+ }
+ }
+ //path node sends message back to head node
+ else if(getSuperstep()%2 == 0 && getSuperstep() > 2 && getSuperstep() <= maxIteration){
+ while(msgIterator.hasNext()){
+ msg = msgIterator.next();
+ responseMsgToHeadVertex();
+ }
+ }
+ voteToHalt();
+ }
+
+ /**
+ * @param args
+ */
+ public static void main(String[] args) throws Exception {
+ PregelixJob job = new PregelixJob(NaiveAlgorithmForPathMergeVertex.class.getSimpleName());
+ job.setVertexClass(NaiveAlgorithmForPathMergeVertex.class);
+ /**
+ * BinaryInput and BinaryOutput
+ */
+ job.setVertexInputFormatClass(NaiveAlgorithmForPathMergeInputFormat.class);
+ job.setVertexOutputFormatClass(NaiveAlgorithmForPathMergeOutputFormat.class);
+ job.setDynamicVertexValueSize(true);
+ job.setOutputKeyClass(KmerBytesWritable.class);
+ job.setOutputValueClass(ValueStateWritable.class);
+ Client.run(args, job);
+ }
+}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/GenerateSmallFile.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/GenerateSmallFile.java
new file mode 100644
index 0000000..555cf66
--- /dev/null
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/GenerateSmallFile.java
@@ -0,0 +1,49 @@
+package edu.uci.ics.genomix.pregelix.sequencefile;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+
+import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.KmerCountValue;
+
+public class GenerateSmallFile {
+
+ public static void generateNumOfLinesFromBigFile(Path inFile, Path outFile, int numOfLines) throws IOException{
+ Configuration conf = new Configuration();
+ FileSystem fileSys = FileSystem.get(conf);
+
+ SequenceFile.Reader reader = new SequenceFile.Reader(fileSys, inFile, conf);
+ SequenceFile.Writer writer = SequenceFile.createWriter(fileSys, conf,
+ outFile, KmerBytesWritable.class, KmerCountValue.class,
+ CompressionType.NONE);
+ KmerBytesWritable outKey = new KmerBytesWritable(55);
+ KmerCountValue outValue = new KmerCountValue();
+ int i = 0;
+
+ for(i = 0; i < numOfLines; i++){
+ //System.out.println(i);
+ reader.next(outKey, outValue);
+ writer.append(outKey, outValue);
+ }
+ writer.close();
+ reader.close();
+ }
+ /**
+ * @param args
+ * @throws IOException
+ */
+ public static void main(String[] args) throws IOException {
+ // TODO Auto-generated method stub
+ Path dir = new Path("data");
+ Path inFile = new Path(dir, "part-0");
+ Path outFile = new Path(dir, "part-0-out-5000000");
+ generateNumOfLinesFromBigFile(inFile,outFile,5000000);
+ }
+
+}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/util/GraphVertexOperation.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/util/GraphVertexOperation.java
new file mode 100644
index 0000000..5cb0c2b
--- /dev/null
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/util/GraphVertexOperation.java
@@ -0,0 +1,62 @@
+package edu.uci.ics.genomix.pregelix.util;
+
+import edu.uci.ics.genomix.type.GeneCode;
+import edu.uci.ics.genomix.type.VKmerBytesWritable;
+
+public class GraphVertexOperation {
+
+ /**
+ * generate the valid data(byte[]) from BytesWritable
+ */
+ public static byte[] generateValidDataFromBytesWritable(VKmerBytesWritable bw){
+ byte[] wholeBytes = bw.getBytes();
+ int validNum = bw.getLength();
+ byte[] validBytes = new byte[validNum];
+ for(int i = 0; i < validNum; i++)
+ validBytes[i] = wholeBytes[i];
+ return validBytes;
+ }
+ /**
+ * Single Vertex: in-degree = out-degree = 1
+ * @param vertexValue
+ */
+ public static boolean isPathVertex(byte value){
+ if(GeneCode.inDegree(value) == 1 && GeneCode.outDegree(value) == 1)
+ return true;
+ return false;
+ }
+ /**
+ * Head Vertex: out-degree > 0,
+ * @param vertexValue
+ */
+ public static boolean isHeadVertex(byte value){
+ if(GeneCode.outDegree(value) > 0 && !isPathVertex(value))
+ return true;
+ return false;
+ }
+ /**
+ * Rear Vertex: in-degree > 0,
+ * @param vertexValue
+ */
+ public static boolean isRearVertex(byte value){
+ if(GeneCode.inDegree(value) > 0 && !isPathVertex(value))
+ return true;
+ return false;
+ }
+ /**
+ * update right neighber based on next vertexId
+ */
+ public static byte updateRightNeighberByVertexId(byte oldVertexValue, VKmerBytesWritable neighberVertex, int k){
+ byte geneCode = neighberVertex.getGeneCodeAtPosition(k-1);
+
+ byte newBit = GeneCode.getAdjBit(geneCode);
+ return (byte) ((byte)(oldVertexValue & 0xF0) | (byte) (newBit & 0x0F));
+ }
+ /**
+ * update right neighber
+ */
+ public static byte updateRightNeighber(byte oldVertexValue, byte newVertexValue){
+ return (byte) ((byte)(oldVertexValue & 0xF0) | (byte) (newVertexValue & 0x0F));
+ }
+
+}