Merge branch 'genomix/fullstack_genomix' of https://code.google.com/p/hyracks into jianfeng/genomix
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerBytesWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerBytesWritable.java
index 1e0c23e..50baeb4 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerBytesWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerBytesWritable.java
@@ -50,6 +50,10 @@
public KmerBytesWritable(int k, byte[] storage, int offset) {
setNewReference(k, storage, offset);
}
+
+ public KmerBytesWritable(int k, String kmer) {
+        setNewReference(k, kmer.getBytes(), 0);
+ }
/**
* Initial Kmer space by kmerlength
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
index ca47c10..d4dab00 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
@@ -39,7 +39,7 @@
}
public NodeWritable(int kmerSize) {
- nodeID = new PositionWritable(0,(byte) 0);
+ nodeID = new PositionWritable(0, (byte) 0);
forwardForwardList = new PositionListWritable();
forwardReverseList = new PositionListWritable();
reverseForwardList = new PositionListWritable();
@@ -47,6 +47,17 @@
kmer = new KmerBytesWritable(kmerSize);
}
+ public NodeWritable(PositionWritable nodeID, PositionListWritable FFList, PositionListWritable FRList,
+ PositionListWritable RFList, PositionListWritable RRList, KmerBytesWritable kmer) {
+ this(kmer.getKmerLength());
+ this.nodeID.set(nodeID);
+ forwardForwardList.set(FFList);
+ forwardReverseList.set(FRList);
+ reverseForwardList.set(RFList);
+ reverseReverseList.set(RRList);
+        this.kmer.set(kmer);
+ }
+
public void setNodeID(PositionWritable ref) {
this.setNodeID(ref.getReadID(), ref.getPosInRead());
}
@@ -58,7 +69,7 @@
public void setKmer(KmerBytesWritable right) {
this.kmer.set(right);
}
-
+
public void reset(int kmerSize) {
nodeID.set(0, (byte) 0);
forwardForwardList.reset();
@@ -71,7 +82,7 @@
public PositionListWritable getFFList() {
return forwardForwardList;
}
-
+
public PositionListWritable getFRList() {
return forwardReverseList;
}
@@ -79,7 +90,7 @@
public PositionListWritable getRFList() {
return reverseForwardList;
}
-
+
public PositionListWritable getRRList() {
return reverseReverseList;
}
@@ -101,8 +112,8 @@
this.forwardReverseList.set(nextNode.forwardReverseList);
kmer.mergeNextKmer(initialKmerSize, nextNode.getKmer());
}
-
- public void mergeForwardPre(NodeWritable preNode, int initialKmerSize){
+
+ public void mergeForwardPre(NodeWritable preNode, int initialKmerSize) {
this.reverseForwardList.set(preNode.reverseForwardList);
this.reverseReverseList.set(preNode.reverseReverseList);
kmer.mergePreKmer(initialKmerSize, preNode.getKmer());
@@ -148,6 +159,18 @@
}
@Override
+ public boolean equals(Object o) {
+ if (o instanceof NodeWritable) {
+ NodeWritable nw = (NodeWritable) o;
+ return (this.nodeID.equals(nw.nodeID) && this.forwardForwardList.equals(nw.forwardForwardList)
+ && this.forwardReverseList.equals(nw.forwardReverseList)
+ && this.reverseForwardList.equals(nw.reverseForwardList)
+ && this.reverseReverseList.equals(nw.reverseReverseList) && this.kmer.equals(nw.kmer));
+ }
+ return false;
+ }
+
+ @Override
public String toString() {
StringBuilder sbuilder = new StringBuilder();
sbuilder.append('(');
@@ -159,15 +182,15 @@
sbuilder.append(kmer.toString()).append(')');
return sbuilder.toString();
}
-
- public int inDegree(){
+
+ public int inDegree() {
return reverseReverseList.getCountOfPosition() + reverseForwardList.getCountOfPosition();
}
-
- public int outDegree(){
+
+ public int outDegree() {
return forwardForwardList.getCountOfPosition() + forwardReverseList.getCountOfPosition();
}
-
+
/*
* Return if this node is a "path" compressible node, that is, it has an in-degree and out-degree of 1
*/
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionListWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionListWritable.java
index 1b1b392..eca4a28 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionListWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionListWritable.java
@@ -19,7 +19,9 @@
import java.io.DataOutput;
import java.io.IOException;
import java.io.Serializable;
+import java.util.Arrays;
import java.util.Iterator;
+import java.util.List;
import org.apache.hadoop.io.Writable;
@@ -47,6 +49,13 @@
public PositionListWritable(int count, byte[] data, int offset) {
setNewReference(count, data, offset);
}
+
+ public PositionListWritable(List<PositionWritable> posns) {
+ this();
+ for (PositionWritable p : posns) {
+ append(p);
+ }
+ }
public void setNewReference(int count, byte[] data, int offset) {
this.valueCount = count;
@@ -198,4 +207,23 @@
}
return sbuilder.toString();
}
+
+ @Override
+ public int hashCode() {
+ return Marshal.hashBytes(getByteArray(), getStartOffset(), getLength());
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (!(o instanceof PositionListWritable))
+ return false;
+ PositionListWritable other = (PositionListWritable) o;
+ if (this.valueCount != other.valueCount)
+ return false;
+ for (int i=0; i < this.valueCount; i++) {
+ if (!this.getPosition(i).equals(other.getPosition(i)))
+ return false;
+ }
+ return true;
+ }
}
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionWritable.java
index 8e08ca6..8895f5c 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionWritable.java
@@ -147,7 +147,6 @@
return diff2;
}
return diff1;
- // return compareBytes(b1, s1, l1, b2, s2, l2);
}
}
diff --git a/genomix/genomix-hadoop/pom.xml b/genomix/genomix-hadoop/pom.xml
index 610092a..8ca2fa3 100755
--- a/genomix/genomix-hadoop/pom.xml
+++ b/genomix/genomix-hadoop/pom.xml
@@ -174,5 +174,11 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.mrunit</groupId>
+ <artifactId>mrunit</artifactId>
+ <version>1.0.0</version>
+ <classifier>hadoop1</classifier>
+ </dependency>
</dependencies>
</project>
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h3/MergePathsH3.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h3/MergePathsH3.java
index 38dde9d..c2b0e52 100644
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h3/MergePathsH3.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h3/MergePathsH3.java
@@ -40,6 +40,7 @@
public static final byte IS_HEAD = 1 << 3;
public static final byte IS_TAIL = 1 << 4;
public static final byte IS_PSEUDOHEAD = 1 << 5;
+ public static final byte IS_COMPLETE = 1 << 6;
public static String getFlagAsString(byte code) {
// TODO: allow multiple flags to be set
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4.java
index 063c5f7..b240833 100644
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4.java
@@ -20,6 +20,7 @@
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.SequenceFileInputFormat;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.mapred.lib.MultipleOutputs;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
@@ -35,7 +36,7 @@
* Mapper class: Partition the graph using random pseudoheads.
* Heads send themselves to their successors, and all others map themselves.
*/
- private static class MergePathsH4Mapper extends MapReduceBase implements
+ public static class MergePathsH4Mapper extends MapReduceBase implements
Mapper<PositionWritable, MessageWritableNodeWithFlag, PositionWritable, MessageWritableNodeWithFlag> {
private static long randSeed;
private Random randGenerator;
@@ -59,6 +60,7 @@
private byte outFlag;
public void configure(JobConf conf) {
+
randSeed = conf.getLong("randomSeed", 0);
randGenerator = new Random(randSeed);
probBeingRandomHead = conf.getFloat("probBeingRandomHead", 0.5f);
@@ -116,24 +118,35 @@
public void map(PositionWritable key, MessageWritableNodeWithFlag value,
OutputCollector<PositionWritable, MessageWritableNodeWithFlag> output, Reporter reporter)
throws IOException {
+ // Node may be marked as head b/c it's a real head or a real tail
+ headFlag = (byte) (MessageFlag.IS_HEAD & value.getFlag());
+ tailFlag = (byte) (MessageFlag.IS_TAIL & value.getFlag());
+ outFlag = (byte) (headFlag | tailFlag);
+
// only PATH vertices are present. Find the ID's for my neighbors
curNode.set(value.getNode());
curID.set(curNode.getNodeID());
+
curHead = isNodeRandomHead(curID);
- hasNext = setNextInfo(curNode);
- hasPrev = setPrevInfo(curNode);
+ // the headFlag and tailFlag's indicate if the node is at the beginning or end of a simple path.
+ // We prevent merging towards non-path nodes
+ hasNext = setNextInfo(curNode) && tailFlag == 0;
+ hasPrev = setPrevInfo(curNode) && headFlag == 0;
willMerge = false;
reporter.setStatus("CHECK ME OUT");
- System.out.println("mapping node" + curNode.toString() + " next:" + String.valueOf(hasNext) + " prev:" + String.valueOf(hasPrev));
+ System.err.println("mapping node" + curNode.toString() + " next:" + String.valueOf(hasNext) + " prev:" + String.valueOf(hasPrev));
// TODO: need to update edges in neighboring nodes
-
+
+ if ((outFlag & MessageFlag.IS_HEAD) > 0 && (outFlag & MessageFlag.IS_TAIL) > 0) {
+ // true HEAD met true TAIL. this path is complete
+ outFlag |= MessageFlag.FROM_SELF;
+ outputValue.set(outFlag, curNode);
+ output.collect(curID, outputValue);
+ return;
+ }
if (hasNext || hasPrev) {
- // Node may be marked as head b/c it's a real head or a real tail
- headFlag = (byte) (MessageFlag.IS_HEAD & value.getFlag());
- tailFlag = (byte) (MessageFlag.IS_TAIL & value.getFlag());
- outFlag = (byte) (headFlag | tailFlag);
if (curHead) {
if (hasNext && !nextHead) {
// compress this head to the forward tail
@@ -181,12 +194,16 @@
}
}
- // if we didn't send ourselves to some other node, remap ourselves
+ // if we didn't send ourselves to some other node, remap ourselves for the next round
if (!willMerge) {
outFlag |= MessageFlag.FROM_SELF;
outputValue.set(outFlag, curNode);
output.collect(curID, outputValue);
}
+ else {
+ // TODO send update to this node's neighbors
+ //mos.getCollector(UPDATES_OUTPUT, reporter).collect(key, outputValue);
+ }
}
}
@@ -195,7 +212,10 @@
*/
private static class MergePathsH4Reducer extends MapReduceBase implements
Reducer<PositionWritable, MessageWritableNodeWithFlag, PositionWritable, MessageWritableNodeWithFlag> {
-
+ private MultipleOutputs mos;
+ public static final String COMPLETE_OUTPUT = "complete";
+ public static final String UPDATES_OUTPUT = "update";
+
private int KMER_SIZE;
private MessageWritableNodeWithFlag inputValue;
private MessageWritableNodeWithFlag outputValue;
@@ -209,13 +229,16 @@
private byte outFlag;
public void configure(JobConf conf) {
+ mos = new MultipleOutputs(conf);
KMER_SIZE = conf.getInt("sizeKmer", 0);
+ inputValue = new MessageWritableNodeWithFlag(KMER_SIZE);
outputValue = new MessageWritableNodeWithFlag(KMER_SIZE);
curNode = new NodeWritable(KMER_SIZE);
prevNode = new NodeWritable(KMER_SIZE);
nextNode = new NodeWritable(KMER_SIZE);
}
+ @SuppressWarnings("unchecked")
@Override
public void reduce(PositionWritable key, Iterator<MessageWritableNodeWithFlag> values,
OutputCollector<PositionWritable, MessageWritableNodeWithFlag> output, Reporter reporter)
@@ -223,10 +246,14 @@
inputValue.set(values.next());
if (!values.hasNext()) {
- // all single nodes must be remapped
- if ((inputValue.getFlag() & MessageFlag.FROM_SELF) == MessageFlag.FROM_SELF) {
- // FROM_SELF => remap self
- output.collect(key, inputValue);
+ if ((inputValue.getFlag() & MessageFlag.FROM_SELF) > 0) {
+ if ((inputValue.getFlag() & MessageFlag.IS_HEAD) > 0 && (inputValue.getFlag() & MessageFlag.IS_TAIL) > 0) {
+ // complete path (H & T meet in this node)
+ mos.getCollector(COMPLETE_OUTPUT, reporter).collect(key, inputValue);
+ } else {
+ // FROM_SELF => no merging this round. remap self
+ output.collect(key, inputValue);
+ }
} else if ((inputValue.getFlag() & (MessageFlag.FROM_PREDECESSOR | MessageFlag.FROM_SUCCESSOR)) > 0) {
// FROM_PREDECESSOR | FROM_SUCCESSOR, but singleton? error here!
throw new IOException("Only one value recieved in merge, but it wasn't from self!");
@@ -246,10 +273,12 @@
sawPrevNode = true;
} else if ((inputValue.getFlag() & MessageFlag.FROM_SUCCESSOR) > 0) {
nextNode.set(inputValue.getNode());
- sawNextNode = false;
- } else {
+ sawNextNode = true;
+ } else if ((inputValue.getFlag() & MessageFlag.FROM_SELF) > 0) {
curNode.set(inputValue.getNode());
sawCurNode = true;
+ } else {
+ throw new IOException("Unknown origin for merging node");
}
if (!values.hasNext()) {
break;
@@ -279,7 +308,7 @@
outputValue.set(outFlag, curNode);
if ((outFlag & MessageFlag.IS_HEAD) > 0 && (outFlag & MessageFlag.IS_TAIL) > 0) {
// True heads meeting tails => merge is complete for this node
- // TODO: send to the "complete" collector
+ mos.getCollector(COMPLETE_OUTPUT, reporter).collect(key, outputValue);
} else {
output.collect(key, outputValue);
}
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/removetips/RemoveTips.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/removetips/RemoveTips.java
new file mode 100644
index 0000000..6518532
--- /dev/null
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/removetips/RemoveTips.java
@@ -0,0 +1,147 @@
+package edu.uci.ics.genomix.hadoop.graphclean.removetips;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+import edu.uci.ics.genomix.hadoop.graphclean.mergepaths.h3.MergePathsH3.MessageFlag;
+import edu.uci.ics.genomix.hadoop.pmcommon.MessageWritableNodeWithFlag;
+import edu.uci.ics.genomix.type.NodeWritable;
+import edu.uci.ics.genomix.type.PositionWritable;
+
+/*
+ * Removes short "tips" (dead-end nodes whose kmer is shorter than removeTipsMinLength)
+ * from the graph.  Tips are dropped in the mapper; the reducer passes surviving nodes through.
+ */
+@SuppressWarnings("deprecation")
+public class RemoveTips extends Configured implements Tool {
+
+    /*
+     * Mapper class: removes any tips by not mapping them at all
+     */
+    private static class RemoveTipsMapper extends MapReduceBase implements
+            Mapper<PositionWritable, MessageWritableNodeWithFlag, PositionWritable, MessageWritableNodeWithFlag> {
+        private int KMER_SIZE;
+        private int removeTipsMinLength;
+
+        private MessageWritableNodeWithFlag outputValue;
+        private NodeWritable curNode;
+
+        public void configure(JobConf conf) {
+            // KMER_SIZE must be read before sizing the reusable buffers below
+            KMER_SIZE = conf.getInt("sizeKmer", 0);
+            removeTipsMinLength = conf.getInt("removeTipsMinLength", 0);
+            outputValue = new MessageWritableNodeWithFlag(KMER_SIZE);
+            curNode = new NodeWritable(KMER_SIZE);
+        }
+
+        @Override
+        public void map(PositionWritable key, MessageWritableNodeWithFlag value,
+                OutputCollector<PositionWritable, MessageWritableNodeWithFlag> output, Reporter reporter)
+                throws IOException {
+            curNode.set(value.getNode());
+            if ((curNode.inDegree() == 0 || curNode.outDegree() == 0)
+                    && curNode.getKmer().getKmerLength() < removeTipsMinLength) {
+                // kill this node by NOT mapping it. Update my neighbors with a suicide note
+                // TODO: update neighbors by removing me from their lists
+            } else {
+                outputValue.set(MessageFlag.FROM_SELF, curNode);
+                // emit the flagged copy, not the raw input value
+                output.collect(key, outputValue);
+            }
+        }
+    }
+
+    /*
+     * Reducer class: keeps mapped nodes
+     */
+    private static class RemoveTipsReducer extends MapReduceBase implements
+            Reducer<PositionWritable, MessageWritableNodeWithFlag, PositionWritable, MessageWritableNodeWithFlag> {
+
+        private int KMER_SIZE;
+        private MessageWritableNodeWithFlag inputValue;
+
+        public void configure(JobConf conf) {
+            KMER_SIZE = conf.getInt("sizeKmer", 0);
+            // must allocate the buffer that reduce() reuses on every call
+            inputValue = new MessageWritableNodeWithFlag(KMER_SIZE);
+        }
+
+        @Override
+        public void reduce(PositionWritable key, Iterator<MessageWritableNodeWithFlag> values,
+                OutputCollector<PositionWritable, MessageWritableNodeWithFlag> output, Reporter reporter)
+                throws IOException {
+
+            inputValue.set(values.next());
+            if (!values.hasNext()) {
+                if ((inputValue.getFlag() & MessageFlag.FROM_SELF) > 0) {
+                    // FROM_SELF => keep self
+                    output.collect(key, inputValue);
+                } else {
+                    throw new IOException("Only one value received in merge, but it wasn't from self!");
+                }
+            } else {
+                throw new IOException("Expected only one node during reduce... saw more");
+            }
+        }
+    }
+
+    /*
+     * Run one iteration of the remove-tips algorithm
+     */
+    public RunningJob run(String inputPath, String outputPath, JobConf baseConf) throws IOException {
+        JobConf conf = new JobConf(baseConf);
+        conf.setJarByClass(RemoveTips.class);
+        conf.setJobName("RemoveTips " + inputPath);
+
+        FileInputFormat.addInputPath(conf, new Path(inputPath));
+        FileOutputFormat.setOutputPath(conf, new Path(outputPath));
+
+        conf.setInputFormat(SequenceFileInputFormat.class);
+        conf.setOutputFormat(SequenceFileOutputFormat.class);
+
+        conf.setMapOutputKeyClass(PositionWritable.class);
+        conf.setMapOutputValueClass(MessageWritableNodeWithFlag.class);
+        conf.setOutputKeyClass(PositionWritable.class);
+        conf.setOutputValueClass(MessageWritableNodeWithFlag.class);
+
+        // use this job's own mapper/reducer (was mistakenly wired to MergePathsH4's)
+        conf.setMapperClass(RemoveTipsMapper.class);
+        conf.setReducerClass(RemoveTipsReducer.class);
+
+        FileSystem.get(conf).delete(new Path(outputPath), true);
+
+        return JobClient.runJob(conf);
+    }
+
+    @Override
+    public int run(String[] arg0) throws Exception {
+        // TODO Auto-generated method stub
+        return 0;
+    }
+
+    public static void main(String[] args) throws Exception {
+        // launch RemoveTips itself (was mistakenly launching MergePathsH4)
+        int res = ToolRunner.run(new Configuration(), new RemoveTips(), args);
+        System.out.println("Ran the job fine!");
+        System.exit(res);
+    }
+}
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/MessageWritableNodeWithFlag.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/MessageWritableNodeWithFlag.java
index 91415ed..f05797e 100644
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/MessageWritableNodeWithFlag.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/MessageWritableNodeWithFlag.java
@@ -7,6 +7,7 @@
import org.apache.hadoop.io.BinaryComparable;
import org.apache.hadoop.io.WritableComparable;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
import edu.uci.ics.genomix.type.NodeWritable;
/*
@@ -20,7 +21,7 @@
public MessageWritableNodeWithFlag() {
this(0);
}
-
+
public MessageWritableNodeWithFlag(int k) {
this.flag = 0;
this.node = new NodeWritable(k);
@@ -30,6 +31,11 @@
this.flag = flag;
this.node = new NodeWritable(kmerSize);
}
+
+ public MessageWritableNodeWithFlag(byte flag, NodeWritable node) {
+ this(node.getKmer().getKmerLength());
+ set(flag, node);
+ }
public void set(MessageWritableNodeWithFlag right) {
set(right.getFlag(), right.getNode());
@@ -79,4 +85,19 @@
public int getLength() {
return node.getCount();
}
+
+ @Override
+ public int hashCode() {
+// return super.hashCode() + flag + node.hashCode();
+ return flag + node.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object rightObj) {
+ if (rightObj instanceof MessageWritableNodeWithFlag) {
+ MessageWritableNodeWithFlag rightMessage = (MessageWritableNodeWithFlag) rightObj;
+ return (this.flag == rightMessage.flag && this.node.equals(rightMessage.node));
+ }
+ return false;
+ }
}
\ No newline at end of file
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/PathNodeInitial.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/PathNodeInitial.java
index e7bcdf6..497e926 100644
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/PathNodeInitial.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/PathNodeInitial.java
@@ -34,6 +34,7 @@
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.SequenceFileInputFormat;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.mapred.lib.MultipleOutputs;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
@@ -57,10 +58,7 @@
private int inDegree;
private int outDegree;
private NodeWritable emptyNode;
-
- public PathNodeInitialMapper() {
-
- }
+ private Iterator<PositionWritable> posIterator;
public void configure(JobConf conf) {
KMER_SIZE = conf.getInt("sizeKmer", 0);
@@ -80,47 +78,79 @@
outputValue.set(MessageFlag.FROM_SELF, key);
output.collect(key.getNodeID(), outputValue);
reporter.incrCounter("genomix", "path_nodes", 1);
- } else if (outDegree == 1) {
- // Not a path myself, but my successor might be one. Map forward successor
+ } else if (inDegree == 0 && outDegree == 1) {
+ // start of a tip. needs to merge & be marked as head
+ outputValue.set(MessageFlag.FROM_SELF, key);
+ output.collect(key.getNodeID(), outputValue);
+ reporter.incrCounter("genomix", "path_nodes", 1);
+
outputValue.set(MessageFlag.FROM_PREDECESSOR, emptyNode);
- if (key.getFFList().getCountOfPosition() > 0) {
- outputKey.set(key.getFFList().getPosition(0));
- } else {
- outputKey.set(key.getFRList().getPosition(0));
- }
- output.collect(outputKey, outputValue);
- } else if (inDegree == 1) {
- // Not a path myself, but my predecessor might be one.
+ output.collect(key.getNodeID(), outputValue);
+ } else if (inDegree == 1 && outDegree == 0) {
+ // end of a tip. needs to merge & be marked as tail
+ outputValue.set(MessageFlag.FROM_SELF, key);
+ output.collect(key.getNodeID(), outputValue);
+ reporter.incrCounter("genomix", "path_nodes", 1);
+
outputValue.set(MessageFlag.FROM_SUCCESSOR, emptyNode);
- if (key.getRRList().getCountOfPosition() > 0) {
- outputKey.set(key.getRRList().getPosition(0));
- } else {
- outputKey.set(key.getRFList().getPosition(0));
- }
- output.collect(outputKey, outputValue);
+ output.collect(key.getNodeID(), outputValue);
} else {
- // TODO: all other nodes will not participate-- should they be collected in a "complete" output?
+ if (outDegree > 0) {
+ // Not a path myself, but my successor might be one. Map forward successor to find heads
+ outputValue.set(MessageFlag.FROM_PREDECESSOR, emptyNode);
+ posIterator = key.getFFList().iterator();
+ while (posIterator.hasNext()) {
+ outputKey.set(posIterator.next());
+ output.collect(outputKey, outputValue);
+ }
+ posIterator = key.getFRList().iterator();
+ while (posIterator.hasNext()) {
+ outputKey.set(posIterator.next());
+ output.collect(outputKey, outputValue);
+ }
+ }
+ if (inDegree > 0) {
+ // Not a path myself, but my predecessor might be one. map predecessor to find tails
+ outputValue.set(MessageFlag.FROM_SUCCESSOR, emptyNode);
+ posIterator = key.getRRList().iterator();
+ while (posIterator.hasNext()) {
+ outputKey.set(posIterator.next());
+ output.collect(outputKey, outputValue);
+ }
+ posIterator = key.getRFList().iterator();
+ while (posIterator.hasNext()) {
+ outputKey.set(posIterator.next());
+ output.collect(outputKey, outputValue);
+ }
+ }
+ // push this non-path node to the "complete" output
+ outputValue.set((byte) (MessageFlag.FROM_SELF | MessageFlag.IS_COMPLETE), key);
+ output.collect(key.getNodeID(), outputValue);
}
}
}
public static class PathNodeInitialReducer extends MapReduceBase implements
Reducer<PositionWritable, MessageWritableNodeWithFlag, PositionWritable, MessageWritableNodeWithFlag> {
-
+ private MultipleOutputs mos;
+ private static final String COMPLETE_OUTPUT = "complete";
private int KMER_SIZE;
private MessageWritableNodeWithFlag inputValue;
private MessageWritableNodeWithFlag outputValue;
private NodeWritable nodeToKeep;
private int count;
private byte flag;
+ private boolean isComplete;
public void configure(JobConf conf) {
+ mos = new MultipleOutputs(conf);
KMER_SIZE = conf.getInt("sizeKmer", 0);
inputValue = new MessageWritableNodeWithFlag(KMER_SIZE);
outputValue = new MessageWritableNodeWithFlag(KMER_SIZE);
nodeToKeep = new NodeWritable(KMER_SIZE);
}
+ @SuppressWarnings("unchecked")
@Override
public void reduce(PositionWritable key, Iterator<MessageWritableNodeWithFlag> values,
OutputCollector<PositionWritable, MessageWritableNodeWithFlag> output, Reporter reporter)
@@ -128,41 +158,63 @@
inputValue.set(values.next());
if (!values.hasNext()) {
- if ((inputValue.getFlag() & MessageFlag.FROM_SELF) == MessageFlag.FROM_SELF) {
- // FROM_SELF => need to keep this PATH node
- output.collect(key, inputValue);
+ if ((inputValue.getFlag() & MessageFlag.FROM_SELF) > 0) {
+ if ((inputValue.getFlag() & MessageFlag.IS_COMPLETE) > 0) {
+ // non-path node. Store in "complete" output
+ mos.getCollector(COMPLETE_OUTPUT, reporter).collect(key, inputValue);
+ } else {
+ // FROM_SELF => need to keep this PATH node
+ output.collect(key, inputValue);
+ }
}
} else {
// multiple inputs => possible HEAD or TAIL to a path node. note if HEAD or TAIL node
count = 0;
flag = MessageFlag.EMPTY_MESSAGE;
+ isComplete = false;
while (true) { // process values; break when no more
count++;
- if ((inputValue.getFlag() & MessageFlag.FROM_SELF) == MessageFlag.FROM_SELF) {
+ if ((inputValue.getFlag() & MessageFlag.FROM_SELF) > 0) {
// SELF -> keep this node
+ flag |= MessageFlag.FROM_SELF;
nodeToKeep.set(inputValue.getNode());
- } else if ((inputValue.getFlag() & MessageFlag.FROM_SUCCESSOR) == MessageFlag.FROM_SUCCESSOR) {
+ if ((inputValue.getFlag() & MessageFlag.IS_COMPLETE) > 0) {
+ isComplete = true;
+ }
+ } else if ((inputValue.getFlag() & MessageFlag.FROM_SUCCESSOR) > 0) {
flag |= MessageFlag.IS_TAIL;
- reporter.incrCounter("genomix", "path_nodes_tails", 1);
- } else if ((inputValue.getFlag() & MessageFlag.FROM_PREDECESSOR) == MessageFlag.FROM_PREDECESSOR) {
+ } else if ((inputValue.getFlag() & MessageFlag.FROM_PREDECESSOR) > 0) {
flag |= MessageFlag.IS_HEAD;
- reporter.incrCounter("genomix", "path_nodes_heads", 1);
}
if (!values.hasNext()) {
break;
} else {
- inputValue = values.next();
+ inputValue.set(values.next());
}
}
if (count < 2) {
throw new IOException("Expected at least two nodes in PathNodeInitial reduce; saw "
+ String.valueOf(count));
}
- if ((flag & MessageFlag.FROM_SELF) == MessageFlag.FROM_SELF) {
- // only map simple path nodes
- outputValue.set(flag, nodeToKeep);
- output.collect(key, outputValue);
- reporter.incrCounter("genomix", "path_nodes", 1);
+            if ((flag & MessageFlag.FROM_SELF) > 0) {
+                if (isComplete) {
+                    // non-path node: store the saved SELF node, not the last value read from the iterator
+                    mos.getCollector(COMPLETE_OUTPUT, reporter).collect(key, new MessageWritableNodeWithFlag(flag, nodeToKeep));
+ } else {
+ // only keep simple path nodes
+ outputValue.set(flag, nodeToKeep);
+ output.collect(key, outputValue);
+
+ reporter.incrCounter("genomix", "path_nodes", 1);
+ if ((flag & MessageFlag.IS_HEAD) > 0) {
+ reporter.incrCounter("genomix", "path_nodes_heads", 1);
+ }
+ if ((flag & MessageFlag.IS_TAIL) > 0) {
+ reporter.incrCounter("genomix", "path_nodes_tails", 1);
+ }
+ }
+ } else {
+                throw new IOException("No SELF node received in reduce! key=" + key.toString() + " flag=" + flag);
}
}
}
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/velvetgraphbuilding/GraphBuildingDriver.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/velvetgraphbuilding/GraphBuildingDriver.java
index eaf2a6f..c735a0d 100644
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/velvetgraphbuilding/GraphBuildingDriver.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/velvetgraphbuilding/GraphBuildingDriver.java
@@ -52,6 +52,7 @@
public void run(String inputPath, String outputPath, int numReducers, int sizeKmer, int readLength,
boolean onlyTest1stJob, boolean seqOutput, String defaultConfPath) throws IOException {
if (onlyTest1stJob == true) {
+
runfirstjob(inputPath, numReducers, sizeKmer, readLength, seqOutput, defaultConfPath);
} else {
runfirstjob(inputPath, numReducers, sizeKmer, readLength, true, defaultConfPath);
@@ -113,6 +114,7 @@
conf.setPartitionerClass(ReadIDPartitioner.class);
+ // grouping is done on the readID only; sorting is based on the (readID, abs(posn))
conf.setOutputKeyComparatorClass(PositionWritable.Comparator.class);
conf.setOutputValueGroupingComparator(PositionWritable.FirstComparator.class);
diff --git a/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/pmcommon/TestPathNodeInitial.java b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/pmcommon/TestPathNodeInitial.java
new file mode 100644
index 0000000..b142f87
--- /dev/null
+++ b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/pmcommon/TestPathNodeInitial.java
@@ -0,0 +1,60 @@
+package edu.uci.ics.genomix.hadoop.pmcommon;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.mapred.lib.MultipleOutputs;
+import org.apache.hadoop.mrunit.MapDriver;
+import org.apache.hadoop.mrunit.ReduceDriver;
+import org.junit.Test;
+
+import edu.uci.ics.genomix.hadoop.graphclean.mergepaths.h3.MergePathsH3.MessageFlag;
+import edu.uci.ics.genomix.hadoop.pmcommon.MessageWritableNodeWithFlag;
+import edu.uci.ics.genomix.hadoop.pmcommon.PathNodeInitial;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.NodeWritable;
+import edu.uci.ics.genomix.type.PositionListWritable;
+import edu.uci.ics.genomix.type.PositionWritable;
+
+@SuppressWarnings("deprecation")
@SuppressWarnings("deprecation")
public class TestPathNodeInitial {
    // Shared fixture positions (readID, posInRead). Only posn1 is used by the
    // current test; posn2..posn5 appear to be reserved for future
    // multi-neighbor tests — TODO confirm they are still wanted.
    PositionWritable posn1 = new PositionWritable(0, (byte) 1);
    PositionWritable posn2 = new PositionWritable(1, (byte) 1);
    PositionWritable posn3 = new PositionWritable(2, (byte) 1);
    PositionWritable posn4 = new PositionWritable(3, (byte) 1);
    PositionWritable posn5 = new PositionWritable(5, (byte) 1);
    String kmerString = "ATGCA";
    KmerBytesWritable kmer = new KmerBytesWritable(kmerString.length(), kmerString);
    JobConf conf = new JobConf();
    // NOTE(review): mos is constructed but never used in this class — verify
    // whether MultipleOutputs setup is still needed (see the commented-out
    // addNamedOutput call below).
    MultipleOutputs mos = new MultipleOutputs(conf);

    // Instance initializer: publish the k-mer size into the job configuration
    // before the Map/Reduce drivers below pick up 'conf'.
    {
        conf.set("sizeKmer", String.valueOf(kmerString.length()));
    }

    /**
     * A node with four empty adjacency lists: the mapper should emit it keyed
     * on its own position with flag FROM_SELF | IS_COMPLETE, and the reducer
     * should accept that single self-message without error.
     */
    @Test
    public void testNoNeighbors() throws IOException {
        NodeWritable noNeighborNode = new NodeWritable(posn1, new PositionListWritable(), new PositionListWritable(),
                new PositionListWritable(), new PositionListWritable(), kmer);
        MessageWritableNodeWithFlag output = new MessageWritableNodeWithFlag((byte) (MessageFlag.FROM_SELF | MessageFlag.IS_COMPLETE), noNeighborNode);
        // test mapper
        new MapDriver<NodeWritable, NullWritable, PositionWritable, MessageWritableNodeWithFlag>()
                .withMapper(new PathNodeInitial.PathNodeInitialMapper())
                .withConfiguration(conf)
                .withInput(noNeighborNode, NullWritable.get())
                .withOutput(posn1, output)
                .runTest();
        // test reducer
//        MultipleOutputs.addNamedOutput(conf, "complete", SequenceFileOutputFormat.class, PositionWritable.class, MessageWritableNodeWithFlag.class);
        new ReduceDriver<PositionWritable, MessageWritableNodeWithFlag, PositionWritable, MessageWritableNodeWithFlag>()
                .withReducer(new PathNodeInitial.PathNodeInitialReducer())
                .withConfiguration(conf)
                .withInput(posn1, Arrays.asList(output))
                .runTest();
    }
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/api/io/binary/BinaryDataCleanVertexInputFormat.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/api/io/binary/BinaryDataCleanVertexInputFormat.java
new file mode 100644
index 0000000..d3d52f3
--- /dev/null
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/api/io/binary/BinaryDataCleanVertexInputFormat.java
@@ -0,0 +1,104 @@
+package edu.uci.ics.genomix.pregelix.api.io.binary;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+
+import edu.uci.ics.pregelix.api.io.VertexInputFormat;
+import edu.uci.ics.pregelix.api.io.VertexReader;
+import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
+import edu.uci.ics.genomix.type.PositionWritable;
+
/**
 * Vertex input format that reads (PositionWritable, ValueStateWritable) pairs
 * from Hadoop sequence files via {@link SequenceFileInputFormat}. Concrete
 * formats subclass {@link BinaryDataCleanVertexReader} to turn each record
 * into a vertex.
 *
 * @param <I> vertex index type
 * @param <V> vertex value type
 * @param <E> edge value type
 * @param <M> message value type
 */
public class BinaryDataCleanVertexInputFormat<I extends WritableComparable<?>, V extends Writable, E extends Writable, M extends Writable>
        extends VertexInputFormat<I, V, E, M> {

    /** Uses the SequenceFileInputFormat to do everything */
    @SuppressWarnings("rawtypes")
    protected SequenceFileInputFormat binaryInputFormat = new SequenceFileInputFormat();

    /**
     * Abstract class to be implemented by the user based on their specific
     * vertex input. Easiest to ignore the key value separator and only use key
     * instead.
     *
     * @param <I>
     *            Vertex index value
     * @param <V>
     *            Vertex value
     * @param <E>
     *            Edge value
     */
    public static abstract class BinaryDataCleanVertexReader<I extends WritableComparable<?>, V extends Writable, E extends Writable, M extends Writable>
            implements VertexReader<I, V, E, M> {
        /** Internal line record reader */
        private final RecordReader<PositionWritable, ValueStateWritable> lineRecordReader;
        /** Context passed to initialize */
        private TaskAttemptContext context;

        /**
         * Initialize with the LineRecordReader.
         *
         * @param recordReader
         *            Line record reader from SequenceFileInputFormat
         */
        public BinaryDataCleanVertexReader(RecordReader<PositionWritable, ValueStateWritable> recordReader) {
            this.lineRecordReader = recordReader;
        }

        @Override
        public void initialize(InputSplit inputSplit, TaskAttemptContext context) throws IOException,
                InterruptedException {
            lineRecordReader.initialize(inputSplit, context);
            this.context = context;
        }

        @Override
        public void close() throws IOException {
            lineRecordReader.close();
        }

        @Override
        public float getProgress() throws IOException, InterruptedException {
            return lineRecordReader.getProgress();
        }

        /**
         * Get the line record reader.
         *
         * @return Record reader to be used for reading.
         */
        protected RecordReader<PositionWritable, ValueStateWritable> getRecordReader() {
            return lineRecordReader;
        }

        /**
         * Get the context.
         *
         * @return Context passed to initialize.
         */
        protected TaskAttemptContext getContext() {
            return context;
        }
    }

    @SuppressWarnings("unchecked")
    @Override
    public List<InputSplit> getSplits(JobContext context, int numWorkers) throws IOException, InterruptedException {
        // Ignore the hint of numWorkers here since we are using SequenceFileInputFormat
        // to do this for us
        return binaryInputFormat.getSplits(context);
    }

    @Override
    public VertexReader<I, V, E, M> createVertexReader(InputSplit split, TaskAttemptContext context) throws IOException {
        // NOTE(review): returns null — subclasses (e.g. DataCleanInputFormat)
        // override this; using the base class directly will NPE downstream.
        // TODO consider declaring this method abstract instead.
        return null;
    }

}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/client/Client.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/client/Client.java
index 94b0c51..e135085 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/client/Client.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/client/Client.java
@@ -9,9 +9,9 @@
import org.kohsuke.args4j.CmdLineParser;
import org.kohsuke.args4j.Option;
-import edu.uci.ics.genomix.pregelix.operator.LogAlgorithmForPathMergeVertex;
-import edu.uci.ics.genomix.pregelix.operator.NaiveAlgorithmForPathMergeVertex;
-import edu.uci.ics.genomix.pregelix.operator.P3ForPathMergeVertex;
+import edu.uci.ics.genomix.pregelix.operator.pathmerge.LogAlgorithmForPathMergeVertex;
+import edu.uci.ics.genomix.pregelix.operator.pathmerge.NaiveAlgorithmForPathMergeVertex;
+import edu.uci.ics.genomix.pregelix.operator.pathmerge.P3ForPathMergeVertex;
import edu.uci.ics.pregelix.api.job.PregelixJob;
import edu.uci.ics.pregelix.core.base.IDriver.Plan;
import edu.uci.ics.pregelix.core.driver.Driver;
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/DataCleanInputFormat.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/DataCleanInputFormat.java
new file mode 100644
index 0000000..140a703
--- /dev/null
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/DataCleanInputFormat.java
@@ -0,0 +1,74 @@
+package edu.uci.ics.genomix.pregelix.format;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import edu.uci.ics.genomix.type.PositionWritable;
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.io.VertexReader;
+import edu.uci.ics.pregelix.api.util.BspUtils;
+import edu.uci.ics.genomix.pregelix.io.MessageWritable;
+import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
+import edu.uci.ics.genomix.pregelix.api.io.binary.BinaryDataCleanVertexInputFormat;
+import edu.uci.ics.genomix.pregelix.api.io.binary.BinaryDataCleanVertexInputFormat.BinaryDataCleanVertexReader;
+
+public class DataCleanInputFormat extends
+ BinaryDataCleanVertexInputFormat<PositionWritable, ValueStateWritable, NullWritable, MessageWritable> {
+ /**
+ * Format INPUT
+ */
+ @SuppressWarnings("unchecked")
+ @Override
+ public VertexReader<PositionWritable, ValueStateWritable, NullWritable, MessageWritable> createVertexReader(
+ InputSplit split, TaskAttemptContext context) throws IOException {
+ return new BinaryDataCleanLoadGraphReader(binaryInputFormat.createRecordReader(split, context));
+ }
+}
+
@SuppressWarnings("rawtypes")
class BinaryDataCleanLoadGraphReader extends
        BinaryDataCleanVertexReader<PositionWritable, ValueStateWritable, NullWritable, MessageWritable> {
    // Lazily created on first getCurrentVertex() call and reused afterwards;
    // reset() + set* repopulate it for each record.
    private Vertex vertex;
    // Reusable key/value buffers filled from the record reader per record.
    private PositionWritable vertexId = new PositionWritable();
    private ValueStateWritable vertexValue = new ValueStateWritable();

    public BinaryDataCleanLoadGraphReader(RecordReader<PositionWritable, ValueStateWritable> recordReader) {
        super(recordReader);
    }

    @Override
    public boolean nextVertex() throws IOException, InterruptedException {
        // Delegates advancement entirely to the underlying sequence-file reader.
        return getRecordReader().nextKeyValue();
    }

    @SuppressWarnings("unchecked")
    @Override
    public Vertex<PositionWritable, ValueStateWritable, NullWritable, MessageWritable> getCurrentVertex()
            throws IOException, InterruptedException {
        if (vertex == null)
            vertex = (Vertex) BspUtils.createVertex(getContext().getConfiguration());

        // Clear state left over from the previously returned record.
        vertex.getMsgList().clear();
        vertex.getEdges().clear();

        vertex.reset();
        if (getRecordReader() != null) {
            /**
             * set the src vertex id
             */
            vertexId.set(getRecordReader().getCurrentKey());
            vertex.setVertexId(vertexId);
            /**
             * set the vertex value
             */
            vertexValue.set(getRecordReader().getCurrentValue());
            vertex.setVertexValue(vertexValue);
        }

        return vertex;
    }
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/DataCleanOutputFormat.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/DataCleanOutputFormat.java
new file mode 100644
index 0000000..40abd3e
--- /dev/null
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/DataCleanOutputFormat.java
@@ -0,0 +1,41 @@
+package edu.uci.ics.genomix.pregelix.format;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import edu.uci.ics.genomix.pregelix.api.io.binary.BinaryVertexOutputFormat;
+import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
+import edu.uci.ics.genomix.type.PositionWritable;
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.io.VertexWriter;
+
+public class DataCleanOutputFormat extends
+ BinaryVertexOutputFormat<PositionWritable, ValueStateWritable, NullWritable> {
+
+ @Override
+ public VertexWriter<PositionWritable, ValueStateWritable, NullWritable> createVertexWriter(
+ TaskAttemptContext context) throws IOException, InterruptedException {
+ @SuppressWarnings("unchecked")
+ RecordWriter<PositionWritable, ValueStateWritable> recordWriter = binaryOutputFormat.getRecordWriter(context);
+ return new BinaryLoadGraphVertexWriter(recordWriter);
+ }
+
+ /**
+ * Simple VertexWriter that supports {@link BinaryLoadGraphVertex}
+ */
+ public static class BinaryLoadGraphVertexWriter extends
+ BinaryVertexWriter<PositionWritable, ValueStateWritable, NullWritable> {
+ public BinaryLoadGraphVertexWriter(RecordWriter<PositionWritable, ValueStateWritable> lineRecordWriter) {
+ super(lineRecordWriter);
+ }
+
+ @Override
+ public void writeVertex(Vertex<PositionWritable, ValueStateWritable, NullWritable, ?> vertex)
+ throws IOException, InterruptedException {
+ getRecordWriter().write(vertex.getVertexId(), vertex.getVertexValue());
+ }
+ }
+}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/AdjacencyListWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/AdjacencyListWritable.java
index fae1970..a0a8ea5 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/AdjacencyListWritable.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/AdjacencyListWritable.java
@@ -27,6 +27,10 @@
reverseList.reset();
}
+ public int getCountOfPosition(){
+ return forwardList.getCountOfPosition() + reverseList.getCountOfPosition();
+ }
+
public PositionListWritable getForwardList() {
return forwardList;
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MergeBubbleMessageWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MergeBubbleMessageWritable.java
new file mode 100644
index 0000000..55ddb1b
--- /dev/null
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MergeBubbleMessageWritable.java
@@ -0,0 +1,185 @@
+package edu.uci.ics.genomix.pregelix.io;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.WritableComparable;
+
+import edu.uci.ics.genomix.pregelix.type.CheckMessage;
+import edu.uci.ics.genomix.pregelix.type.Message;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.PositionWritable;
+
/**
 * Message exchanged by the bubble-merge vertex program. Optional components
 * (source id, chain k-mer, neighbor adjacency, start id) are tracked by the
 * {@code checkMessage} bit field (CheckMessage.SOURCE/CHAIN/NEIGHBER/START),
 * and write()/readFields() serialize only the components whose bit is set.
 */
public class MergeBubbleMessageWritable implements WritableComparable<MergeBubbleMessageWritable> {
    /**
     * sourceVertexId stores source vertexId when headVertex sends the message;
     * stores neighbor vertexValue when pathVertex sends the message.
     * file stores the point to the file that stores the chains of connected DNA
     */
    private PositionWritable sourceVertexId;
    private KmerBytesWritable chainVertexId;
    private AdjacencyListWritable neighberNode; //incoming or outgoing
    private byte message;
    private PositionWritable startVertexId;

    // Bit field recording which of the optional components above are populated.
    private byte checkMessage;

    public MergeBubbleMessageWritable() {
        sourceVertexId = new PositionWritable();
        chainVertexId = new KmerBytesWritable(0);
        neighberNode = new AdjacencyListWritable();
        message = Message.NON;
        startVertexId = new PositionWritable();
        checkMessage = (byte) 0;
    }

    /**
     * Deep-copies the components of a plain MessageWritable into this message.
     * NOTE(review): the guards test this object's own fields, which the
     * constructor always initializes, so every branch is always taken —
     * presumably the intent was to test msg's components; confirm.
     * startVertexId is not copied (MessageWritable has no such component).
     */
    public void set(MessageWritable msg) {
        checkMessage = 0;
        if (sourceVertexId != null) {
            checkMessage |= CheckMessage.SOURCE;
            this.sourceVertexId.set(msg.getSourceVertexId());
        }
        if (chainVertexId != null) {
            checkMessage |= CheckMessage.CHAIN;
            this.chainVertexId.set(msg.getChainVertexId());
        }
        if (neighberNode != null) {
            checkMessage |= CheckMessage.NEIGHBER;
            this.neighberNode.set(msg.getNeighberNode());
        }
        this.message = msg.getMessage();
    }

    /**
     * Sets the individual components; a null argument leaves that component
     * (and its checkMessage bit) unset.
     */
    public void set(PositionWritable sourceVertexId, KmerBytesWritable chainVertexId, AdjacencyListWritable neighberNode, byte message) {
        checkMessage = 0;
        if (sourceVertexId != null) {
            checkMessage |= CheckMessage.SOURCE;
            this.sourceVertexId.set(sourceVertexId.getReadID(),sourceVertexId.getPosInRead());
        }
        if (chainVertexId != null) {
            checkMessage |= CheckMessage.CHAIN;
            this.chainVertexId.set(chainVertexId);
        }
        if (neighberNode != null) {
            checkMessage |= CheckMessage.NEIGHBER;
            this.neighberNode.set(neighberNode);
        }
        this.message = message;
    }

    /**
     * Clears the component flags, the chain and the adjacency lists.
     * NOTE(review): sourceVertexId and startVertexId are NOT cleared here, so
     * after readFields() on a record whose SOURCE/START bit is unset they keep
     * whatever the previous record held — confirm this is intended.
     */
    public void reset() {
        checkMessage = 0;
        chainVertexId.reset(1);
        neighberNode.reset();
        message = Message.NON;
    }

    public PositionWritable getSourceVertexId() {
        return sourceVertexId;
    }

    /** Sets the source id and marks the SOURCE bit; a null argument is a no-op. */
    public void setSourceVertexId(PositionWritable sourceVertexId) {
        if (sourceVertexId != null) {
            checkMessage |= CheckMessage.SOURCE;
            this.sourceVertexId.set(sourceVertexId.getReadID(),sourceVertexId.getPosInRead());
        }
    }

    public KmerBytesWritable getChainVertexId() {
        return chainVertexId;
    }

    /** Sets the chain k-mer and marks the CHAIN bit; a null argument is a no-op. */
    public void setChainVertexId(KmerBytesWritable chainVertexId) {
        if (chainVertexId != null) {
            checkMessage |= CheckMessage.CHAIN;
            this.chainVertexId.set(chainVertexId);
        }
    }

    public AdjacencyListWritable getNeighberNode() {
        return neighberNode;
    }

    /** Sets the neighbor adjacency lists and marks the NEIGHBER bit; null is a no-op. */
    public void setNeighberNode(AdjacencyListWritable neighberNode) {
        if(neighberNode != null){
            checkMessage |= CheckMessage.NEIGHBER;
            this.neighberNode.set(neighberNode);
        }
    }

    /** Length (in bases) of the chain k-mer carried by this message. */
    public int getLengthOfChain() {
        return chainVertexId.getKmerLength();
    }

    public PositionWritable getStartVertexId() {
        return startVertexId;
    }

    /** Sets the start id and marks the START bit; a null argument is a no-op. */
    public void setStartVertexId(PositionWritable startVertexId) {
        if(startVertexId != null){
            checkMessage |= CheckMessage.START;
            this.startVertexId.set(startVertexId);
        }
    }

    public byte getMessage() {
        return message;
    }

    public void setMessage(byte message) {
        this.message = message;
    }

    // Serialization: the flag byte first, then only the flagged components,
    // then the message byte. readFields() mirrors this order exactly.
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeByte(checkMessage);
        if ((checkMessage & CheckMessage.SOURCE) != 0)
            sourceVertexId.write(out);
        if ((checkMessage & CheckMessage.CHAIN) != 0)
            chainVertexId.write(out);
        if ((checkMessage & CheckMessage.NEIGHBER) != 0)
            neighberNode.write(out);
        if ((checkMessage & CheckMessage.START) != 0)
            startVertexId.write(out);
        out.write(message);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.reset();
        checkMessage = in.readByte();
        if ((checkMessage & CheckMessage.SOURCE) != 0)
            sourceVertexId.readFields(in);
        if ((checkMessage & CheckMessage.CHAIN) != 0)
            chainVertexId.readFields(in);
        if ((checkMessage & CheckMessage.NEIGHBER) != 0)
            neighberNode.readFields(in);
        if ((checkMessage & CheckMessage.START) != 0)
            startVertexId.readFields(in);
        message = in.readByte();
    }

    // Identity (hashCode/equals/compareTo) is based on sourceVertexId only.
    @Override
    public int hashCode() {
        return sourceVertexId.hashCode();
    }

    @Override
    public boolean equals(Object o) {
        if (o instanceof MergeBubbleMessageWritable) {
            MergeBubbleMessageWritable tp = (MergeBubbleMessageWritable) o;
            return sourceVertexId.equals(tp.sourceVertexId);
        }
        return false;
    }

    @Override
    public String toString() {
        return sourceVertexId.toString();
    }

    public int compareTo(MergeBubbleMessageWritable tp) {
        return sourceVertexId.compareTo(tp.sourceVertexId);
    }
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MessageWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MessageWritable.java
index f0a6b58..6a71344 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MessageWritable.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MessageWritable.java
@@ -31,6 +31,23 @@
message = Message.NON;
checkMessage = (byte) 0;
}
+
+ public void set(MessageWritable msg) {
+ checkMessage = 0;
+ if (sourceVertexId != null) {
+ checkMessage |= CheckMessage.SOURCE;
+ this.sourceVertexId.set(msg.getSourceVertexId());
+ }
+ if (chainVertexId != null) {
+ checkMessage |= CheckMessage.CHAIN;
+ this.chainVertexId.set(msg.getChainVertexId());
+ }
+ if (neighberNode != null) {
+ checkMessage |= CheckMessage.NEIGHBER;
+ this.neighberNode.set(msg.getNeighberNode());
+ }
+ this.message = msg.getMessage();
+ }
public void set(PositionWritable sourceVertexId, KmerBytesWritable chainVertexId, AdjacencyListWritable neighberNode, byte message) {
checkMessage = 0;
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/ValueStateWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/ValueStateWritable.java
index 985c2ac..7895da4 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/ValueStateWritable.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/ValueStateWritable.java
@@ -29,7 +29,7 @@
reverseForwardList, reverseReverseList,
state, mergeChain);
}
-
+
public void set(PositionListWritable forwardForwardList, PositionListWritable forwardReverseList,
PositionListWritable reverseForwardList, PositionListWritable reverseReverseList,
byte state, KmerBytesWritable mergeChain) {
@@ -41,6 +41,11 @@
this.mergeChain.set(mergeChain);
}
+ public void set(ValueStateWritable value) {
+ set(value.getFFList(),value.getFRList(),value.getRFList(),value.getRRList(),value.getState(),
+ value.getMergeChain());
+ }
+
public PositionListWritable getFFList() {
return outgoingList.getForwardList();
}
@@ -78,7 +83,7 @@
}
public void setIncomingList(AdjacencyListWritable incomingList) {
- this.incomingList = incomingList;
+ this.incomingList.set(incomingList);
}
public AdjacencyListWritable getOutgoingList() {
@@ -86,7 +91,7 @@
}
public void setOutgoingList(AdjacencyListWritable outgoingList) {
- this.outgoingList = outgoingList;
+ this.outgoingList.set(outgoingList);
}
public byte getState() {
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bridgeremove/BridgeRemoveVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bridgeremove/BridgeRemoveVertex.java
new file mode 100644
index 0000000..8716d8d
--- /dev/null
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bridgeremove/BridgeRemoveVertex.java
@@ -0,0 +1,198 @@
+package edu.uci.ics.genomix.pregelix.operator.bridgeremove;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import org.apache.hadoop.io.NullWritable;
+
+import edu.uci.ics.genomix.type.PositionWritable;
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.genomix.pregelix.client.Client;
+import edu.uci.ics.genomix.pregelix.format.DataCleanInputFormat;
+import edu.uci.ics.genomix.pregelix.format.DataCleanOutputFormat;
+import edu.uci.ics.genomix.pregelix.io.MessageWritable;
+import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
+import edu.uci.ics.genomix.pregelix.type.AdjMessage;
+import edu.uci.ics.genomix.pregelix.util.VertexUtil;
+
+/*
 * vertexId: PositionWritable
 * vertexValue: ValueStateWritable
 * edgeValue: NullWritable
 * message: MessageWritable
+ *
+ * DNA:
+ * A: 00
+ * C: 01
+ * G: 10
+ * T: 11
+ *
+ * succeed node
+ * A 00000001 1
+ * G 00000010 2
+ * C 00000100 4
+ * T 00001000 8
+ * precursor node
+ * A 00010000 16
+ * G 00100000 32
+ * C 01000000 64
+ * T 10000000 128
+ *
+ * For example, ONE LINE in input file: 00,01,10 0001,0010,
+ * That means that vertexId is ACG, its succeed node is A and its precursor node is C.
+ * The succeed node and precursor node will be stored in vertexValue and we don't use edgeValue.
+ * The details about message are in edu.uci.ics.pregelix.example.io.MessageWritable.
+ */
/**
 * Vertex program that detects and removes bridge structures from the graph.
 */
+public class BridgeRemoveVertex extends
+ Vertex<PositionWritable, ValueStateWritable, NullWritable, MessageWritable> {
+ public static final String KMER_SIZE = "BridgeRemoveVertex.kmerSize";
+ public static final String LENGTH = "BridgeRemoveVertex.length";
+ public static int kmerSize = -1;
+ private int length = -1;
+
+ private MessageWritable incomingMsg = new MessageWritable();
+ private MessageWritable outgoingMsg = new MessageWritable();
+ private ArrayList<MessageWritable> receivedMsg = new ArrayList<MessageWritable>();
+
+ private PositionWritable destVertexId = new PositionWritable();
+ private Iterator<PositionWritable> posIterator;
+ /**
+ * initiate kmerSize, maxIteration
+ */
+ public void initVertex() {
+ if (kmerSize == -1)
+ kmerSize = getContext().getConfiguration().getInt(KMER_SIZE, 5);
+ if(length == -1)
+ length = getContext().getConfiguration().getInt(LENGTH, kmerSize + 5);
+ outgoingMsg.reset();
+ receivedMsg.clear();
+ }
+
+ /**
+ * head send message to all next nodes
+ */
+ public void sendMsgToAllNextNodes(ValueStateWritable value) {
+ posIterator = value.getFFList().iterator(); // FFList
+ while(posIterator.hasNext()){
+ outgoingMsg.setMessage(AdjMessage.FROMFF);
+ outgoingMsg.setSourceVertexId(getVertexId());
+ destVertexId.set(posIterator.next());
+ sendMsg(destVertexId, outgoingMsg);
+ }
+ posIterator = value.getFRList().iterator(); // FRList
+ while(posIterator.hasNext()){
+ outgoingMsg.setMessage(AdjMessage.FROMFR);
+ outgoingMsg.setSourceVertexId(getVertexId());
+ destVertexId.set(posIterator.next());
+ sendMsg(destVertexId, outgoingMsg);
+ }
+ }
+
+ /**
+ * head send message to all previous nodes
+ */
+ public void sendMsgToAllPreviousNodes(ValueStateWritable value) {
+ posIterator = value.getRFList().iterator(); // RFList
+ while(posIterator.hasNext()){
+ outgoingMsg.setMessage(AdjMessage.FROMRF);
+ outgoingMsg.setSourceVertexId(getVertexId());
+ destVertexId.set(posIterator.next());
+ sendMsg(destVertexId, outgoingMsg);
+ }
+ posIterator = value.getRRList().iterator(); // RRList
+ while(posIterator.hasNext()){
+ outgoingMsg.setMessage(AdjMessage.FROMRR);
+ outgoingMsg.setSourceVertexId(getVertexId());
+ destVertexId.set(posIterator.next());
+ sendMsg(destVertexId, outgoingMsg);
+ }
+ }
+
+ @Override
+ public void compute(Iterator<MessageWritable> msgIterator) {
+ initVertex();
+ if (getSuperstep() == 1) {
+ if(VertexUtil.isUpBridgeVertex(getVertexValue())){
+ sendMsgToAllNextNodes(getVertexValue());
+ }
+ else if(VertexUtil.isUpBridgeVertex(getVertexValue())){
+ sendMsgToAllPreviousNodes(getVertexValue());
+ }
+ }
+ else if (getSuperstep() == 2){
+ int i = 0;
+ while (msgIterator.hasNext()) {
+ if(i == 3)
+ break;
+ receivedMsg.add(msgIterator.next());
+ i++;
+ }
+ if(receivedMsg.size() == 2){
+ if(getVertexValue().getLengthOfMergeChain() > length){
+ outgoingMsg.setSourceVertexId(getVertexId());
+ if(receivedMsg.get(0).getMessage() == AdjMessage.FROMFF
+ && receivedMsg.get(1).getMessage() == AdjMessage.FROMRR){
+ outgoingMsg.setMessage(AdjMessage.FROMRR);
+ sendMsg(receivedMsg.get(0).getSourceVertexId(), outgoingMsg);
+ outgoingMsg.setMessage(AdjMessage.FROMFF);
+ sendMsg(receivedMsg.get(1).getSourceVertexId(), outgoingMsg);
+ deleteVertex(getVertexId());
+ } else if (receivedMsg.get(0).getMessage() == AdjMessage.FROMFF
+ && receivedMsg.get(1).getMessage() == AdjMessage.FROMRF) {
+ outgoingMsg.setMessage(AdjMessage.FROMRR);
+ sendMsg(receivedMsg.get(0).getSourceVertexId(), outgoingMsg);
+ outgoingMsg.setMessage(AdjMessage.FROMFR);
+ sendMsg(receivedMsg.get(1).getSourceVertexId(), outgoingMsg);
+ deleteVertex(getVertexId());
+ } else if (receivedMsg.get(0).getMessage() == AdjMessage.FROMFR
+ && receivedMsg.get(1).getMessage() == AdjMessage.FROMRR) {
+ outgoingMsg.setMessage(AdjMessage.FROMRF);
+ sendMsg(receivedMsg.get(0).getSourceVertexId(), outgoingMsg);
+ outgoingMsg.setMessage(AdjMessage.FROMFF);
+ sendMsg(receivedMsg.get(1).getSourceVertexId(), outgoingMsg);
+ deleteVertex(getVertexId());
+ } else if (receivedMsg.get(0).getMessage() == AdjMessage.FROMFR
+ && receivedMsg.get(1).getMessage() == AdjMessage.FROMRF) {
+ outgoingMsg.setMessage(AdjMessage.FROMRF);
+ sendMsg(receivedMsg.get(0).getSourceVertexId(), outgoingMsg);
+ outgoingMsg.setMessage(AdjMessage.FROMFR);
+ sendMsg(receivedMsg.get(1).getSourceVertexId(), outgoingMsg);
+ deleteVertex(getVertexId());
+ }
+ }
+ }
+ }
+ else if(getSuperstep() == 3){
+ while (msgIterator.hasNext()) {
+ incomingMsg = msgIterator.next();
+ if(incomingMsg.getMessage() == AdjMessage.FROMFF){
+ //remove incomingMsg.getSourceId from RR positionList
+ } else if(incomingMsg.getMessage() == AdjMessage.FROMFR){
+ //remove incomingMsg.getSourceId from RF positionList
+ } else if(incomingMsg.getMessage() == AdjMessage.FROMRF){
+ //remove incomingMsg.getSourceId from FR positionList
+ } else{ //incomingMsg.getMessage() == AdjMessage.FROMRR
+ //remove incomingMsg.getSourceId from FF positionList
+ }
+ }
+ }
+ voteToHalt();
+ }
+
+ public static void main(String[] args) throws Exception {
+ PregelixJob job = new PregelixJob(BridgeRemoveVertex.class.getSimpleName());
+ job.setVertexClass(BridgeRemoveVertex.class);
+ /**
+ * BinaryInput and BinaryOutput
+ */
+ job.setVertexInputFormatClass(DataCleanInputFormat.class);
+ job.setVertexOutputFormatClass(DataCleanOutputFormat.class);
+ job.setDynamicVertexValueSize(true);
+ job.setOutputKeyClass(PositionWritable.class);
+ job.setOutputValueClass(ValueStateWritable.class);
+ Client.run(args, job);
+ }
+}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleMergeVertex.java
new file mode 100644
index 0000000..e16cd20
--- /dev/null
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleMergeVertex.java
@@ -0,0 +1,207 @@
+package edu.uci.ics.genomix.pregelix.operator.bubblemerge;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.hadoop.io.NullWritable;
+
+import edu.uci.ics.genomix.type.PositionWritable;
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.genomix.pregelix.client.Client;
+import edu.uci.ics.genomix.pregelix.format.DataCleanInputFormat;
+import edu.uci.ics.genomix.pregelix.format.DataCleanOutputFormat;
+import edu.uci.ics.genomix.pregelix.io.MergeBubbleMessageWritable;
+import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
+import edu.uci.ics.genomix.pregelix.type.AdjMessage;
+import edu.uci.ics.genomix.pregelix.util.VertexUtil;
+
+/*
+ * vertexId: PositionWritable
+ * vertexValue: ValueStateWritable
+ * edgeValue: NullWritable
+ * message: MergeBubbleMessageWritable
+ *
+ * DNA:
+ * A: 00
+ * C: 01
+ * G: 10
+ * T: 11
+ *
+ * succeed node
+ * A 00000001 1
+ * G 00000010 2
+ * C 00000100 4
+ * T 00001000 8
+ * precursor node
+ * A 00010000 16
+ * G 00100000 32
+ * C 01000000 64
+ * T 10000000 128
+ *
+ * For example, ONE LINE in input file: 00,01,10 0001,0010,
+ * That means that vertexId is ACG, its succeed node is A and its precursor node is C.
+ * The succeed node and precursor node will be stored in vertexValue and we don't use edgeValue.
+ * The details about message are in edu.uci.ics.pregelix.example.io.MessageWritable.
+ */
+/**
+ * Bubble merge: detects parallel paths (bubbles) that share the same head,
+ * keeps the candidate with the longest merge chain, and removes the others.
+ */
+public class BubbleMergeVertex extends
+        Vertex<PositionWritable, ValueStateWritable, NullWritable, MergeBubbleMessageWritable> {
+    public static final String KMER_SIZE = "BubbleMergeVertex.kmerSize";   // conf key for k
+    public static final String ITERATIONS = "BubbleMergeVertex.iteration"; // conf key for iteration cap
+    public static int kmerSize = -1;  // k-mer length; -1 until read from the job conf
+    private int maxIteration = -1;    // iteration cap; -1 until read from the job conf
+
+    // Reusable message buffers — NOTE(review): vertex instances appear to be
+    // recycled by the framework, so these are scratch objects, not per-vertex state.
+    private MergeBubbleMessageWritable incomingMsg = new MergeBubbleMessageWritable();
+    private MergeBubbleMessageWritable outgoingMsg = new MergeBubbleMessageWritable();
+
+    private PositionWritable destVertexId = new PositionWritable();
+    private Iterator<PositionWritable> posIterator;
+    // Superstep-3 messages grouped by the head (start) vertex that originated them.
+    private Map<PositionWritable, ArrayList<MergeBubbleMessageWritable>> receivedMsg = new HashMap<PositionWritable, ArrayList<MergeBubbleMessageWritable>>();
+    private ArrayList<MergeBubbleMessageWritable> tmpMsg = new ArrayList<MergeBubbleMessageWritable>();
+
+    /**
+     * Lazily loads kmerSize and maxIteration from the job configuration (the
+     * sentinel -1 / negative value marks "not yet read") and resets the
+     * reusable outgoing message before each compute() call.
+     */
+    public void initVertex() {
+        if (kmerSize == -1)
+            kmerSize = getContext().getConfiguration().getInt(KMER_SIZE, 5);
+        if (maxIteration < 0)
+            maxIteration = getContext().getConfiguration().getInt(ITERATIONS, 1000000);
+        outgoingMsg.reset();
+    }
+    /**
+     * Returns the first forward neighbor of this vertex: the head of the FF
+     * list when it is non-empty, otherwise the head of the FR list.
+     *
+     * NOTE(review): if both lists are empty this throws NoSuchElementException —
+     * callers presumably invoke it only on vertices with outDegree > 0; confirm.
+     */
+    public PositionWritable getNextDestVertexId(ValueStateWritable value) {
+        if(value.getFFList().getCountOfPosition() > 0) // #FFList() > 0
+            posIterator = value.getFFList().iterator();
+        else // #FRList() > 0
+            posIterator = value.getFRList().iterator();
+        return posIterator.next();
+    }
+
+    /**
+     * Broadcasts the prepared outgoing message to every forward neighbor of
+     * this vertex, i.e. all positions in both the FF and FR adjacency lists.
+     */
+    public void sendMsgToAllNextNodes(ValueStateWritable value) {
+        for (posIterator = value.getFFList().iterator(); posIterator.hasNext();) {
+            destVertexId.set(posIterator.next());
+            sendMsg(destVertexId, outgoingMsg);
+        }
+        for (posIterator = value.getFRList().iterator(); posIterator.hasNext();) {
+            destVertexId.set(posIterator.next());
+            sendMsg(destVertexId, outgoingMsg);
+        }
+    }
+
+ @Override
+ public void compute(Iterator<MergeBubbleMessageWritable> msgIterator) {
+ initVertex();
+ if (getSuperstep() == 1) {
+ if(VertexUtil.isHeadVertex(getVertexValue())){
+ outgoingMsg.setSourceVertexId(getVertexId());
+ sendMsgToAllNextNodes(getVertexValue());
+ }
+ } else if (getSuperstep() == 2){
+ while (msgIterator.hasNext()) {
+ if(VertexUtil.isPathVertex(getVertexValue())){
+ outgoingMsg.setStartVertexId(incomingMsg.getSourceVertexId());
+ outgoingMsg.setSourceVertexId(getVertexId());
+ outgoingMsg.setChainVertexId(getVertexValue().getMergeChain());
+ destVertexId.set(getNextDestVertexId(getVertexValue()));
+ sendMsg(destVertexId, outgoingMsg);
+ }
+ }
+ } else if (getSuperstep() == 3){
+ while (msgIterator.hasNext()) {
+ incomingMsg = msgIterator.next();
+ if(!receivedMsg.containsKey(incomingMsg.getStartVertexId())){
+ tmpMsg.clear();
+ tmpMsg.add(incomingMsg);
+ receivedMsg.put(incomingMsg.getStartVertexId(), tmpMsg);
+ }
+ else{
+ tmpMsg.clear();
+ tmpMsg.addAll(receivedMsg.get(incomingMsg.getStartVertexId()));
+ tmpMsg.add(incomingMsg);
+ receivedMsg.put(incomingMsg.getStartVertexId(), tmpMsg);
+ }
+ }
+ for(PositionWritable prevId : receivedMsg.keySet()){
+ tmpMsg = receivedMsg.get(prevId);
+ if(tmpMsg.size() > 1){
+ //find the node with largest length of mergeChain
+ boolean flag = true; //the same length
+ int maxLength = tmpMsg.get(0).getLengthOfChain();
+ PositionWritable max = tmpMsg.get(0).getSourceVertexId();
+ for(int i = 1; i < tmpMsg.size(); i++){
+ if(tmpMsg.get(i).getLengthOfChain() != maxLength)
+ flag = false;
+ if(tmpMsg.get(i).getLengthOfChain() > maxLength){
+ maxLength = tmpMsg.get(i).getLengthOfChain();
+ max = tmpMsg.get(i).getSourceVertexId();
+ }
+ }
+ //send merge or unchange Message to node with largest length
+ if(flag == true){
+ //send unchange Message to node with largest length
+ //we can send no message to complete this step
+ //send delete Message to node which doesn't have largest length
+ for(int i = 0; i < tmpMsg.size(); i++){
+ if(tmpMsg.get(i).getSourceVertexId().compareTo(max) != 0){
+ outgoingMsg.setMessage(AdjMessage.KILL);
+ sendMsg(tmpMsg.get(i).getSourceVertexId(), outgoingMsg);
+ } else {
+ outgoingMsg.setMessage(AdjMessage.UNCHANGE);
+ sendMsg(tmpMsg.get(i).getSourceVertexId(), outgoingMsg);
+ }
+ }
+ } else{
+ //send merge Message to node with largest length
+ for(int i = 0; i < tmpMsg.size(); i++){
+ if(tmpMsg.get(i).getSourceVertexId().compareTo(max) != 0){
+ outgoingMsg.setMessage(AdjMessage.KILL);
+ sendMsg(tmpMsg.get(i).getSourceVertexId(), outgoingMsg);
+ } else {
+ outgoingMsg.setMessage(AdjMessage.MERGE);
+ /* add other node in message */
+ sendMsg(tmpMsg.get(i).getSourceVertexId(), outgoingMsg);
+ }
+ }
+ }
+ }
+ }
+ } else if (getSuperstep() == 4){
+ while (msgIterator.hasNext()) {
+ incomingMsg = msgIterator.next();
+ if(incomingMsg.getMessage() == AdjMessage.KILL){
+ deleteVertex(getVertexId());
+ } else if (incomingMsg.getMessage() == AdjMessage.MERGE){
+ //merge with small node
+ }
+ }
+ }
+ voteToHalt();
+ }
+
+    /**
+     * Stand-alone driver: configures and submits the BubbleMergeVertex Pregelix job.
+     *
+     * @param args command-line arguments forwarded to Client.run (input/output paths, cluster options)
+     * @throws Exception if job configuration or execution fails
+     */
+    public static void main(String[] args) throws Exception {
+        PregelixJob job = new PregelixJob(BubbleMergeVertex.class.getSimpleName());
+        job.setVertexClass(BubbleMergeVertex.class);
+        /**
+         * BinaryInput and BinaryOutput
+         */
+        job.setVertexInputFormatClass(DataCleanInputFormat.class);
+        job.setVertexOutputFormatClass(DataCleanOutputFormat.class);
+        // vertex values vary in length (merge chains), so enable dynamic sizing
+        job.setDynamicVertexValueSize(true);
+        job.setOutputKeyClass(PositionWritable.class);
+        job.setOutputValueClass(ValueStateWritable.class);
+        Client.run(args, job);
+    }
+}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/LogAlgorithmForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/LogAlgorithmForPathMergeVertex.java
similarity index 99%
rename from genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/LogAlgorithmForPathMergeVertex.java
rename to genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/LogAlgorithmForPathMergeVertex.java
index c3bb663..84c7f52 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/LogAlgorithmForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/LogAlgorithmForPathMergeVertex.java
@@ -1,4 +1,4 @@
-package edu.uci.ics.genomix.pregelix.operator;
+package edu.uci.ics.genomix.pregelix.operator.pathmerge;
import java.util.Iterator;
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/NaiveAlgorithmForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/NaiveAlgorithmForPathMergeVertex.java
similarity index 99%
rename from genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/NaiveAlgorithmForPathMergeVertex.java
rename to genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/NaiveAlgorithmForPathMergeVertex.java
index 4fcc09e..722206a 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/NaiveAlgorithmForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/NaiveAlgorithmForPathMergeVertex.java
@@ -1,4 +1,4 @@
-package edu.uci.ics.genomix.pregelix.operator;
+package edu.uci.ics.genomix.pregelix.operator.pathmerge;
import java.util.Iterator;
import org.apache.hadoop.io.NullWritable;
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/P3ForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P3ForPathMergeVertex.java
similarity index 99%
rename from genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/P3ForPathMergeVertex.java
rename to genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P3ForPathMergeVertex.java
index 8a03aa7..5ba5f31 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/P3ForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P3ForPathMergeVertex.java
@@ -1,4 +1,4 @@
-package edu.uci.ics.genomix.pregelix.operator;
+package edu.uci.ics.genomix.pregelix.operator.pathmerge;
import java.util.Iterator;
import org.apache.hadoop.io.NullWritable;
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/tipremove/TipRemoveVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/tipremove/TipRemoveVertex.java
new file mode 100644
index 0000000..4a174ec
--- /dev/null
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/tipremove/TipRemoveVertex.java
@@ -0,0 +1,128 @@
+package edu.uci.ics.genomix.pregelix.operator.tipremove;
+
+import java.util.Iterator;
+import org.apache.hadoop.io.NullWritable;
+
+import edu.uci.ics.genomix.type.PositionWritable;
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.genomix.pregelix.client.Client;
+import edu.uci.ics.genomix.pregelix.format.DataCleanInputFormat;
+import edu.uci.ics.genomix.pregelix.format.DataCleanOutputFormat;
+import edu.uci.ics.genomix.pregelix.io.MessageWritable;
+import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
+import edu.uci.ics.genomix.pregelix.type.AdjMessage;
+import edu.uci.ics.genomix.pregelix.util.VertexUtil;
+
+/*
+ * vertexId: PositionWritable
+ * vertexValue: ValueStateWritable
+ * edgeValue: NullWritable
+ * message: MessageWritable
+ *
+ * DNA:
+ * A: 00
+ * C: 01
+ * G: 10
+ * T: 11
+ *
+ * succeed node
+ * A 00000001 1
+ * G 00000010 2
+ * C 00000100 4
+ * T 00001000 8
+ * precursor node
+ * A 00010000 16
+ * G 00100000 32
+ * C 01000000 64
+ * T 10000000 128
+ *
+ * For example, ONE LINE in input file: 00,01,10 0001,0010,
+ * That means that vertexId is ACG, its succeed node is A and its precursor node is C.
+ * The succeed node and precursor node will be stored in vertexValue and we don't use edgeValue.
+ * The details about message are in edu.uci.ics.pregelix.example.io.MessageWritable.
+ */
+/**
+ * Remove tip or single node when l > constant
+ */
+public class TipRemoveVertex extends
+        Vertex<PositionWritable, ValueStateWritable, NullWritable, MessageWritable> {
+    public static final String KMER_SIZE = "TipRemoveVertex.kmerSize"; // conf key for k
+    public static final String LENGTH = "TipRemoveVertex.length";      // conf key for tip-length threshold
+    public static int kmerSize = -1; // k-mer length; -1 until read from the job conf
+    private int length = -1;         // max tip length to keep; -1 until read from the job conf
+
+    // Reusable message buffers — NOTE(review): vertex instances appear to be
+    // recycled by the framework, so these are scratch objects, not per-vertex state.
+    private MessageWritable incomingMsg = new MessageWritable();
+    private MessageWritable outgoingMsg = new MessageWritable();
+
+    /**
+     * Loads kmerSize and the tip-length threshold from the job configuration on
+     * first use (sentinel -1 means "not yet read"; the threshold defaults to
+     * kmerSize + 5), then resets the reusable outgoing message.
+     */
+    public void initVertex() {
+        if (kmerSize == -1) {
+            kmerSize = getContext().getConfiguration().getInt(KMER_SIZE, 5);
+        }
+        if (length == -1) {
+            length = getContext().getConfiguration().getInt(LENGTH, kmerSize + 5);
+        }
+        outgoingMsg.reset();
+    }
+
+ @Override
+ public void compute(Iterator<MessageWritable> msgIterator) {
+ initVertex();
+ if(getSuperstep() == 1){
+ if(VertexUtil.isIncomingTipVertex(getVertexValue())){
+ if(getVertexValue().getLengthOfMergeChain() > length){
+ if(getVertexValue().getFFList().getCountOfPosition() > 0)
+ outgoingMsg.setMessage(AdjMessage.FROMFF);
+ else if(getVertexValue().getFRList().getCountOfPosition() > 0)
+ outgoingMsg.setMessage(AdjMessage.FROMFR);
+ outgoingMsg.setSourceVertexId(getVertexId());
+ deleteVertex(getVertexId());
+ }
+ }
+ else if(VertexUtil.isOutgoingTipVertex(getVertexValue())){
+ if(getVertexValue().getLengthOfMergeChain() > length){
+ if(getVertexValue().getRFList().getCountOfPosition() > 0)
+ outgoingMsg.setMessage(AdjMessage.FROMRF);
+ else if(getVertexValue().getRRList().getCountOfPosition() > 0)
+ outgoingMsg.setMessage(AdjMessage.FROMRR);
+ outgoingMsg.setSourceVertexId(getVertexId());
+ deleteVertex(getVertexId());
+ }
+ }
+ else if(VertexUtil.isSingleVertex(getVertexValue())){
+ if(getVertexValue().getLengthOfMergeChain() > length)
+ deleteVertex(getVertexId());
+ }
+ }
+ else if(getSuperstep() == 2){
+ while (msgIterator.hasNext()) {
+ incomingMsg = msgIterator.next();
+ if(incomingMsg.getMessage() == AdjMessage.FROMFF){
+ //remove incomingMsg.getSourceId from RR positionList
+ } else if(incomingMsg.getMessage() == AdjMessage.FROMFR){
+ //remove incomingMsg.getSourceId from RF positionList
+ } else if(incomingMsg.getMessage() == AdjMessage.FROMRF){
+ //remove incomingMsg.getSourceId from FR positionList
+ } else{ //incomingMsg.getMessage() == AdjMessage.FROMRR
+ //remove incomingMsg.getSourceId from FF positionList
+ }
+ }
+ }
+ voteToHalt();
+ }
+
+    /**
+     * Stand-alone driver: configures and submits the TipRemoveVertex Pregelix job.
+     *
+     * @param args command-line arguments forwarded to Client.run (input/output paths, cluster options)
+     * @throws Exception if job configuration or execution fails
+     */
+    public static void main(String[] args) throws Exception {
+        PregelixJob job = new PregelixJob(TipRemoveVertex.class.getSimpleName());
+        job.setVertexClass(TipRemoveVertex.class);
+        /**
+         * BinaryInput and BinaryOutput
+         */
+        job.setVertexInputFormatClass(DataCleanInputFormat.class);
+        job.setVertexOutputFormatClass(DataCleanOutputFormat.class);
+        // vertex values vary in length (merge chains), so enable dynamic sizing
+        job.setDynamicVertexValueSize(true);
+        job.setOutputKeyClass(PositionWritable.class);
+        job.setOutputValueClass(ValueStateWritable.class);
+        Client.run(args, job);
+    }
+}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/ConvertNodeToIdValue.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/ConvertNodeToIdValue.java
index b81491b..452f72d 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/ConvertNodeToIdValue.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/ConvertNodeToIdValue.java
@@ -13,7 +13,6 @@
import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
import edu.uci.ics.genomix.pregelix.type.State;
-import edu.uci.ics.genomix.type.KmerBytesWritable;
import edu.uci.ics.genomix.type.NodeWritable;
import edu.uci.ics.genomix.type.PositionWritable;
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/AdjMessage.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/AdjMessage.java
new file mode 100644
index 0000000..ca8d795
--- /dev/null
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/AdjMessage.java
@@ -0,0 +1,45 @@
+package edu.uci.ics.genomix.pregelix.type;
+
+/**
+ * Byte codes carried by graph-cleanup messages. The four FROMxx values name the
+ * adjacency list a deleted neighbor was reached through (F=forward, R=reverse);
+ * NON means "no message"; UNCHANGE/MERGE/KILL are bubble-merge verdicts.
+ */
+public class AdjMessage {
+    public static final byte FROMFF = 0;
+    public static final byte FROMFR = 1;
+    public static final byte FROMRF = 2;
+    public static final byte FROMRR = 3;
+    public static final byte NON = 4;
+    public static final byte UNCHANGE = 5;
+    public static final byte MERGE = 6;
+    public static final byte KILL = 7;
+
+    /** Translates a code back to its symbolic name, mainly for logging/debugging. */
+    public final static class ADJMESSAGE_CONTENT {
+        public static String getContentFromCode(byte code) {
+            // unknown codes fall through the switch and yield ""
+            String r = "";
+            switch (code) {
+                case FROMFF:
+                    r = "FROMFF";
+                    break;
+                case FROMFR:
+                    r = "FROMFR";
+                    break;
+                case FROMRF:
+                    r = "FROMRF";
+                    break;
+                case FROMRR:
+                    r = "FROMRR";
+                    break;
+                case NON:
+                    r = "NON";
+                    break;
+                case UNCHANGE:
+                    r = "UNCHANGE";
+                    break;
+                case MERGE:
+                    r = "MERGE";
+                    break;
+                case KILL:
+                    r = "KILL";
+                    break;
+            }
+            return r;
+        }
+    }
+}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/CheckMessage.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/CheckMessage.java
index 6e6a97a..c7bcf48 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/CheckMessage.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/CheckMessage.java
@@ -8,6 +8,7 @@
public static final byte MESSAGE = 1 << 3;
public static final byte STATE = 1 << 4;
public static final byte LASTGENECODE = 1 << 5;
+ public static final byte START = 1 << 6;
public final static class CheckMessage_CONTENT {
@@ -32,6 +33,9 @@
case LASTGENECODE:
r = "LASTGENECODE";
break;
+ case START:
+ r = "START";
+ break;
}
return r;
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/Message.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/Message.java
index fa5f73b..4644383 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/Message.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/Message.java
@@ -8,7 +8,8 @@
public static final byte STOP = 3;
public static final byte FROMPSEUDOHEAD = 4;
public static final byte FROMPSEUDOREAR = 5;
- public static final byte FROMSELF = 6;
+ public static final byte IN = 6;
+ public static final byte OUT = 7;
public final static class MESSAGE_CONTENT {
@@ -33,8 +34,11 @@
case FROMPSEUDOREAR:
r = "FROMPSEUDOREAR";
break;
- case FROMSELF:
- r = "FROMSELF";
+ case IN:
+ r = "IN";
+ break;
+ case OUT:
+ r = "OUT";
break;
}
return r;
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/util/VertexUtil.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/util/VertexUtil.java
index 1740744..772690d 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/util/VertexUtil.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/util/VertexUtil.java
@@ -47,4 +47,36 @@
}
return false;*/
}
+
+    /**
+     * A tip on the incoming side: no predecessors and exactly one successor.
+     */
+    public static boolean isIncomingTipVertex(ValueStateWritable value){
+        return value.inDegree() == 0 && value.outDegree() == 1;
+    }
+
+    /**
+     * A tip on the outgoing side: exactly one predecessor and no successors.
+     */
+    public static boolean isOutgoingTipVertex(ValueStateWritable value){
+        return value.inDegree() == 1 && value.outDegree() == 0;
+    }
+
+    /**
+     * An isolated node: no predecessors and no successors.
+     */
+    public static boolean isSingleVertex(ValueStateWritable value){
+        return value.inDegree() == 0 && value.outDegree() == 0;
+    }
+
+    /**
+     * An "up bridge": a single predecessor fanning out to multiple successors.
+     */
+    public static boolean isUpBridgeVertex(ValueStateWritable value){
+        return value.inDegree() == 1 && value.outDegree() > 1;
+    }
+
+    /**
+     * A "down bridge": multiple predecessors converging into a single successor.
+     */
+    public static boolean isDownBridgeVertex(ValueStateWritable value){
+        return value.inDegree() > 1 && value.outDegree() == 1;
+    }
}
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
index cdc97c1..16b2794 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
@@ -9,9 +9,9 @@
import edu.uci.ics.genomix.pregelix.format.NaiveAlgorithmForPathMergeInputFormat;
import edu.uci.ics.genomix.pregelix.format.NaiveAlgorithmForPathMergeOutputFormat;
import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
-import edu.uci.ics.genomix.pregelix.operator.LogAlgorithmForPathMergeVertex;
-import edu.uci.ics.genomix.pregelix.operator.NaiveAlgorithmForPathMergeVertex;
-import edu.uci.ics.genomix.pregelix.operator.P3ForPathMergeVertex;
+import edu.uci.ics.genomix.pregelix.operator.pathmerge.LogAlgorithmForPathMergeVertex;
+import edu.uci.ics.genomix.pregelix.operator.pathmerge.NaiveAlgorithmForPathMergeVertex;
+import edu.uci.ics.genomix.pregelix.operator.pathmerge.P3ForPathMergeVertex;
import edu.uci.ics.genomix.type.PositionWritable;
import edu.uci.ics.pregelix.api.job.PregelixJob;