Merge branch 'wbiesing/genomix/VKmers' into genomix/fullstack_genomix
Conflicts:
genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerListWritable.java
genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/KmerBytesWritableTest.java
genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/AdjacencyListWritable.java
genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MergeBubbleMessageWritable.java
genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MessageWritable.java
genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bridgeremove/BridgeAddVertex.java
genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleAddVertex.java
genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicGraphCleanVertex.java
genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P5ForPathMergeVertex.java
genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/splitrepeat/SplitRepeatVertex.java
genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/tipremove/TipRemoveVertex.java
genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/ConvertNodeToIdValue.java
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/VKmerListWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/VKmerListWritable.java
index aa33350..abd46ef 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/VKmerListWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/VKmerListWritable.java
@@ -225,7 +225,7 @@
* remove the first instance of `toRemove`. Uses a linear scan. Throws an
* exception if not in this list.
*/
- public void remove(KmerBytesWritable toRemove, boolean ignoreMissing) {
+ public void remove(VKmerBytesWritable toRemove, boolean ignoreMissing) {
Iterator<VKmerBytesWritable> posIterator = this.iterator();
while (posIterator.hasNext()) {
if (toRemove.equals(posIterator.next())) {
@@ -240,7 +240,7 @@
}
}
- public void remove(KmerBytesWritable toRemove) {
+ public void remove(VKmerBytesWritable toRemove) {
remove(toRemove, false);
}
diff --git a/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/KmerBytesWritableTest.java b/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/KmerBytesWritableTest.java
index 81565a3..4f7b90e 100644
--- a/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/KmerBytesWritableTest.java
+++ b/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/KmerBytesWritableTest.java
@@ -15,11 +15,6 @@
package edu.uci.ics.genomix.data.test;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
import junit.framework.Assert;
import org.junit.Test;
@@ -123,5 +118,5 @@
}
Assert.assertEquals(kmer.toString(), kmerAppend.toString());
}
- }
+ }
}
diff --git a/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/VKmerBytesWritableTest.java b/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/VKmerBytesWritableTest.java
index 5dd4f82..2493b7e 100644
--- a/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/VKmerBytesWritableTest.java
+++ b/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/VKmerBytesWritableTest.java
@@ -15,6 +15,11 @@
package edu.uci.ics.genomix.data.test;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
import junit.framework.Assert;
import org.junit.Test;
@@ -22,6 +27,8 @@
import edu.uci.ics.genomix.type.GeneCode;
import edu.uci.ics.genomix.type.KmerBytesWritable;
import edu.uci.ics.genomix.type.VKmerBytesWritable;
+import edu.uci.ics.genomix.type.VKmerListWritable;
+
public class VKmerBytesWritableTest {
static byte[] array = { 'A', 'A', 'T', 'A', 'G', 'A', 'A', 'G' };
@@ -252,6 +259,46 @@
Assert.assertEquals("TCTA", k3.toString());
// Assert.assertEquals("CTAT", k3); // this is an incorrect test case--
// the merge always flips the passed-in kmer
+
+ String test1;
+ String test2;
+ test1 = "CTA";
+ test2 = "AGA";
+ VKmerBytesWritable k1 = new VKmerBytesWritable(3);
+ VKmerBytesWritable k2 = new VKmerBytesWritable(3);
+ k1.setByRead(test1.getBytes(), 0);
+ k2.setByRead(test2.getBytes(), 0);
+ k1.mergeWithRFKmer(3, k2);
+ Assert.assertEquals("TCTA", k1.toString());
+
+
+
+ test1 = "CTA";
+ test2 = "ATA"; //TAT
+ k1 = new VKmerBytesWritable(3);
+ k2 = new VKmerBytesWritable(3);
+ k1.setByRead(test1.getBytes(), 0);
+ k2.setByRead(test2.getBytes(), 0);
+ k1.mergeWithFRKmer(3, k2);
+ Assert.assertEquals("CTAT", k1.toString());
+
+ test1 = "ATA";
+ test2 = "CTA"; //TAG
+ k1 = new VKmerBytesWritable(3);
+ k2 = new VKmerBytesWritable(3);
+ k1.setByRead(test1.getBytes(), 0);
+ k2.setByRead(test2.getBytes(), 0);
+ k1.mergeWithFRKmer(3, k2);
+ Assert.assertEquals("ATAG", k1.toString());
+
+ test1 = "TCTAT";
+ test2 = "GAAC";
+ k1 = new VKmerBytesWritable(5);
+ k2 = new VKmerBytesWritable(4);
+ k1.setByRead(test1.getBytes(), 0);
+ k2.setByRead(test2.getBytes(), 0);
+ k1.mergeWithRFKmer(3, k2);
+ Assert.assertEquals("GTTCTAT", k1.toString());
}
@Test
@@ -396,4 +443,116 @@
k1.mergeWithRFKmer(5, k3);
Assert.assertEquals("GCTAGAT", k1.toString());
}
+
+ @Test
+ public void TestFinalMerge() {
+ String selfString;
+ String match;
+ String msgString;
+ int index;
+ VKmerBytesWritable kmer = new VKmerBytesWritable();
+ int kmerSize = 3;
+
+ String F1 = "AATAG";
+ String F2 = "TAGAA";
+ String R1 = "CTATT";
+ String R2 = "TTCTA";
+
+ //FF test
+ selfString = F1;
+ match = selfString.substring(selfString.length() - kmerSize + 1,selfString.length());
+ msgString = F2;
+ index = msgString.indexOf(match);
+ // does this test belong in VKmer so it can have variable-length kmers?
+// kmer.reset(msgString.length() - index);
+ kmer.setByRead(msgString.substring(index).getBytes(), 0);
+ System.out.println(kmer.toString());
+
+ //FR test
+ selfString = F1;
+ match = selfString.substring(selfString.length() - kmerSize + 1,selfString.length());
+ msgString = GeneCode.reverseComplement(R2);
+ index = msgString.indexOf(match);
+ kmer.reset(msgString.length() - index);
+ kmer.setByRead(msgString.substring(index).getBytes(), 0);
+ System.out.println(kmer.toString());
+
+ //RF test
+ selfString = R1;
+ match = selfString.substring(0,kmerSize - 1);
+ msgString = GeneCode.reverseComplement(F2);
+ index = msgString.lastIndexOf(match) + kmerSize - 2;
+ kmer.reset(index + 1);
+ kmer.setByReadReverse(msgString.substring(0, index + 1).getBytes(), 0);
+ System.out.println(kmer.toString());
+
+ //RR test
+ selfString = R1;
+ match = selfString.substring(0,kmerSize - 1);
+ msgString = R2;
+ index = msgString.lastIndexOf(match) + kmerSize - 2;
+ kmer.reset(index + 1);
+ kmer.setByRead(msgString.substring(0, index + 1).getBytes(), 0);
+ System.out.println(kmer.toString());
+
+ String[][] connectedTable = new String[][]{
+ {"FF", "RF"},
+ {"FF", "RR"},
+ {"FR", "RF"},
+ {"FR", "RR"}
+ };
+ System.out.println(connectedTable[0][1]);
+
+ Set<Long> s1 = new HashSet<Long>();
+ Set<Long> s2 = new HashSet<Long>();
+ s1.add((long) 1);
+ s1.add((long) 2);
+ s2.add((long) 2);
+ s2.add((long) 3);
+ Set<Long> intersection = new HashSet<Long>();
+ intersection.addAll(s1);
+ intersection.retainAll(s2);
+ System.out.println(intersection.toString());
+ Set<Long> difference = new HashSet<Long>();
+ difference.addAll(s1);
+ difference.removeAll(s2);
+ System.out.println(difference.toString());
+
+ Map<VKmerBytesWritable, Set<Long>> map = new HashMap<VKmerBytesWritable, Set<Long>>();
+ VKmerBytesWritable k1 = new VKmerBytesWritable(3);
+ Set<Long> set1 = new HashSet<Long>();
+ k1.setByRead(("CTA").getBytes(), 0);
+ set1.add((long)1);
+ map.put(k1, set1);
+ VKmerBytesWritable k2 = new VKmerBytesWritable(3);
+ k2.setByRead(("GTA").getBytes(), 0);
+ Set<Long> set2 = new HashSet<Long>();
+ set2.add((long) 2);
+ map.put(k2, set2);
+ VKmerBytesWritable k3 = new VKmerBytesWritable(3);
+ k3.setByRead(("ATG").getBytes(), 0);
+ Set<Long> set3 = new HashSet<Long>();
+ set3.add((long) 2);
+ map.put(k3, set3);
+ VKmerBytesWritable k4 = new VKmerBytesWritable(3);
+ k4.setByRead(("AAT").getBytes(), 0);
+ Set<Long> set4 = new HashSet<Long>();
+ set4.add((long) 1);
+ map.put(k4, set4);
+ VKmerListWritable kmerList = new VKmerListWritable();
+ kmerList.append(k1);
+ kmerList.append(k2);
+ System.out.println("CTA = " + map.get(k1).toString());
+ System.out.println("GTA = " + map.get(k2).toString());
+ System.out.println("ATG = " + map.get(k3).toString());
+ System.out.println("AAT = " + map.get(k4).toString());
+ System.out.println(k1.compareTo(k2));
+ System.out.println(k2.compareTo(k1));
+
+ System.out.println("CTA = " + kmerList.getPosition(0).toString());
+ System.out.println("GTA = " + kmerList.getPosition(1).toString());
+ System.out.println("CTA = " + map.get(kmerList.getPosition(0)).toString());
+ System.out.println("GTA = " + map.get(kmerList.getPosition(1)).toString());
+ }
+
}
diff --git a/genomix/genomix-hadoop/data/webmap/AdjSplitRepeat.txt b/genomix/genomix-hadoop/data/webmap/AdjSplitRepeat.txt
new file mode 100644
index 0000000..f2e3942
--- /dev/null
+++ b/genomix/genomix-hadoop/data/webmap/AdjSplitRepeat.txt
@@ -0,0 +1,3 @@
+1 AATAG
+2 GCATA
+3 ATAGC
diff --git a/genomix/genomix-hadoop/data/webmap/MergeBubble.txt b/genomix/genomix-hadoop/data/webmap/MergeBubble.txt
new file mode 100644
index 0000000..087f43e
--- /dev/null
+++ b/genomix/genomix-hadoop/data/webmap/MergeBubble.txt
@@ -0,0 +1,2 @@
+1 AATAGAA
+2 AATACAA
diff --git a/genomix/genomix-hadoop/data/webmap/RemoveBridge.txt b/genomix/genomix-hadoop/data/webmap/RemoveBridge.txt
new file mode 100644
index 0000000..472a7dc
--- /dev/null
+++ b/genomix/genomix-hadoop/data/webmap/RemoveBridge.txt
@@ -0,0 +1,2 @@
+1 AATAG
+2 CACGC
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/contrailgraphbuilding/GenomixReducer.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/contrailgraphbuilding/GenomixReducer.java
index f39cdcb..a0eb7c8 100644
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/contrailgraphbuilding/GenomixReducer.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/contrailgraphbuilding/GenomixReducer.java
@@ -37,7 +37,7 @@
while (values.hasNext()) {
tmpNode.set(values.next());
outputNode.getNodeIdList().appendList(tmpNode.getNodeIdList());
- outputNode.getFFList().appendList(tmpNode.getFFList());
+ outputNode.getFFList().appendList(tmpNode.getFFList()); // appendList needs to check whether the node being inserted already exists
outputNode.getFRList().appendList(tmpNode.getFRList());
outputNode.getRFList().appendList(tmpNode.getRFList());
outputNode.getRRList().appendList(tmpNode.getRRList());
diff --git a/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/contrailgraphbuilding/GraphBuildingTest.java b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/contrailgraphbuilding/GraphBuildingTest.java
index 127ab3e..cc7e0ac 100644
--- a/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/contrailgraphbuilding/GraphBuildingTest.java
+++ b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/contrailgraphbuilding/GraphBuildingTest.java
@@ -22,7 +22,7 @@
private JobConf conf = new JobConf();
private static final String ACTUAL_RESULT_DIR = "actual";
private static final String HADOOP_CONF_PATH = ACTUAL_RESULT_DIR + File.separator + "conf.xml";
- private static final String DATA_PATH = "data/webmap/SplitRepeat.txt";
+ private static final String DATA_PATH = "data/webmap/RemoveBridge.txt";
private static final String HDFS_PATH = "/webmap";
private static final String RESULT_PATH = "/result";
diff --git a/genomix/genomix-pregelix/data/AddBridge/SimpleTest/part-00000 b/genomix/genomix-pregelix/data/AddBridge/SimpleTest/part-00000
new file mode 100755
index 0000000..bc255a4
--- /dev/null
+++ b/genomix/genomix-pregelix/data/AddBridge/SimpleTest/part-00000
Binary files differ
diff --git a/genomix/genomix-pregelix/data/SplitRepeat/1/part-00000 b/genomix/genomix-pregelix/data/SplitRepeat/1/part-00000
new file mode 100755
index 0000000..a983577
--- /dev/null
+++ b/genomix/genomix-pregelix/data/SplitRepeat/1/part-00000
Binary files differ
diff --git a/genomix/genomix-pregelix/data/SplitRepeat/AdjSplitRepeat/part-00000 b/genomix/genomix-pregelix/data/SplitRepeat/AdjSplitRepeat/part-00000
new file mode 100755
index 0000000..a187c64
--- /dev/null
+++ b/genomix/genomix-pregelix/data/SplitRepeat/AdjSplitRepeat/part-00000
Binary files differ
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/GraphCleanInputFormat.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/GraphCleanInputFormat.java
index e8a72ce..859ddba 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/GraphCleanInputFormat.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/GraphCleanInputFormat.java
@@ -66,6 +66,7 @@
* set the vertex value
*/
vertexValue.set(getRecordReader().getCurrentValue());
+ vertexValue.setKmerlength(getRecordReader().getCurrentValue().getKmerlength());
vertex.setVertexValue(vertexValue);
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/AdjacencyListWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/AdjacencyListWritable.java
index 09e98fa..44fa444 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/AdjacencyListWritable.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/AdjacencyListWritable.java
@@ -6,7 +6,6 @@
import org.apache.hadoop.io.WritableComparable;
-import edu.uci.ics.genomix.oldtype.PositionListWritable;
import edu.uci.ics.genomix.type.VKmerListWritable;
public class AdjacencyListWritable implements WritableComparable<AdjacencyListWritable>{
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MergeBubbleMessageWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MergeBubbleMessageWritable.java
index 9fd15dd..5702146 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MergeBubbleMessageWritable.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MergeBubbleMessageWritable.java
@@ -41,7 +41,7 @@
this.sourceVertexId.set(msg.getSourceVertexId());
}
if (chainVertexId != null) {
- checkMessage |= CheckMessage.CHAIN;
+ checkMessage |= CheckMessage.ACUTUALKMER;
this.chainVertexId.setAsCopy(msg.getChainVertexId());
}
if (neighberNode != null) {
@@ -62,7 +62,7 @@
this.sourceVertexId.set(sourceVertexId.getReadID(),sourceVertexId.getPosInRead());
}
if (chainVertexId != null) {
- checkMessage |= CheckMessage.CHAIN;
+ checkMessage |= CheckMessage.ACUTUALKMER;
this.chainVertexId.setAsCopy(chainVertexId);
}
if (neighberNode != null) {
@@ -100,7 +100,7 @@
public void setChainVertexId(KmerBytesWritable chainVertexId) {
if (chainVertexId != null) {
- checkMessage |= CheckMessage.CHAIN;
+ checkMessage |= CheckMessage.ACUTUALKMER;
this.chainVertexId.setAsCopy(chainVertexId);
}
}
@@ -144,7 +144,7 @@
out.writeByte(checkMessage);
if ((checkMessage & CheckMessage.SOURCE) != 0)
sourceVertexId.write(out);
- if ((checkMessage & CheckMessage.CHAIN) != 0)
+ if ((checkMessage & CheckMessage.ACUTUALKMER) != 0)
chainVertexId.write(out);
if ((checkMessage & CheckMessage.NEIGHBER) != 0)
neighberNode.write(out);
@@ -159,7 +159,7 @@
checkMessage = in.readByte();
if ((checkMessage & CheckMessage.SOURCE) != 0)
sourceVertexId.readFields(in);
- if ((checkMessage & CheckMessage.CHAIN) != 0)
+ if ((checkMessage & CheckMessage.ACUTUALKMER) != 0)
chainVertexId.readFields(in);
if ((checkMessage & CheckMessage.NEIGHBER) != 0)
neighberNode.readFields(in);
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MessageWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MessageWritable.java
index e3cd345..80ce45a 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MessageWritable.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MessageWritable.java
@@ -8,9 +8,9 @@
import edu.uci.ics.genomix.pregelix.type.CheckMessage;
import edu.uci.ics.genomix.pregelix.type.Message;
-import edu.uci.ics.genomix.type.KmerBytesWritable;
import edu.uci.ics.genomix.type.PositionListWritable;
import edu.uci.ics.genomix.type.VKmerBytesWritable;
+import edu.uci.ics.genomix.type.VKmerListWritable;
public class MessageWritable implements WritableComparable<MessageWritable> {
/**
@@ -18,7 +18,7 @@
* stores neighber vertexValue when pathVertex sends the message
* file stores the point to the file that stores the chains of connected DNA
*/
- private KmerBytesWritable sourceVertexId;
+ private VKmerBytesWritable sourceVertexId;
private VKmerBytesWritable kmer;
private AdjacencyListWritable neighberNode; //incoming or outgoing
private PositionListWritable nodeIdList = new PositionListWritable();
@@ -26,13 +26,15 @@
private boolean isFlip;
private int kmerlength = 0;
private boolean updateMsg = false;
-
+ private VKmerBytesWritable startVertexId;
+
private byte checkMessage;
public MessageWritable() {
- sourceVertexId = new KmerBytesWritable();
+ sourceVertexId = new VKmerBytesWritable();
kmer = new VKmerBytesWritable();
neighberNode = new AdjacencyListWritable();
+ startVertexId = new VKmerBytesWritable();
flag = Message.NON;
isFlip = false;
checkMessage = (byte) 0;
@@ -40,9 +42,10 @@
public MessageWritable(int kmerSize) {
kmerlength = kmerSize;
- sourceVertexId = new KmerBytesWritable();
- kmer = new VKmerBytesWritable();
+ sourceVertexId = new VKmerBytesWritable(kmerSize);
+ kmer = new VKmerBytesWritable(0);
neighberNode = new AdjacencyListWritable(kmerSize);
+ startVertexId = new VKmerBytesWritable(kmerSize);
flag = Message.NON;
isFlip = false;
checkMessage = (byte) 0;
@@ -56,19 +59,23 @@
this.sourceVertexId.setAsCopy(msg.getSourceVertexId());
}
if (kmer != null) {
- checkMessage |= CheckMessage.CHAIN;
+ checkMessage |= CheckMessage.ACUTUALKMER;
this.kmer.setAsCopy(msg.getActualKmer());
}
if (neighberNode != null) {
checkMessage |= CheckMessage.NEIGHBER;
this.neighberNode.set(msg.getNeighberNode());
}
+ if (startVertexId != null) {
+ checkMessage |= CheckMessage.START;
+ this.startVertexId.setAsCopy(msg.getStartVertexId());
+ }
checkMessage |= CheckMessage.ADJMSG;
this.flag = msg.getFlag();
updateMsg = msg.isUpdateMsg();
}
- public void set(int kmerlength, KmerBytesWritable sourceVertexId, KmerBytesWritable chainVertexId, AdjacencyListWritable neighberNode, byte message) {
+ public void set(int kmerlength, VKmerBytesWritable sourceVertexId, VKmerBytesWritable chainVertexId, AdjacencyListWritable neighberNode, byte message) {
this.kmerlength = kmerlength;
checkMessage = 0;
if (sourceVertexId != null) {
@@ -76,8 +83,8 @@
this.sourceVertexId.setAsCopy(sourceVertexId);
}
if (chainVertexId != null) {
- checkMessage |= CheckMessage.CHAIN;
- this.kmer.setAsCopy(new VKmerBytesWritable(chainVertexId.toString())); // TODO Vkmer
+ checkMessage |= CheckMessage.ACUTUALKMER;
+ this.kmer.setAsCopy(chainVertexId);
}
if (neighberNode != null) {
checkMessage |= CheckMessage.NEIGHBER;
@@ -95,15 +102,16 @@
kmerlength = kmerSize;
// kmer.reset();
neighberNode.reset(kmerSize);
+ startVertexId.reset(kmerSize);
flag = Message.NON;
isFlip = false;
}
- public KmerBytesWritable getSourceVertexId() {
+ public VKmerBytesWritable getSourceVertexId() {
return sourceVertexId;
}
- public void setSourceVertexId(KmerBytesWritable sourceVertexId) {
+ public void setSourceVertexId(VKmerBytesWritable sourceVertexId) {
if (sourceVertexId != null) {
checkMessage |= CheckMessage.SOURCE;
this.sourceVertexId.setAsCopy(sourceVertexId);
@@ -116,8 +124,8 @@
public void setActualKmer(VKmerBytesWritable actualKmer) {
if (actualKmer != null) {
- checkMessage |= CheckMessage.CHAIN;
- this.kmer.setAsCopy(new VKmerBytesWritable(actualKmer.toString()));
+ checkMessage |= CheckMessage.ACUTUALKMER;
+ this.kmer.setAsCopy(actualKmer);
}
}
@@ -125,10 +133,10 @@
return kmer;
}
- public void setCreatedVertexId(KmerBytesWritable actualKmer) {
+ public void setCreatedVertexId(VKmerBytesWritable actualKmer) {
if (actualKmer != null) {
- checkMessage |= CheckMessage.CHAIN;
- this.kmer.setAsCopy(new VKmerBytesWritable(actualKmer.toString()));
+ checkMessage |= CheckMessage.ACUTUALKMER;
+ this.kmer.setAsCopy(actualKmer);
}
}
@@ -143,6 +151,17 @@
}
}
+ public VKmerBytesWritable getStartVertexId() {
+ return startVertexId;
+ }
+
+ public void setStartVertexId(VKmerBytesWritable startVertexId) {
+ if(startVertexId != null){
+ checkMessage |= CheckMessage.START;
+ this.startVertexId.setAsCopy(startVertexId);
+ }
+ }
+
public int getLengthOfChain() {
return kmer.getKmerLetterLength();
}
@@ -189,12 +208,14 @@
out.writeByte(checkMessage);
if ((checkMessage & CheckMessage.SOURCE) != 0)
sourceVertexId.write(out);
- if ((checkMessage & CheckMessage.CHAIN) != 0)
+ if ((checkMessage & CheckMessage.ACUTUALKMER) != 0)
kmer.write(out);
if ((checkMessage & CheckMessage.NEIGHBER) != 0)
neighberNode.write(out);
if ((checkMessage & CheckMessage.NODEIDLIST) != 0)
nodeIdList.write(out);
+ if ((checkMessage & CheckMessage.START) != 0)
+ startVertexId.write(out);
out.writeBoolean(isFlip);
out.writeByte(flag);
out.writeBoolean(updateMsg);
@@ -207,12 +228,14 @@
checkMessage = in.readByte();
if ((checkMessage & CheckMessage.SOURCE) != 0)
sourceVertexId.readFields(in);
- if ((checkMessage & CheckMessage.CHAIN) != 0)
+ if ((checkMessage & CheckMessage.ACUTUALKMER) != 0)
kmer.readFields(in);
if ((checkMessage & CheckMessage.NEIGHBER) != 0)
neighberNode.readFields(in);
if ((checkMessage & CheckMessage.NODEIDLIST) != 0)
nodeIdList.readFields(in);
+ if ((checkMessage & CheckMessage.START) != 0)
+ startVertexId.readFields(in);
isFlip = in.readBoolean();
flag = in.readByte();
updateMsg = in.readBoolean();
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/VertexValueWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/VertexValueWritable.java
index 83f286c..e78f090 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/VertexValueWritable.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/VertexValueWritable.java
@@ -6,24 +6,22 @@
import edu.uci.ics.genomix.type.PositionListWritable;
import edu.uci.ics.genomix.pregelix.type.MessageFlag;
-import edu.uci.ics.genomix.type.KmerBytesWritable;
import edu.uci.ics.genomix.type.VKmerBytesWritable;
import edu.uci.ics.genomix.type.VKmerListWritable;
public class VertexValueWritable implements WritableComparable<VertexValueWritable> {
- public static class State extends VertexStateFlag{
- public static final byte HEAD_SHOULD_MERGEWITHPREV = 0b101 << 0;
- public static final byte HEAD_SHOULD_MERGEWITHNEXT = 0b111 << 0;
-
+ public static class State extends VertexStateFlag{
public static final byte NO_MERGE = 0b00 << 3;
public static final byte SHOULD_MERGEWITHNEXT = 0b01 << 3;
public static final byte SHOULD_MERGEWITHPREV = 0b10 << 3;
public static final byte SHOULD_MERGE_MASK = 0b11 << 3;
public static final byte SHOULD_MERGE_CLEAR = 0b1100111;
- public static final byte KILL = 0b11 << 3;
- public static final byte KILL_MASK = 0b11 << 3;
+ public static final byte KILL = 0b1 << 3;
+ public static final byte KILL_MASK = 0b1 << 3;
+
+ public static final byte DIR_FROM_DEADVERTEX = 0b10 << 3;
}
public static class VertexStateFlag extends FakeFlag {
@@ -252,7 +250,7 @@
/*
* Delete the corresponding edge
*/
- public void processDelete(byte neighborToDeleteDir, KmerBytesWritable nodeToDelete){
+ public void processDelete(byte neighborToDeleteDir, VKmerBytesWritable nodeToDelete){
switch (neighborToDeleteDir & MessageFlag.DIR_MASK) {
case MessageFlag.DIR_FF:
this.getFFList().remove(nodeToDelete);
@@ -272,8 +270,8 @@
/*
* Process any changes to value. This is for edge updates
*/
- public void processUpdates(byte neighborToDeleteDir, KmerBytesWritable nodeToDelete,
- byte neighborToMergeDir, KmerBytesWritable nodeToAdd){
+ public void processUpdates(byte neighborToDeleteDir, VKmerBytesWritable nodeToDelete,
+ byte neighborToMergeDir, VKmerBytesWritable nodeToAdd){
// TODO
// this.getListFromDir(neighborToDeleteDir).remove(nodeToDelete);
// this.getListFromDir(neighborToMergeDir).append(nodeToDelete);
@@ -311,9 +309,9 @@
/*
* Process any changes to value. This is for merging
*/
- public void processMerges(byte neighborToDeleteDir, KmerBytesWritable nodeToDelete,
- byte neighborToMergeDir, KmerBytesWritable nodeToAdd,
- int kmerSize, KmerBytesWritable kmer){
+ public void processMerges(byte neighborToDeleteDir, VKmerBytesWritable nodeToDelete,
+ byte neighborToMergeDir, VKmerBytesWritable nodeToAdd,
+ int kmerSize, VKmerBytesWritable kmer){
switch (neighborToDeleteDir & MessageFlag.DIR_MASK) {
case MessageFlag.DIR_FF:
this.getFFList().remove(nodeToDelete); //set(null);
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bridgeremove/BridgeAddVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bridgeremove/BridgeAddVertex.java
index 89b66e6..5cb6aac 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bridgeremove/BridgeAddVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bridgeremove/BridgeAddVertex.java
@@ -3,18 +3,17 @@
import java.util.Iterator;
import org.apache.hadoop.io.NullWritable;
-//import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.VKmerBytesWritable;
+import edu.uci.ics.genomix.type.VKmerListWritable;
import edu.uci.ics.pregelix.api.graph.Vertex;
import edu.uci.ics.pregelix.api.job.PregelixJob;
import edu.uci.ics.pregelix.api.util.BspUtils;
-import edu.uci.ics.genomix.oldtype.PositionListWritable;
-import edu.uci.ics.genomix.oldtype.PositionWritable;
import edu.uci.ics.genomix.pregelix.client.Client;
import edu.uci.ics.genomix.pregelix.format.GraphCleanInputFormat;
import edu.uci.ics.genomix.pregelix.format.GraphCleanOutputFormat;
import edu.uci.ics.genomix.pregelix.io.MessageWritable;
import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
-import edu.uci.ics.genomix.type.VKmerBytesWritable;
/*
* vertexId: BytesWritable
@@ -48,8 +47,8 @@
* Naive Algorithm for path merge graph
*/
public class BridgeAddVertex extends
- Vertex<PositionWritable, VertexValueWritable, NullWritable, MessageWritable> {
- public static final String KMER_SIZE = "BridgeRemoveVertex.kmerSize";
+ Vertex<VKmerBytesWritable, VertexValueWritable, NullWritable, MessageWritable> {
+ public static final String KMER_SIZE = "BridgeRemoveVertex.kmerSize"; // TODO consolidate config options
public static final String LENGTH = "BridgeRemoveVertex.length";
public static int kmerSize = -1;
private int length = -1;
@@ -58,10 +57,12 @@
* initiate kmerSize, maxIteration
*/
public void initVertex() {
- if (kmerSize == -1)
+ if (kmerSize == -1) {
kmerSize = getContext().getConfiguration().getInt(KMER_SIZE, 5);
- if(length == -1)
- length = getContext().getConfiguration().getInt(LENGTH, kmerSize + 5);
+ KmerBytesWritable.setGlobalKmerLength(kmerSize);
+ }
+ if (length == -1)
+ length = getContext().getConfiguration().getInt(LENGTH, kmerSize + 5); // TODO fail on parse
}
@SuppressWarnings("unchecked")
@@ -69,40 +70,42 @@
public void compute(Iterator<MessageWritable> msgIterator) {
initVertex();
if(getSuperstep() == 1){
- if(getVertexId().getReadID() == 1 && getVertexId().getPosInRead() == 2){
- getVertexValue().getFFList().append(3, (byte)1);
+ if(getVertexId().toString().equals("ATA")){
+ VKmerBytesWritable vertexId = new VKmerBytesWritable(kmerSize);
+ vertexId.setByRead("GTA".getBytes(), 0);
+ getVertexValue().getFRList().append(vertexId);
//add bridge vertex
@SuppressWarnings("rawtypes")
Vertex vertex = (Vertex) BspUtils.createVertex(getContext().getConfiguration());
vertex.getMsgList().clear();
vertex.getEdges().clear();
- PositionWritable vertexId = new PositionWritable();
- VertexValueWritable vertexValue = new VertexValueWritable();
+ VertexValueWritable vertexValue = new VertexValueWritable(kmerSize);
/**
* set the src vertex id
*/
- vertexId.set(3, (byte)1);
vertex.setVertexId(vertexId);
/**
* set the vertex value
*/
- byte[] array = { 'T', 'A', 'G', 'C', 'C'};
- VKmerBytesWritable kmer = new VKmerBytesWritable(array.length);
- kmer.setByRead(array, 0);
- vertexValue.setKmer(kmer);
- PositionListWritable plist = new PositionListWritable();
- plist.append(new PositionWritable(1, (byte)2));
- vertexValue.setRRList(plist);
- PositionListWritable plist2 = new PositionListWritable();
- plist2.append(new PositionWritable(2, (byte)2));
- vertexValue.setFFList(plist2);
+ VKmerListWritable kmerFRList = new VKmerListWritable();
+ kmerFRList.append(getVertexId());
+ vertexValue.setFRList(kmerFRList);
+ VKmerBytesWritable otherVertexId = new VKmerBytesWritable(kmerSize);
+ otherVertexId.setByRead("ACG".getBytes(), 0);
+ VKmerListWritable kmerRFList = new VKmerListWritable();
+ kmerRFList.append(otherVertexId);
+ vertexValue.setRFList(kmerRFList);
+ vertexValue.setKmer(vertexId);
vertex.setVertexValue(vertexValue);
addVertex(vertexId, vertex);
+ }
+ else if(getVertexId().toString().equals("ACG")){
+ VKmerBytesWritable brdgeVertexId = new VKmerBytesWritable(kmerSize);
+ brdgeVertexId.setByRead("GTA".getBytes(), 0);
+ getVertexValue().getRFList().append(brdgeVertexId);
}
- if(getVertexId().getReadID() == 2 && getVertexId().getPosInRead() == 2)
- getVertexValue().getRRList().append(3, (byte)1);
}
voteToHalt();
}
@@ -116,7 +119,7 @@
job.setVertexInputFormatClass(GraphCleanInputFormat.class);
job.setVertexOutputFormatClass(GraphCleanOutputFormat.class);
job.setDynamicVertexValueSize(true);
- job.setOutputKeyClass(PositionWritable.class);
+ job.setOutputKeyClass(KmerBytesWritable.class);
job.setOutputValueClass(VertexValueWritable.class);
Client.run(args, job);
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bridgeremove/BridgeRemoveVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bridgeremove/BridgeRemoveVertex.java
index 7f85fc7..039f5b4 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bridgeremove/BridgeRemoveVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bridgeremove/BridgeRemoveVertex.java
@@ -11,8 +11,8 @@
import edu.uci.ics.genomix.pregelix.io.MessageWritable;
import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
import edu.uci.ics.genomix.pregelix.operator.pathmerge.BasicGraphCleanVertex;
-import edu.uci.ics.genomix.pregelix.type.AdjMessage;
import edu.uci.ics.genomix.pregelix.util.VertexUtil;
+import edu.uci.ics.genomix.type.VKmerBytesWritable;
/*
* vertexId: BytesWritable
@@ -59,76 +59,17 @@
if (kmerSize == -1)
kmerSize = getContext().getConfiguration().getInt(KMER_SIZE, 5);
if(length == -1)
- length = getContext().getConfiguration().getInt(LENGTH, kmerSize + 5);
- outgoingMsg.reset();
+ length = getContext().getConfiguration().getInt(LENGTH, kmerSize);
+ if(incomingMsg == null)
+ incomingMsg = new MessageWritable(kmerSize);
+ if(outgoingMsg == null)
+ outgoingMsg = new MessageWritable(kmerSize);
+ else
+ outgoingMsg.reset(kmerSize);
+ if(destVertexId == null)
+ destVertexId = new VKmerBytesWritable(kmerSize);
receivedMsgList.clear();
}
-
- /**
- * broadcast kill self to all neighbers
- */
- public void broadcaseKillself(){
- outgoingMsg.setSourceVertexId(getVertexId());
- if(receivedMsgList.get(0).getFlag() == AdjMessage.FROMFF
- && receivedMsgList.get(1).getFlag() == AdjMessage.FROMRR){
- outgoingMsg.setFlag(AdjMessage.FROMRR);
- sendMsg(receivedMsgList.get(0).getSourceVertexId(), outgoingMsg);
- outgoingMsg.setFlag(AdjMessage.FROMFF);
- sendMsg(receivedMsgList.get(1).getSourceVertexId(), outgoingMsg);
- deleteVertex(getVertexId());
- } else if (receivedMsgList.get(0).getFlag() == AdjMessage.FROMFF
- && receivedMsgList.get(1).getFlag() == AdjMessage.FROMRF) {
- outgoingMsg.setFlag(AdjMessage.FROMRR);
- sendMsg(receivedMsgList.get(0).getSourceVertexId(), outgoingMsg);
- outgoingMsg.setFlag(AdjMessage.FROMRF);
- sendMsg(receivedMsgList.get(1).getSourceVertexId(), outgoingMsg);
- deleteVertex(getVertexId());
- } else if (receivedMsgList.get(0).getFlag() == AdjMessage.FROMFR
- && receivedMsgList.get(1).getFlag() == AdjMessage.FROMRR) {
- outgoingMsg.setFlag(AdjMessage.FROMFR);
- sendMsg(receivedMsgList.get(0).getSourceVertexId(), outgoingMsg);
- outgoingMsg.setFlag(AdjMessage.FROMFF);
- sendMsg(receivedMsgList.get(1).getSourceVertexId(), outgoingMsg);
- deleteVertex(getVertexId());
- } else if (receivedMsgList.get(0).getFlag() == AdjMessage.FROMFR
- && receivedMsgList.get(1).getFlag() == AdjMessage.FROMRF) {
- outgoingMsg.setFlag(AdjMessage.FROMFR);
- sendMsg(receivedMsgList.get(0).getSourceVertexId(), outgoingMsg);
- outgoingMsg.setFlag(AdjMessage.FROMRF);
- sendMsg(receivedMsgList.get(1).getSourceVertexId(), outgoingMsg);
- deleteVertex(getVertexId());
- } // RR
- else if(receivedMsgList.get(1).getFlag() == AdjMessage.FROMFF
- && receivedMsgList.get(0).getFlag() == AdjMessage.FROMRR){
- outgoingMsg.setFlag(AdjMessage.FROMRR);
- sendMsg(receivedMsgList.get(1).getSourceVertexId(), outgoingMsg);
- outgoingMsg.setFlag(AdjMessage.FROMFF);
- sendMsg(receivedMsgList.get(0).getSourceVertexId(), outgoingMsg);
- deleteVertex(getVertexId());
- } else if (receivedMsgList.get(1).getFlag() == AdjMessage.FROMFF
- && receivedMsgList.get(0).getFlag() == AdjMessage.FROMRF) {
- outgoingMsg.setFlag(AdjMessage.FROMRR);
- sendMsg(receivedMsgList.get(1).getSourceVertexId(), outgoingMsg);
- outgoingMsg.setFlag(AdjMessage.FROMRF);
- sendMsg(receivedMsgList.get(0).getSourceVertexId(), outgoingMsg);
- deleteVertex(getVertexId());
- } else if (receivedMsgList.get(1).getFlag() == AdjMessage.FROMFR
- && receivedMsgList.get(0).getFlag() == AdjMessage.FROMRR) {
- outgoingMsg.setFlag(AdjMessage.FROMFR);
- sendMsg(receivedMsgList.get(1).getSourceVertexId(), outgoingMsg);
- outgoingMsg.setFlag(AdjMessage.FROMFF);
- sendMsg(receivedMsgList.get(0).getSourceVertexId(), outgoingMsg);
- deleteVertex(getVertexId());
- } else if (receivedMsgList.get(1).getFlag() == AdjMessage.FROMFR
- && receivedMsgList.get(0).getFlag() == AdjMessage.FROMRF) {
- outgoingMsg.setFlag(AdjMessage.FROMFR);
- sendMsg(receivedMsgList.get(1).getSourceVertexId(), outgoingMsg);
- outgoingMsg.setFlag(AdjMessage.FROMRF);
- sendMsg(receivedMsgList.get(0).getSourceVertexId(), outgoingMsg);
- deleteVertex(getVertexId());
- } else
- voteToHalt();
- }
@Override
public void compute(Iterator<MessageWritable> msgIterator) {
@@ -150,13 +91,17 @@
i++;
}
if(receivedMsgList.size() == 2){
- if(getVertexValue().getLengthOfKmer() <= length){
+ if(getVertexValue().getLengthOfKmer() <= length
+ && getVertexValue().getDegree() == 2){
broadcaseKillself();
}
}
}
else if(getSuperstep() == 3){
- responseToDeadVertex(msgIterator);
+ while(msgIterator.hasNext()){
+ incomingMsg = msgIterator.next();
+ responseToDeadVertex();
+ }
}
voteToHalt();
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleAddVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleAddVertex.java
index c0ba1a9..3df090c 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleAddVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleAddVertex.java
@@ -3,13 +3,11 @@
import java.util.Iterator;
import org.apache.hadoop.io.NullWritable;
-import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.VKmerListWritable;
import edu.uci.ics.genomix.type.VKmerBytesWritable;
import edu.uci.ics.pregelix.api.graph.Vertex;
import edu.uci.ics.pregelix.api.job.PregelixJob;
import edu.uci.ics.pregelix.api.util.BspUtils;
-import edu.uci.ics.genomix.oldtype.PositionListWritable;
-import edu.uci.ics.genomix.oldtype.PositionWritable;
import edu.uci.ics.genomix.pregelix.client.Client;
import edu.uci.ics.genomix.pregelix.format.GraphCleanInputFormat;
import edu.uci.ics.genomix.pregelix.format.GraphCleanOutputFormat;
@@ -48,7 +46,7 @@
* Remove tip or single node when l > constant
*/
public class BubbleAddVertex extends
- Vertex<PositionWritable, VertexValueWritable, NullWritable, MessageWritable> {
+ Vertex<VKmerBytesWritable, VertexValueWritable, NullWritable, MessageWritable> {
public static final String KMER_SIZE = "BubbleAddVertex.kmerSize";
public static int kmerSize = -1;
@@ -65,40 +63,42 @@
public void compute(Iterator<MessageWritable> msgIterator) {
initVertex();
if(getSuperstep() == 1){
- if(getVertexId().getReadID() == 1 && getVertexId().getPosInRead() == 2){
- getVertexValue().getFFList().append(2, (byte)1);
+ if(getVertexId().toString().equals("ATA")){
+ VKmerBytesWritable vertexId = new VKmerBytesWritable(kmerSize);
+ vertexId.setByRead("GTA".getBytes(), 0);
+ getVertexValue().getFRList().append(vertexId);
- //add tip vertex
+ //add bridge vertex
@SuppressWarnings("rawtypes")
Vertex vertex = (Vertex) BspUtils.createVertex(getContext().getConfiguration());
vertex.getMsgList().clear();
vertex.getEdges().clear();
- PositionWritable vertexId = new PositionWritable();
- VertexValueWritable vertexValue = new VertexValueWritable();
+ VertexValueWritable vertexValue = new VertexValueWritable(kmerSize);
/**
* set the src vertex id
*/
- vertexId.set(2, (byte)1);
vertex.setVertexId(vertexId);
/**
* set the vertex value
*/
- byte[] array = { 'T', 'A', 'G', 'C', 'C', 'A', 'G'}; //TAGCCAG
- VKmerBytesWritable kmer = new VKmerBytesWritable(array.length);
- kmer.setByRead(array, 0);
- vertexValue.setKmer(kmer);
- PositionListWritable plist = new PositionListWritable();
- plist.append(new PositionWritable(1, (byte)2));
- vertexValue.setRRList(plist);
- PositionListWritable plist2 = new PositionListWritable();
- plist2.append(new PositionWritable(1, (byte)4));
- vertexValue.setFFList(plist2);
+ VKmerListWritable kmerFRList = new VKmerListWritable();
+ kmerFRList.append(getVertexId());
+ vertexValue.setFRList(kmerFRList);
+ VKmerBytesWritable otherVertexId = new VKmerBytesWritable(kmerSize);
+ otherVertexId.setByRead("AGA".getBytes(), 0);
+ VKmerListWritable kmerRFList = new VKmerListWritable();
+ kmerRFList.append(otherVertexId);
+ vertexValue.setRFList(kmerRFList);
+ vertexValue.setKmer(vertexId);
vertex.setVertexValue(vertexValue);
addVertex(vertexId, vertex);
+ }
+ else if(getVertexId().toString().equals("AGA")){
+ VKmerBytesWritable brdgeVertexId = new VKmerBytesWritable(kmerSize);
+ brdgeVertexId.setByRead("GTA".getBytes(), 0);
+ getVertexValue().getRFList().append(brdgeVertexId);
}
- if(getVertexId().getReadID() == 1 && getVertexId().getPosInRead() == 4)
- getVertexValue().getRRList().append(2, (byte)1);
}
voteToHalt();
}
@@ -112,7 +112,7 @@
job.setVertexInputFormatClass(GraphCleanInputFormat.class);
job.setVertexOutputFormatClass(GraphCleanOutputFormat.class);
job.setDynamicVertexValueSize(true);
- job.setOutputKeyClass(PositionWritable.class);
+ job.setOutputKeyClass(VKmerBytesWritable.class);
job.setOutputValueClass(VertexValueWritable.class);
Client.run(args, job);
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleMergeVertex.java
index b782294..4cdeb89 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleMergeVertex.java
@@ -7,66 +7,31 @@
import org.apache.hadoop.io.NullWritable;
+import edu.uci.ics.genomix.type.VKmerBytesWritable;
import edu.uci.ics.genomix.type.KmerBytesWritableFactory;
import edu.uci.ics.pregelix.api.graph.Vertex;
import edu.uci.ics.pregelix.api.job.PregelixJob;
-import edu.uci.ics.genomix.oldtype.PositionWritable;
import edu.uci.ics.genomix.pregelix.client.Client;
import edu.uci.ics.genomix.pregelix.format.GraphCleanInputFormat;
import edu.uci.ics.genomix.pregelix.format.GraphCleanOutputFormat;
import edu.uci.ics.genomix.pregelix.io.MergeBubbleMessageWritable;
+import edu.uci.ics.genomix.pregelix.io.MessageWritable;
import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
+import edu.uci.ics.genomix.pregelix.operator.pathmerge.BasicGraphCleanVertex;
import edu.uci.ics.genomix.pregelix.type.AdjMessage;
+import edu.uci.ics.genomix.pregelix.type.MessageFlag;
import edu.uci.ics.genomix.pregelix.util.VertexUtil;
-/*
- * vertexId: BytesWritable
- * vertexValue: ByteWritable
- * edgeValue: NullWritable
- * message: MessageWritable
- *
- * DNA:
- * A: 00
- * C: 01
- * G: 10
- * T: 11
- *
- * succeed node
- * A 00000001 1
- * G 00000010 2
- * C 00000100 4
- * T 00001000 8
- * precursor node
- * A 00010000 16
- * G 00100000 32
- * C 01000000 64
- * T 10000000 128
- *
- * For example, ONE LINE in input file: 00,01,10 0001,0010,
- * That means that vertexId is ACG, its succeed node is A and its precursor node is C.
- * The succeed node and precursor node will be stored in vertexValue and we don't use edgeValue.
- * The details about message are in edu.uci.ics.pregelix.example.io.MessageWritable.
- */
/**
* Naive Algorithm for path merge graph
*/
public class BubbleMergeVertex extends
- Vertex<PositionWritable, VertexValueWritable, NullWritable, MergeBubbleMessageWritable> {
- public static final String KMER_SIZE = "BubbleMergeVertex.kmerSize";
- public static final String ITERATIONS = "BubbleMergeVertex.iteration";
- public static int kmerSize = -1;
- private int maxIteration = -1;
+ BasicGraphCleanVertex {
- private MergeBubbleMessageWritable incomingMsg = new MergeBubbleMessageWritable();
- private MergeBubbleMessageWritable outgoingMsg = new MergeBubbleMessageWritable();
private KmerBytesWritableFactory kmerFactory = new KmerBytesWritableFactory(1);
- private Iterator<PositionWritable> iterator;
- private PositionWritable pos = new PositionWritable();
- private PositionWritable destVertexId = new PositionWritable();
- private Iterator<PositionWritable> posIterator;
- private Map<PositionWritable, ArrayList<MergeBubbleMessageWritable>> receivedMsgMap = new HashMap<PositionWritable, ArrayList<MergeBubbleMessageWritable>>();
- private ArrayList<MergeBubbleMessageWritable> receivedMsgList = new ArrayList<MergeBubbleMessageWritable>();
+ private Map<VKmerBytesWritable, ArrayList<MessageWritable>> receivedMsgMap = new HashMap<VKmerBytesWritable, ArrayList<MessageWritable>>();
+ private ArrayList<MessageWritable> receivedMsgList = new ArrayList<MessageWritable>();
/**
* initiate kmerSize, maxIteration
@@ -78,189 +43,40 @@
maxIteration = getContext().getConfiguration().getInt(ITERATIONS, 1000000);
outgoingMsg.reset();
}
- /**
- * get destination vertex
- */
- public PositionWritable getNextDestVertexId(VertexValueWritable value) {
- if(value.getFFList().getCountOfPosition() > 0) // #FFList() > 0
- posIterator = value.getFFList().iterator();
- else // #FRList() > 0
- posIterator = value.getFRList().iterator();
- return posIterator.next();
- }
-
- public PositionWritable getPrevDestVertexId(VertexValueWritable value) {
- if(value.getRFList().getCountOfPosition() > 0) // #FFList() > 0
- posIterator = value.getRFList().iterator();
- else // #FRList() > 0
- posIterator = value.getRRList().iterator();
- return posIterator.next();
- }
-
- /**
- * check if prev/next destination exists
- */
- public boolean hasNextDest(VertexValueWritable value){
- return value.getFFList().getCountOfPosition() > 0 || value.getFRList().getCountOfPosition() > 0;
- }
-
- public boolean hasPrevDest(VertexValueWritable value){
- return value.getRFList().getCountOfPosition() > 0 || value.getRRList().getCountOfPosition() > 0;
- }
-
- /**
- * head send message to all next nodes
- */
- public void sendMsgToAllNextNodes(VertexValueWritable value) {
- posIterator = value.getFFList().iterator(); // FFList
- while(posIterator.hasNext()){
- outgoingMsg.setMessage(AdjMessage.FROMFF);
- destVertexId.set(posIterator.next());
- sendMsg(destVertexId, outgoingMsg);
- }
- posIterator = value.getFRList().iterator(); // FRList
- while(posIterator.hasNext()){
- outgoingMsg.setMessage(AdjMessage.FROMFR);
- destVertexId.set(posIterator.next());
- sendMsg(destVertexId, outgoingMsg);
- }
- }
-
- /**
- * head send message to all next nodes
- */
- public void sendMsgToAllPrevNodes(VertexValueWritable value) {
- posIterator = value.getRFList().iterator(); // FFList
- while(posIterator.hasNext()){
- outgoingMsg.setMessage(AdjMessage.FROMRF);
- destVertexId.set(posIterator.next());
- sendMsg(destVertexId, outgoingMsg);
- }
- posIterator = value.getRRList().iterator(); // FRList
- while(posIterator.hasNext()){
- outgoingMsg.setMessage(AdjMessage.FROMRR);
- destVertexId.set(posIterator.next());
- sendMsg(destVertexId, outgoingMsg);
- }
- }
-
- /**
- * broadcast kill self to all neighbers Pre-condition: vertex is a path vertex
- */
- public void broadcaseKillself(){
- outgoingMsg.setSourceVertexId(getVertexId());
-
- if(getVertexValue().getFFList().getCountOfPosition() > 0){//#FFList() > 0
- outgoingMsg.setMessage(AdjMessage.FROMFF);
- sendMsg(incomingMsg.getSourceVertexId(), outgoingMsg);
- }
- else if(getVertexValue().getFRList().getCountOfPosition() > 0){//#FRList() > 0
- outgoingMsg.setMessage(AdjMessage.FROMFR);
- sendMsg(incomingMsg.getSourceVertexId(), outgoingMsg);
- }
-
-
- if(getVertexValue().getRFList().getCountOfPosition() > 0){//#RFList() > 0
- outgoingMsg.setMessage(AdjMessage.FROMRF);
- sendMsg(incomingMsg.getStartVertexId(), outgoingMsg);
- }
- else if(getVertexValue().getRRList().getCountOfPosition() > 0){//#RRList() > 0
- outgoingMsg.setMessage(AdjMessage.FROMRR);
- sendMsg(incomingMsg.getStartVertexId(), outgoingMsg);
- }
-
- deleteVertex(getVertexId());
- }
-
- /**
- * do some remove operations on adjMap after receiving the info about dead Vertex
- */
- public void responseToDeadVertex(Iterator<MergeBubbleMessageWritable> msgIterator){
- while (msgIterator.hasNext()) {
- incomingMsg = msgIterator.next();
- if(incomingMsg.getMessage() == AdjMessage.FROMFF){
- //remove incomingMsg.getSourceId from RR positionList
- iterator = getVertexValue().getRRList().iterator();
- while(iterator.hasNext()){
- pos = iterator.next();
- if(pos.equals(incomingMsg.getSourceVertexId())){
- iterator.remove();
- break;
- }
- }
- } else if(incomingMsg.getMessage() == AdjMessage.FROMFR){
- //remove incomingMsg.getSourceId from RF positionList
- iterator = getVertexValue().getFRList().iterator();
- while(iterator.hasNext()){
- pos = iterator.next();
- if(pos.equals(incomingMsg.getSourceVertexId())){
- iterator.remove();
- break;
- }
- }
- } else if(incomingMsg.getMessage() == AdjMessage.FROMRF){
- //remove incomingMsg.getSourceId from FR positionList
- iterator = getVertexValue().getRFList().iterator();
- while(iterator.hasNext()){
- pos = iterator.next();
- if(pos.equals(incomingMsg.getSourceVertexId())){
- iterator.remove();
- break;
- }
- }
- } else{ //incomingMsg.getMessage() == AdjMessage.FROMRR
- //remove incomingMsg.getSourceId from FF positionList
- iterator = getVertexValue().getFFList().iterator();
- while(iterator.hasNext()){
- pos = iterator.next();
- if(pos.equals(incomingMsg.getSourceVertexId())){
- iterator.remove();
- break;
- }
- }
- }
- }
- }
@SuppressWarnings("unchecked")
@Override
- public void compute(Iterator<MergeBubbleMessageWritable> msgIterator) {
+ public void compute(Iterator<MessageWritable> msgIterator) {
initVertex();
if (getSuperstep() == 1) {
if(VertexUtil.isHeadVertexWithIndegree(getVertexValue())
|| VertexUtil.isHeadWithoutIndegree(getVertexValue())){
- outgoingMsg.setSourceVertexId(getVertexId());
- sendMsgToAllNextNodes(getVertexValue());
+ sendSettledMsgToAllNextNodes(getVertexValue());
}
-// if(VertexUtil.isRearVertexWithOutdegree(getVertexValue())
-// || VertexUtil.isRearWithoutOutdegree(getVertexValue())){
-// outgoingMsg.setSourceVertexId(getVertexId());
-// sendMsgToAllPrevNodes(getVertexValue());
-// }
} else if (getSuperstep() == 2){
while (msgIterator.hasNext()) {
incomingMsg = msgIterator.next();
if(VertexUtil.isPathVertex(getVertexValue())){
- switch(incomingMsg.getMessage()){
- case AdjMessage.FROMFF:
- case AdjMessage.FROMRF:
+ byte meToNeighborDir = (byte) (incomingMsg.getFlag() & MessageFlag.DIR_MASK);
+ byte neighborToMeDir = mirrorDirection(meToNeighborDir);
+ switch(neighborToMeDir){
+ case MessageFlag.DIR_RF:
+ case MessageFlag.DIR_RR:
if(hasNextDest(getVertexValue())){
- outgoingMsg.setMessage(AdjMessage.NON);
outgoingMsg.setStartVertexId(incomingMsg.getSourceVertexId());
outgoingMsg.setSourceVertexId(getVertexId());
- outgoingMsg.setChainVertexId(getVertexValue().getKmer());
- destVertexId.set(getNextDestVertexId(getVertexValue()));
+ outgoingMsg.setActualKmer(getVertexValue().getKmer());
+ destVertexId.setAsCopy(getNextDestVertexId(getVertexValue()));
sendMsg(destVertexId, outgoingMsg);
}
break;
- case AdjMessage.FROMFR:
- case AdjMessage.FROMRR:
+ case MessageFlag.DIR_FF:
+ case MessageFlag.DIR_FR:
if(hasPrevDest(getVertexValue())){
- outgoingMsg.setMessage(AdjMessage.NON);
outgoingMsg.setStartVertexId(incomingMsg.getSourceVertexId());
outgoingMsg.setSourceVertexId(getVertexId());
- outgoingMsg.setChainVertexId(getVertexValue().getKmer());
- destVertexId.set(getPrevDestVertexId(getVertexValue()));
+ outgoingMsg.setActualKmer(getVertexValue().getKmer());
+ destVertexId.setAsCopy(getPrevDestVertexId(getVertexValue()));
sendMsg(destVertexId, outgoingMsg);
}
break;
@@ -273,32 +89,23 @@
if(!receivedMsgMap.containsKey(incomingMsg.getStartVertexId())){
receivedMsgList.clear();
receivedMsgList.add(incomingMsg);
- receivedMsgMap.put(incomingMsg.getStartVertexId(), (ArrayList<MergeBubbleMessageWritable>)receivedMsgList.clone());
+ receivedMsgMap.put(incomingMsg.getStartVertexId(), (ArrayList<MessageWritable>)receivedMsgList.clone());
}
else{
receivedMsgList.clear();
receivedMsgList.addAll(receivedMsgMap.get(incomingMsg.getStartVertexId()));
receivedMsgList.add(incomingMsg);
- receivedMsgMap.put(incomingMsg.getStartVertexId(), (ArrayList<MergeBubbleMessageWritable>)receivedMsgList.clone());
+ receivedMsgMap.put(incomingMsg.getStartVertexId(), (ArrayList<MessageWritable>)receivedMsgList.clone());
}
}
- for(PositionWritable prevId : receivedMsgMap.keySet()){
+ for(VKmerBytesWritable prevId : receivedMsgMap.keySet()){
receivedMsgList = receivedMsgMap.get(prevId);
if(receivedMsgList.size() > 1){
- //find the node with largest length of Kmer
- boolean flag = true; //the same length
- int maxLength = receivedMsgList.get(0).getLengthOfChain();
- PositionWritable max = receivedMsgList.get(0).getSourceVertexId();
- PositionWritable secondMax = receivedMsgList.get(0).getSourceVertexId();
- for(int i = 1; i < receivedMsgList.size(); i++){
- if(receivedMsgList.get(i).getLengthOfChain() != maxLength)
- flag = false;
- if(receivedMsgList.get(i).getLengthOfChain() >= maxLength){
- maxLength = receivedMsgList.get(i).getLengthOfChain();
- secondMax.set(max);
- max = receivedMsgList.get(i).getSourceVertexId();
- }
- }
+ /** for each startVertex, sort the nodes in decreasing order of coverage **/
+
+ /** process similarSet, keep the unchanged set and the deleted set & add coverage to the unchanged node **/
+
+ /** send message to the unchanged set for updating coverage & send kill message to the deleted set **/
//send unchange or merge Message to node with largest length
if(flag == true){
//1. send unchange Message to node with largest length
@@ -366,7 +173,7 @@
job.setVertexInputFormatClass(GraphCleanInputFormat.class);
job.setVertexOutputFormatClass(GraphCleanOutputFormat.class);
job.setDynamicVertexValueSize(true);
- job.setOutputKeyClass(PositionWritable.class);
+ job.setOutputKeyClass(VKmerBytesWritable.class);
job.setOutputValueClass(VertexValueWritable.class);
Client.run(args, job);
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicGraphCleanVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicGraphCleanVertex.java
index 64965e3..ca4b088 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicGraphCleanVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicGraphCleanVertex.java
@@ -5,16 +5,13 @@
import org.apache.hadoop.io.NullWritable;
-
import edu.uci.ics.pregelix.api.graph.Vertex;
import edu.uci.ics.genomix.pregelix.io.MessageWritable;
import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
import edu.uci.ics.genomix.pregelix.io.VertexValueWritable.State;
-import edu.uci.ics.genomix.pregelix.type.AdjMessage;
import edu.uci.ics.genomix.pregelix.type.MessageFlag;
import edu.uci.ics.genomix.pregelix.util.VertexUtil;
import edu.uci.ics.genomix.type.GeneCode;
-//import edu.uci.ics.genomix.type.KmerBytesWritable;
import edu.uci.ics.genomix.type.VKmerBytesWritable;
/**
@@ -29,9 +26,9 @@
protected MessageWritable incomingMsg = null;
protected MessageWritable outgoingMsg = null;
- protected VKmerBytesWritable destVertexId = new VKmerBytesWritable();
- protected Iterator<VKmerBytesWritable> posIterator;
- protected VKmerBytesWritable tmpKmer = new VKmerBytesWritable();
+ protected VKmerBytesWritable destVertexId = null;
+ protected Iterator<VKmerBytesWritable> kmerIterator;
+ protected VKmerBytesWritable tmpKmer = new VKmerBytesWritable(kmerSize);
byte headFlag;
protected byte outFlag;
protected byte inFlag;
@@ -69,24 +66,24 @@
* get destination vertex
*/
public VKmerBytesWritable getNextDestVertexId(VertexValueWritable value) {
- if (value.getFFList().getCountOfPosition() > 0){ // #FFList() > 0
- posIterator = value.getFFList().iterator();
- return posIterator.next();
- } else if (value.getFRList().getCountOfPosition() > 0){ // #FRList() > 0
- posIterator = value.getFRList().iterator();
- return posIterator.next();
+ if (value.getFFList().getCountOfPosition() > 0){ //#FFList() > 0
+ kmerIterator = value.getFFList().iterator();
+ return kmerIterator.next();
+ } else if (value.getFRList().getCountOfPosition() > 0){ //#FRList() > 0
+ kmerIterator = value.getFRList().iterator();
+ return kmerIterator.next();
} else {
return null;
}
}
- public VKmerBytesWritable getPreDestVertexId(VertexValueWritable value) {
- if (value.getRFList().getCountOfPosition() > 0){ // #RFList() > 0
- posIterator = value.getRFList().iterator();
- return posIterator.next();
- } else if (value.getRRList().getCountOfPosition() > 0){ // #RRList() > 0
- posIterator = value.getRRList().iterator();
- return posIterator.next();
+ public VKmerBytesWritable getPrevDestVertexId(VertexValueWritable value) {
+ if (value.getRFList().getCountOfPosition() > 0){ //#RFList() > 0
+ kmerIterator = value.getRFList().iterator();
+ return kmerIterator.next();
+ } else if (value.getRRList().getCountOfPosition() > 0){ //#RRList() > 0
+ kmerIterator = value.getRRList().iterator();
+ return kmerIterator.next();
} else {
return null;
}
@@ -95,67 +92,67 @@
/**
* get destination vertex
*/
- public KmerBytesWritable getNextDestVertexIdAndSetFlag(VertexValueWritable value) {
+ public VKmerBytesWritable getNextDestVertexIdAndSetFlag(VertexValueWritable value) {
if (value.getFFList().getCountOfPosition() > 0){ // #FFList() > 0
- posIterator = value.getFFList().iterator();
+ kmerIterator = value.getFFList().iterator();
outFlag &= MessageFlag.DIR_CLEAR;
outFlag |= MessageFlag.DIR_FF;
- return posIterator.next();
+ return kmerIterator.next();
} else if (value.getFRList().getCountOfPosition() > 0){ // #FRList() > 0
- posIterator = value.getFRList().iterator();
+ kmerIterator = value.getFRList().iterator();
outFlag &= MessageFlag.DIR_CLEAR;
outFlag |= MessageFlag.DIR_FR;
- return posIterator.next();
+ return kmerIterator.next();
} else {
return null;
}
}
- public KmerBytesWritable getPreDestVertexIdAndSetFlag(VertexValueWritable value) {
+ public VKmerBytesWritable getPrevDestVertexIdAndSetFlag(VertexValueWritable value) {
if (value.getRFList().getCountOfPosition() > 0){ // #RFList() > 0
- posIterator = value.getRFList().iterator();
+ kmerIterator = value.getRFList().iterator();
outFlag &= MessageFlag.DIR_CLEAR;
outFlag |= MessageFlag.DIR_RF;
- return posIterator.next();
+ return kmerIterator.next();
} else if (value.getRRList().getCountOfPosition() > 0){ // #RRList() > 0
- posIterator = value.getRRList().iterator();
+ kmerIterator = value.getRRList().iterator();
outFlag &= MessageFlag.DIR_CLEAR;
outFlag |= MessageFlag.DIR_RR;
- return posIterator.next();
+ return kmerIterator.next();
} else {
return null;
}
}
-
- /**
- * head send message to all next nodes
- */
- public void sendMsgToAllNextNodes(VertexValueWritable value) {
- posIterator = value.getFFList().iterator(); // FFList
- while(posIterator.hasNext()){
- destVertexId.setAsCopy(posIterator.next());
- sendMsg(destVertexId, outgoingMsg);
- }
- posIterator = value.getFRList().iterator(); // FRList
- while(posIterator.hasNext()){
- destVertexId.setAsCopy(posIterator.next());
- sendMsg(destVertexId, outgoingMsg);
- }
- }
-
+
/**
* head send message to all previous nodes
*/
public void sendMsgToAllPreviousNodes(VertexValueWritable value) {
- posIterator = value.getRFList().iterator(); // RFList
- while(posIterator.hasNext()){
- destVertexId.setAsCopy(posIterator.next());
+ kmerIterator = value.getRFList().iterator(); // RFList
+ while(kmerIterator.hasNext()){
+ destVertexId.setAsCopy(kmerIterator.next());
sendMsg(destVertexId, outgoingMsg);
}
- posIterator = value.getRRList().iterator(); // RRList
- while(posIterator.hasNext()){
- destVertexId.setAsCopy(posIterator.next());
+ kmerIterator = value.getRRList().iterator(); // RRList
+ while(kmerIterator.hasNext()){
+ destVertexId.setAsCopy(kmerIterator.next());
+ sendMsg(destVertexId, outgoingMsg);
+ }
+ }
+
+ /**
+ * head send message to all next nodes
+ */
+ public void sendMsgToAllNextNodes(VertexValueWritable value) {
+ kmerIterator = value.getFFList().iterator(); // FFList
+ while(kmerIterator.hasNext()){
+ destVertexId.setAsCopy(kmerIterator.next());
+ sendMsg(destVertexId, outgoingMsg);
+ }
+ kmerIterator = value.getFRList().iterator(); // FRList
+ while(kmerIterator.hasNext()){
+ destVertexId.setAsCopy(kmerIterator.next());
sendMsg(destVertexId, outgoingMsg);
}
}
@@ -169,21 +166,49 @@
}
/**
+ * tip sends a message with sourceId and dir to the previous node
+ * (a tip has only one incoming node)
+ */
+ public void sendSettledMsgToPreviousNode(){
+ if(getVertexValue().getFFList().getCountOfPosition() > 0)
+ outgoingMsg.setFlag(MessageFlag.DIR_FF);
+ else if(getVertexValue().getFRList().getCountOfPosition() > 0)
+ outgoingMsg.setFlag(MessageFlag.DIR_FR);
+ outgoingMsg.setSourceVertexId(getVertexId());
+ destVertexId.setAsCopy(getNextDestVertexId(getVertexValue()));
+ sendMsg(destVertexId, outgoingMsg);
+ }
+
+ /**
+ * tip sends a message with sourceId and dir to the next node
+ * (a tip has only one outgoing node)
+ */
+ public void sendSettledMsgToNextNode(){
+ if(getVertexValue().getRFList().getCountOfPosition() > 0)
+ outgoingMsg.setFlag(MessageFlag.DIR_RF);
+ else if(getVertexValue().getRRList().getCountOfPosition() > 0)
+ outgoingMsg.setFlag(MessageFlag.DIR_RR);
+ outgoingMsg.setSourceVertexId(getVertexId());
+ destVertexId.setAsCopy(getPrevDestVertexId(getVertexValue()));
+ sendMsg(destVertexId, outgoingMsg);
+ }
+
+ /**
* head send message to all previous nodes
*/
public void sendSettledMsgToAllPreviousNodes(VertexValueWritable value) {
- posIterator = value.getRFList().iterator(); // RFList
- while(posIterator.hasNext()){
- outgoingMsg.setFlag(AdjMessage.FROMRF);
+ kmerIterator = value.getRFList().iterator(); // RFList
+ while(kmerIterator.hasNext()){
+ outgoingMsg.setFlag(MessageFlag.DIR_RF);
outgoingMsg.setSourceVertexId(getVertexId());
- destVertexId.setAsCopy(posIterator.next());
+ destVertexId.setAsCopy(kmerIterator.next());
sendMsg(destVertexId, outgoingMsg);
}
- posIterator = value.getRRList().iterator(); // RRList
- while(posIterator.hasNext()){
- outgoingMsg.setFlag(AdjMessage.FROMRR);
+ kmerIterator = value.getRRList().iterator(); // RRList
+ while(kmerIterator.hasNext()){
+ outgoingMsg.setFlag(MessageFlag.DIR_RR);
outgoingMsg.setSourceVertexId(getVertexId());
- destVertexId.setAsCopy(posIterator.next());
+ destVertexId.setAsCopy(kmerIterator.next());
sendMsg(destVertexId, outgoingMsg);
}
}
@@ -192,18 +217,18 @@
* head send message to all next nodes
*/
public void sendSettledMsgToAllNextNodes(VertexValueWritable value) {
- posIterator = value.getFFList().iterator(); // FFList
- while(posIterator.hasNext()){
- outgoingMsg.setFlag(AdjMessage.FROMFF);
+ kmerIterator = value.getFFList().iterator(); // FFList
+ while(kmerIterator.hasNext()){
+ outgoingMsg.setFlag(MessageFlag.DIR_FF);
outgoingMsg.setSourceVertexId(getVertexId());
- destVertexId.setAsCopy(posIterator.next());
+ destVertexId.setAsCopy(kmerIterator.next());
sendMsg(destVertexId, outgoingMsg);
}
- posIterator = value.getFRList().iterator(); // FRList
- while(posIterator.hasNext()){
- outgoingMsg.setFlag(AdjMessage.FROMFR);
+ kmerIterator = value.getFRList().iterator(); // FRList
+ while(kmerIterator.hasNext()){
+ outgoingMsg.setFlag(MessageFlag.DIR_FR);
outgoingMsg.setSourceVertexId(getVertexId());
- destVertexId.setAsCopy(posIterator.next());
+ destVertexId.setAsCopy(kmerIterator.next());
sendMsg(destVertexId, outgoingMsg);
}
}
@@ -325,8 +350,8 @@
outgoingMsg.setFlag(outFlag);
outgoingMsg.setNeighberNode(getVertexValue().getOutgoingList());
outgoingMsg.setSourceVertexId(getVertexId());
- if(getPreDestVertexId(getVertexValue()) != null)
- sendMsg(getPreDestVertexId(getVertexValue()), outgoingMsg);
+ if(getPrevDestVertexId(getVertexValue()) != null)
+ sendMsg(getPrevDestVertexId(getVertexValue()), outgoingMsg);
break;
}
}
@@ -469,7 +494,7 @@
outgoingMsg.setNeighberNode(getVertexValue().getOutgoingList());
outgoingMsg.setSourceVertexId(getVertexId());
outgoingMsg.setActualKmer(getVertexValue().getKmer());
- sendMsg(getPreDestVertexId(getVertexValue()), outgoingMsg);
+ sendMsg(getPrevDestVertexId(getVertexValue()), outgoingMsg);
deleteVertex(getVertexId());
break;
}
@@ -748,44 +773,44 @@
switch(incomingMsg.getFlag() & MessageFlag.DIR_MASK){
case MessageFlag.DIR_FF:
//remove incomingMsg.getSourceId from RR positionList
- posIterator = getVertexValue().getRRList().iterator();
- while(posIterator.hasNext()){
- tmpKmer = posIterator.next();
+ kmerIterator = getVertexValue().getRRList().iterator();
+ while(kmerIterator.hasNext()){
+ tmpKmer = kmerIterator.next();
if(tmpKmer.equals(incomingMsg.getSourceVertexId())){
- posIterator.remove();
+ kmerIterator.remove();
break;
}
}
break;
case MessageFlag.DIR_FR:
//remove incomingMsg.getSourceId from FR positionList
- posIterator = getVertexValue().getFRList().iterator();
- while(posIterator.hasNext()){
- tmpKmer = posIterator.next();
+ kmerIterator = getVertexValue().getFRList().iterator();
+ while(kmerIterator.hasNext()){
+ tmpKmer = kmerIterator.next();
if(tmpKmer.equals(incomingMsg.getSourceVertexId())){
- posIterator.remove();
+ kmerIterator.remove();
break;
}
}
break;
case MessageFlag.DIR_RF:
//remove incomingMsg.getSourceId from RF positionList
- posIterator = getVertexValue().getRFList().iterator();
- while(posIterator.hasNext()){
- tmpKmer = posIterator.next();
+ kmerIterator = getVertexValue().getRFList().iterator();
+ while(kmerIterator.hasNext()){
+ tmpKmer = kmerIterator.next();
if(tmpKmer.equals(incomingMsg.getSourceVertexId())){
- posIterator.remove();
+ kmerIterator.remove();
break;
}
}
break;
case MessageFlag.DIR_RR:
//remove incomingMsg.getSourceId from FF positionList
- posIterator = getVertexValue().getFFList().iterator();
- while(posIterator.hasNext()){
- tmpKmer = posIterator.next();
+ kmerIterator = getVertexValue().getFFList().iterator();
+ while(kmerIterator.hasNext()){
+ tmpKmer = kmerIterator.next();
if(tmpKmer.equals(incomingMsg.getSourceVertexId())){
- posIterator.remove();
+ kmerIterator.remove();
break;
}
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/MapReduceVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/MapReduceVertex.java
index 99cd3c2..027d295 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/MapReduceVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/MapReduceVertex.java
@@ -12,7 +12,6 @@
import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
import edu.uci.ics.genomix.pregelix.io.VertexValueWritable.State;
import edu.uci.ics.genomix.pregelix.type.MessageFlag;
-//import edu.uci.ics.genomix.type.KmerBytesWritable;
import edu.uci.ics.genomix.type.VKmerListWritable;
import edu.uci.ics.genomix.type.VKmerBytesWritable;
import edu.uci.ics.pregelix.api.graph.Vertex;
@@ -51,10 +50,12 @@
kmerList.reset();
if(fakeVertex == null){
// fakeVertex = new KmerBytesWritable(kmerSize + 1); // TODO check if merge is correct
- fakeVertex = new KmerBytesWritable();
+ fakeVertex = new VKmerBytesWritable();
String random = generaterRandomString(kmerSize + 1);
fakeVertex.setByRead(random.getBytes(), 0);
}
+ if(destVertexId == null)
+ destVertexId = new VKmerBytesWritable(kmerSize);
}
/**
@@ -187,7 +188,7 @@
job.setVertexInputFormatClass(GraphCleanInputFormat.class);
job.setVertexOutputFormatClass(P2PathMergeOutputFormat.class);
job.setDynamicVertexValueSize(true);
- job.setOutputKeyClass(KmerBytesWritable.class);
+ job.setOutputKeyClass(VKmerBytesWritable.class);
job.setOutputValueClass(VertexValueWritable.class);
Client.run(args, job);
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P1ForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P1ForPathMergeVertex.java
index 3447f25..f6a005c 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P1ForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P1ForPathMergeVertex.java
@@ -3,12 +3,11 @@
import java.util.Iterator;
import org.apache.hadoop.io.NullWritable;
-//import edu.uci.ics.genomix.type.KmerBytesWritable;
import edu.uci.ics.genomix.type.KmerBytesWritableFactory;
import edu.uci.ics.pregelix.api.graph.Vertex;
import edu.uci.ics.pregelix.api.job.PregelixJob;
-import edu.uci.ics.genomix.oldtype.PositionWritable;
+import edu.uci.ics.genomix.type.VKmerBytesWritable;
import edu.uci.ics.genomix.pregelix.client.Client;
import edu.uci.ics.genomix.pregelix.format.NaiveAlgorithmForPathMergeInputFormat;
import edu.uci.ics.genomix.pregelix.format.NaiveAlgorithmForPathMergeOutputFormat;
@@ -50,7 +49,7 @@
* Naive Algorithm for path merge graph
*/
public class P1ForPathMergeVertex extends
- Vertex<PositionWritable, VertexValueWritable, NullWritable, MessageWritable> {
+ Vertex<VKmerBytesWritable, VertexValueWritable, NullWritable, MessageWritable> {
public static final String KMER_SIZE = "P1ForPathMergeVertex.kmerSize";
public static final String ITERATIONS = "P1ForPathMergeVertex.iteration";
public static int kmerSize = -1;
@@ -62,8 +61,8 @@
private KmerBytesWritableFactory kmerFactory = new KmerBytesWritableFactory(1);
private VKmerBytesWritable lastKmer = new VKmerBytesWritable();
- private PositionWritable destVertexId = new PositionWritable();
- private Iterator<PositionWritable> posIterator;
+ private VKmerBytesWritable destVertexId = new VKmerBytesWritable();
+ private Iterator<VKmerBytesWritable> posIterator;
/**
* initiate kmerSize, maxIteration
@@ -79,7 +78,7 @@
/**
* get destination vertex
*/
- public PositionWritable getNextDestVertexId(VertexValueWritable value) {
+ public VKmerBytesWritable getNextDestVertexId(VertexValueWritable value) {
if(value.getFFList().getCountOfPosition() > 0) // #FFList() > 0
posIterator = value.getFFList().iterator();
else // #FRList() > 0
@@ -87,7 +86,7 @@
return posIterator.next();
}
- public PositionWritable getPreDestVertexId(VertexValueWritable value) {
+ public VKmerBytesWritable getPreDestVertexId(VertexValueWritable value) {
if(value.getRFList().getCountOfPosition() > 0) // #RFList() > 0
posIterator = value.getRFList().iterator();
else // #RRList() > 0
@@ -101,12 +100,12 @@
public void sendMsgToAllNextNodes(VertexValueWritable value) {
posIterator = value.getFFList().iterator(); // FFList
while(posIterator.hasNext()){
- destVertexId.set(posIterator.next());
+ destVertexId.setAsCopy(posIterator.next());
sendMsg(destVertexId, outgoingMsg);
}
posIterator = value.getFRList().iterator(); // FRList
while(posIterator.hasNext()){
- destVertexId.set(posIterator.next());
+ destVertexId.setAsCopy(posIterator.next());
sendMsg(destVertexId, outgoingMsg);
}
}
@@ -117,12 +116,12 @@
public void sendMsgToAllPreviousNodes(VertexValueWritable value) {
posIterator = value.getRFList().iterator(); // RFList
while(posIterator.hasNext()){
- destVertexId.set(posIterator.next());
+ destVertexId.setAsCopy(posIterator.next());
sendMsg(destVertexId, outgoingMsg);
}
posIterator = value.getRRList().iterator(); // RRList
while(posIterator.hasNext()){
- destVertexId.set(posIterator.next());
+ destVertexId.setAsCopy(posIterator.next());
sendMsg(destVertexId, outgoingMsg);
}
}
@@ -196,7 +195,7 @@
public void sendMsgToPathVertex(Iterator<MessageWritable> msgIterator) {
if (getSuperstep() == 3) {
outgoingMsg.setSourceVertexId(getVertexId());
- destVertexId.set(getNextDestVertexId(getVertexValue()));
+ destVertexId.setAsCopy(getNextDestVertexId(getVertexValue()));
sendMsg(destVertexId, outgoingMsg);
} else {
while (msgIterator.hasNext()) {
@@ -204,7 +203,7 @@
if (incomingMsg.getFlag() != Message.STOP) {
mergeChainVertex();
outgoingMsg.setSourceVertexId(getVertexId());
- destVertexId.set(getNextDestVertexId(getVertexValue()));
+ destVertexId.setAsCopy(getNextDestVertexId(getVertexValue()));
sendMsg(destVertexId, outgoingMsg);
} else {
mergeChainVertex();
@@ -222,7 +221,7 @@
public void responseMsgToHeadVertex() {
deleteVertex(getVertexId());
outgoingMsg.setNeighberNode(getVertexValue().getOutgoingList());
- outgoingMsg.setAcutalKmer(getVertexValue().getKmer());
+ outgoingMsg.setActualKmer(getVertexValue().getKmer());
if (getVertexValue().getState() == State.IS_HEAD)//is_tail
outgoingMsg.setFlag(Message.STOP);
destVertexId.setAsCopy(incomingMsg.getSourceVertexId());
@@ -259,7 +258,7 @@
job.setVertexInputFormatClass(NaiveAlgorithmForPathMergeInputFormat.class);
job.setVertexOutputFormatClass(NaiveAlgorithmForPathMergeOutputFormat.class);
job.setDynamicVertexValueSize(true);
- job.setOutputKeyClass(PositionWritable.class);
+ job.setOutputKeyClass(VKmerBytesWritable.class);
job.setOutputValueClass(VertexValueWritable.class);
Client.run(args, job);
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P2ForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P2ForPathMergeVertex.java
index c9942a2..da83dcf 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P2ForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P2ForPathMergeVertex.java
@@ -12,7 +12,6 @@
import edu.uci.ics.genomix.pregelix.io.VertexValueWritable.State;
import edu.uci.ics.genomix.pregelix.type.MessageFlag;
import edu.uci.ics.genomix.pregelix.type.MessageFromHead;
-import edu.uci.ics.genomix.type.KmerBytesWritable;
import edu.uci.ics.genomix.type.VKmerListWritable;
import edu.uci.ics.genomix.type.VKmerBytesWritable;
/*
@@ -47,7 +46,7 @@
MapReduceVertex {
private ArrayList<MessageWritable> receivedMsgList = new ArrayList<MessageWritable>();
- KmerBytesWritable tmpKmer = new KmerBytesWritable();
+ VKmerBytesWritable tmpKmer = new VKmerBytesWritable();
private boolean isFakeVertex = false;
/**
@@ -80,6 +79,8 @@
fakeVertex.setByRead(random.getBytes(), 0);
}
isFakeVertex = ((byte)getVertexValue().getState() & State.FAKEFLAG_MASK) > 0 ? true : false;
+ if(destVertexId == null)
+ destVertexId = new VKmerBytesWritable(kmerSize);
}
/**
@@ -96,7 +97,7 @@
}
//send wantToMerge to prev
- tmpKmer = getPreDestVertexIdAndSetFlag(getVertexValue());
+ tmpKmer = getPrevDestVertexIdAndSetFlag(getVertexValue());
if(tmpKmer != null){
destVertexId.setAsCopy(tmpKmer);
outgoingMsg.setFlag(outFlag);
@@ -266,7 +267,7 @@
mapKeyByActualKmer(msgIterator);
/** Reducer **/
reduceKeyByActualKmer();
- voteToHalt();
+ voteToHalt();
}
} else if (getSuperstep() % 3 == 1 && getSuperstep() <= maxIteration) {
if(!isFakeVertex){
@@ -298,7 +299,7 @@
*/
job.setVertexInputFormatClass(InitialGraphCleanInputFormat.class);
job.setVertexOutputFormatClass(P2PathMergeOutputFormat.class);
- job.setOutputKeyClass(KmerBytesWritable.class);
+ job.setOutputKeyClass(VKmerBytesWritable.class);
job.setOutputValueClass(VertexValueWritable.class);
job.setDynamicVertexValueSize(true);
Client.run(args, job);
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P3ForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P3ForPathMergeVertex.java
index cf35c7a..184a7b2 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P3ForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P3ForPathMergeVertex.java
@@ -3,12 +3,11 @@
import java.util.Iterator;
import org.apache.hadoop.io.NullWritable;
-//import edu.uci.ics.genomix.type.KmerBytesWritable;
import edu.uci.ics.genomix.type.KmerBytesWritableFactory;
+import edu.uci.ics.genomix.type.VKmerBytesWritable;
import edu.uci.ics.pregelix.api.graph.Vertex;
import edu.uci.ics.pregelix.api.job.PregelixJob;
-import edu.uci.ics.genomix.oldtype.PositionWritable;
import edu.uci.ics.genomix.pregelix.client.Client;
import edu.uci.ics.genomix.pregelix.format.NaiveAlgorithmForPathMergeInputFormat;
import edu.uci.ics.genomix.pregelix.format.NaiveAlgorithmForPathMergeOutputFormat;
@@ -50,7 +49,7 @@
* Naive Algorithm for path merge graph
*/
public class P3ForPathMergeVertex extends
- Vertex<PositionWritable, VertexValueWritable, NullWritable, MessageWritable> {
+ Vertex<VKmerBytesWritable, VertexValueWritable, NullWritable, MessageWritable> {
public static final String KMER_SIZE = "P3ForPathMergeVertex.kmerSize";
public static final String ITERATIONS = "P3ForPathMergeVertex.iteration";
public static final String PSEUDORATE = "P3ForPathMergeVertex.pseudoRate";
@@ -66,8 +65,8 @@
private KmerBytesWritableFactory kmerFactory = new KmerBytesWritableFactory(1);
private VKmerBytesWritable lastKmer = new VKmerBytesWritable(1);
- private PositionWritable destVertexId = new PositionWritable();
- private Iterator<PositionWritable> posIterator;
+ private VKmerBytesWritable destVertexId = new VKmerBytesWritable();
+ private Iterator<VKmerBytesWritable> posIterator;
/**
* initiate kmerSize, maxIteration
*/
@@ -86,7 +85,7 @@
/**
* get destination vertex
*/
- public PositionWritable getNextDestVertexId(VertexValueWritable value) {
+ public VKmerBytesWritable getNextDestVertexId(VertexValueWritable value) {
if(value.getFFList().getCountOfPosition() > 0) // #FFList() > 0
posIterator = value.getFFList().iterator();
else // #FRList() > 0
@@ -94,7 +93,7 @@
return posIterator.next();
}
- public PositionWritable getPreDestVertexId(VertexValueWritable value) {
+ public VKmerBytesWritable getPreDestVertexId(VertexValueWritable value) {
if(value.getRFList().getCountOfPosition() > 0) // #RFList() > 0
posIterator = value.getRFList().iterator();
else // #RRList() > 0
@@ -108,12 +107,12 @@
public void sendMsgToAllNextNodes(VertexValueWritable value) {
posIterator = value.getFFList().iterator(); // FFList
while(posIterator.hasNext()){
- destVertexId.set(posIterator.next());
+ destVertexId.setAsCopy(posIterator.next());
sendMsg(destVertexId, outgoingMsg);
}
posIterator = value.getFRList().iterator(); // FRList
while(posIterator.hasNext()){
- destVertexId.set(posIterator.next());
+ destVertexId.setAsCopy(posIterator.next());
sendMsg(destVertexId, outgoingMsg);
}
}
@@ -124,12 +123,12 @@
public void sendMsgToAllPreviousNodes(VertexValueWritable value) {
posIterator = value.getRFList().iterator(); // RFList
while(posIterator.hasNext()){
- destVertexId.set(posIterator.next());
+ destVertexId.setAsCopy(posIterator.next());
sendMsg(destVertexId, outgoingMsg);
}
posIterator = value.getRRList().iterator(); // RRList
while(posIterator.hasNext()){
- destVertexId.set(posIterator.next());
+ destVertexId.setAsCopy(posIterator.next());
sendMsg(destVertexId, outgoingMsg);
}
}
@@ -206,8 +205,7 @@
public void markPseudoHead() {
getVertexValue().setState(State2.PSEUDOHEAD);
outgoingMsg.setFlag(Message.FROMPSEUDOHEAD);
- destVertexId
- .set(getPreDestVertexId(getVertexValue()));
+ destVertexId.setAsCopy(getPreDestVertexId(getVertexValue()));
sendMsg(destVertexId, outgoingMsg);
}
@@ -244,7 +242,7 @@
public void sendMsgToPathVertexMergePhase(Iterator<MessageWritable> msgIterator) {
if (getSuperstep() == 3 + 2 * maxRound + 2) {
outgoingMsg.setSourceVertexId(getVertexId());
- destVertexId.set(getNextDestVertexId(getVertexValue()));
+ destVertexId.setAsCopy(getNextDestVertexId(getVertexValue()));
sendMsg(destVertexId, outgoingMsg);
} else {
while (msgIterator.hasNext()) {
@@ -252,8 +250,7 @@
if (incomingMsg.getFlag() != Message.STOP) {
mergeChainVertex();
outgoingMsg.setSourceVertexId(getVertexId());
- destVertexId
- .set(getNextDestVertexId(getVertexValue()));
+ destVertexId.setAsCopy(getNextDestVertexId(getVertexValue()));
sendMsg(destVertexId, outgoingMsg);
} else {
mergeChainVertex();
@@ -271,7 +268,7 @@
public void responseMsgToHeadVertexMergePhase() {
deleteVertex(getVertexId());
outgoingMsg.setNeighberNode(getVertexValue().getOutgoingList());
- outgoingMsg.setAcutalKmer(getVertexValue().getKmer());
+ outgoingMsg.setActualKmer(getVertexValue().getKmer());
if (getVertexValue().getState() == State2.END_VERTEX)
outgoingMsg.setFlag(Message.STOP);
sendMsg(incomingMsg.getSourceVertexId(), outgoingMsg);
@@ -284,7 +281,7 @@
if (getSuperstep() == 4) {
if(getVertexValue().getState() != State2.START_HALT){
outgoingMsg.setSourceVertexId(getVertexId());
- destVertexId.set(getNextDestVertexId(getVertexValue()));
+ destVertexId.setAsCopy(getNextDestVertexId(getVertexValue()));
sendMsg(destVertexId, outgoingMsg);
voteToHalt();
}
@@ -297,7 +294,7 @@
if (incomingMsg.getFlag() != Message.STOP
&& incomingMsg.getFlag() != Message.FROMPSEUDOREAR) {
outgoingMsg.setSourceVertexId(getVertexId());
- destVertexId.set(getNextDestVertexId(getVertexValue()));
+ destVertexId.setAsCopy(getNextDestVertexId(getVertexValue()));
sendMsg(destVertexId, outgoingMsg);
voteToHalt();
} else {
@@ -325,7 +322,7 @@
else {
deleteVertex(getVertexId());
outgoingMsg.setNeighberNode(getVertexValue().getOutgoingList()); //incomingMsg.getNeighberNode()
- outgoingMsg.setAcutalKmer(getVertexValue().getKmer());
+ outgoingMsg.setActualKmer(getVertexValue().getKmer());
if (getVertexValue().getState() == State2.PSEUDOREAR)
outgoingMsg.setFlag(Message.FROMPSEUDOREAR);
else if (getVertexValue().getState() == State2.END_VERTEX)
@@ -410,7 +407,7 @@
job.setVertexInputFormatClass(NaiveAlgorithmForPathMergeInputFormat.class);
job.setVertexOutputFormatClass(NaiveAlgorithmForPathMergeOutputFormat.class);
job.setDynamicVertexValueSize(true);
- job.setOutputKeyClass(PositionWritable.class);
+ job.setOutputKeyClass(VKmerBytesWritable.class);
job.setOutputValueClass(VertexValueWritable.class);
Client.run(args, job);
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java
index ecfafa7..28d0563 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java
@@ -5,7 +5,6 @@
import edu.uci.ics.pregelix.api.job.PregelixJob;
-import edu.uci.ics.genomix.oldtype.PositionWritable;
import edu.uci.ics.genomix.pregelix.client.Client;
import edu.uci.ics.genomix.pregelix.format.GraphCleanOutputFormat;
import edu.uci.ics.genomix.pregelix.format.InitialGraphCleanInputFormat;
@@ -14,7 +13,7 @@
import edu.uci.ics.genomix.pregelix.io.VertexValueWritable.State;
import edu.uci.ics.genomix.pregelix.type.MessageFlag;
import edu.uci.ics.genomix.pregelix.util.VertexUtil;
-import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.VKmerBytesWritable;
/*
* vertexId: BytesWritable
@@ -56,9 +55,9 @@
private float probBeingRandomHead = -1;
private Random randGenerator;
- private KmerBytesWritable curKmer = new KmerBytesWritable();
- private KmerBytesWritable nextKmer = new KmerBytesWritable();
- private KmerBytesWritable prevKmer = new KmerBytesWritable();
+ private VKmerBytesWritable curKmer = new VKmerBytesWritable();
+ private VKmerBytesWritable nextKmer = new VKmerBytesWritable();
+ private VKmerBytesWritable prevKmer = new VKmerBytesWritable();
private boolean hasNext;
private boolean hasPrev;
private boolean curHead;
@@ -80,6 +79,8 @@
outgoingMsg = new MessageWritable(kmerSize);
else
outgoingMsg.reset(kmerSize);
+ if(destVertexId == null)
+ destVertexId = new VKmerBytesWritable(kmerSize);
randSeed = getSuperstep();
randGenerator = new Random(randSeed);
if (probBeingRandomHead < 0)
@@ -95,7 +96,7 @@
headFlag = (byte) (State.IS_HEAD & getVertexValue().getState());
}
- protected boolean isNodeRandomHead(KmerBytesWritable nodeKmer) {
+ protected boolean isNodeRandomHead(VKmerBytesWritable nodeKmer) {
// "deterministically random", based on node id
//randGenerator.setSeed(randSeed);
//randSeed = randGenerator.nextInt();
@@ -232,7 +233,7 @@
job.setVertexInputFormatClass(InitialGraphCleanInputFormat.class);
job.setVertexOutputFormatClass(GraphCleanOutputFormat.class);
job.setDynamicVertexValueSize(true);
- job.setOutputKeyClass(PositionWritable.class);
+ job.setOutputKeyClass(VKmerBytesWritable.class);
job.setOutputValueClass(VertexValueWritable.class);
Client.run(args, job);
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P5ForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P5ForPathMergeVertex.java
index 3f91ac1..154fbda 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P5ForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P5ForPathMergeVertex.java
@@ -9,7 +9,7 @@
import edu.uci.ics.pregelix.api.graph.Vertex;
import edu.uci.ics.pregelix.api.job.PregelixJob;
-import edu.uci.ics.genomix.oldtype.PositionWritable;
+import edu.uci.ics.genomix.type.VKmerBytesWritable;
import edu.uci.ics.genomix.pregelix.client.Client;
import edu.uci.ics.genomix.pregelix.format.NaiveAlgorithmForPathMergeInputFormat;
import edu.uci.ics.genomix.pregelix.format.NaiveAlgorithmForPathMergeOutputFormat;
@@ -52,7 +52,7 @@
* Naive Algorithm for path merge graph
*/
public class P5ForPathMergeVertex extends
- Vertex<PositionWritable, VertexValueWritable, NullWritable, MessageWritable> {
+ Vertex<VKmerBytesWritable, VertexValueWritable, NullWritable, MessageWritable> {
public static final String KMER_SIZE = "P5ForPathMergeVertex.kmerSize";
public static final String ITERATIONS = "P5ForPathMergeVertex.iteration";
public static final String RANDSEED = "P5ForPathMergeVertex.randSeed";
@@ -64,9 +64,9 @@
private float probBeingRandomHead = -1;
private Random randGenerator;
- private PositionWritable curID = new PositionWritable();
- private PositionWritable nextID = new PositionWritable();
- private PositionWritable prevID = new PositionWritable();
+ private VKmerBytesWritable curID = new VKmerBytesWritable();
+ private VKmerBytesWritable nextID = new VKmerBytesWritable();
+ private VKmerBytesWritable prevID = new VKmerBytesWritable();
private boolean hasNext;
private boolean hasPrev;
private boolean curHead;
@@ -78,8 +78,8 @@
private MessageWritable incomingMsg = new MessageWritable();
private MessageWritable outgoingMsg = new MessageWritable();
- private PositionWritable destVertexId = new PositionWritable();
- private Iterator<PositionWritable> posIterator;
+ private VKmerBytesWritable destVertexId = new VKmerBytesWritable();
+ private Iterator<VKmerBytesWritable> posIterator;
/**
* initiate kmerSize, maxIteration
@@ -102,7 +102,7 @@
outgoingMsg.reset();
}
- protected boolean isNodeRandomHead(PositionWritable nodeID) {
+ protected boolean isNodeRandomHead(VKmerBytesWritable nodeID) {
// "deterministically random", based on node id
randGenerator.setSeed(randSeed ^ nodeID.hashCode());
return randGenerator.nextFloat() < probBeingRandomHead;
@@ -113,7 +113,7 @@
*/
protected boolean setNextInfo(VertexValueWritable value) {
if (value.getFFList().getCountOfPosition() > 0) {
- nextID.set(value.getFFList().getPosition(0));
+ nextID.setAsCopy(value.getFFList().getPosition(0));
nextHead = isNodeRandomHead(nextID);
return true;
}
@@ -145,7 +145,7 @@
/**
* get destination vertex
*/
- public PositionWritable getNextDestVertexId(VertexValueWritable value) {
+ public VKmerBytesWritable getNextDestVertexId(VertexValueWritable value) {
if(value.getFFList().getCountOfPosition() > 0) // #FFList() > 0
posIterator = value.getFFList().iterator();
else // #FRList() > 0
@@ -153,7 +153,7 @@
return posIterator.next();
}
- public PositionWritable getPreDestVertexId(VertexValueWritable value) {
+ public VKmerBytesWritable getPreDestVertexId(VertexValueWritable value) {
if(value.getRFList().getCountOfPosition() > 0) // #RFList() > 0
posIterator = value.getRFList().iterator();
else // #RRList() > 0
@@ -167,12 +167,12 @@
public void sendMsgToAllNextNodes(VertexValueWritable value) {
posIterator = value.getFFList().iterator(); // FFList
while(posIterator.hasNext()){
- destVertexId.set(posIterator.next());
+ destVertexId.setAsCopy(posIterator.next());
sendMsg(destVertexId, outgoingMsg);
}
posIterator = value.getFRList().iterator(); // FRList
while(posIterator.hasNext()){
- destVertexId.set(posIterator.next());
+ destVertexId.setAsCopy(posIterator.next());
sendMsg(destVertexId, outgoingMsg);
}
}
@@ -183,12 +183,12 @@
public void sendMsgToAllPreviousNodes(VertexValueWritable value) {
posIterator = value.getRFList().iterator(); // RFList
while(posIterator.hasNext()){
- destVertexId.set(posIterator.next());
+ destVertexId.setAsCopy(posIterator.next());
sendMsg(destVertexId, outgoingMsg);
}
posIterator = value.getRRList().iterator(); // RRList
while(posIterator.hasNext()){
- destVertexId.set(posIterator.next());
+ destVertexId.setAsCopy(posIterator.next());
sendMsg(destVertexId, outgoingMsg);
}
}
@@ -437,7 +437,7 @@
outFlag = (byte) (headFlag | tailFlag);
// only PATH vertices are present. Find the ID's for my neighbors
- curID.set(getVertexId());
+ curID.setAsCopy(getVertexId());
curHead = isNodeRandomHead(curID);
@@ -494,7 +494,7 @@
job.setVertexInputFormatClass(NaiveAlgorithmForPathMergeInputFormat.class);
job.setVertexOutputFormatClass(NaiveAlgorithmForPathMergeOutputFormat.class);
job.setDynamicVertexValueSize(true);
- job.setOutputKeyClass(PositionWritable.class);
+ job.setOutputKeyClass(VKmerBytesWritable.class);
job.setOutputValueClass(VertexValueWritable.class);
Client.run(args, job);
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/splitrepeat/SplitRepeatVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/splitrepeat/SplitRepeatVertex.java
index 33b62c3..e4e797f 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/splitrepeat/SplitRepeatVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/splitrepeat/SplitRepeatVertex.java
@@ -7,107 +7,75 @@
import java.util.Random;
import java.util.Set;
+import edu.uci.ics.genomix.pregelix.client.Client;
+import edu.uci.ics.genomix.pregelix.format.GraphCleanOutputFormat;
+import edu.uci.ics.genomix.pregelix.format.InitialGraphCleanInputFormat;
import edu.uci.ics.genomix.pregelix.io.MessageWritable;
import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
import edu.uci.ics.genomix.pregelix.operator.pathmerge.BasicGraphCleanVertex;
import edu.uci.ics.genomix.pregelix.type.MessageFlag;
-import edu.uci.ics.genomix.type.KmerBytesWritable;
-import edu.uci.ics.genomix.type.VKmerListWritable;
import edu.uci.ics.genomix.type.PositionWritable;
+import edu.uci.ics.genomix.type.VKmerListWritable;
import edu.uci.ics.genomix.type.VKmerBytesWritable;
import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.job.PregelixJob;
import edu.uci.ics.pregelix.api.util.BspUtils;
public class SplitRepeatVertex extends
BasicGraphCleanVertex{
- public class CreatedVertex{
- VKmerBytesWritable createdVertexId;
- String incomingDir;
- String outgoingDir;
- VKmerBytesWritable incomingEdge;
- VKmerBytesWritable outgoingEdge;
+ public class EdgeDir{
+ public static final byte DIR_FF = 0 << 0;
+ public static final byte DIR_FR = 1 << 0;
+ public static final byte DIR_RF = 2 << 0;
+ public static final byte DIR_RR = 3 << 0;
+ }
+
+ public class DeletedEdge{
+ private byte dir;
+ private VKmerBytesWritable edge;
- public CreatedVertex(){
- createdVertexId = new VKmerBytesWritable(kmerSize);
- incomingDir = "";
- outgoingDir = "";
- incomingEdge = new VKmerBytesWritable(kmerSize);
- outgoingEdge = new VKmerBytesWritable(kmerSize);
- }
-
- public void clear(){
- createdVertexId.reset(kmerSize);
- incomingDir = "";
- outgoingDir = "";
- incomingEdge.reset(kmerSize);
- outgoingEdge.reset(kmerSize);
- }
-
- public VKmerBytesWritable getCreatedVertexId() {
- return createdVertexId;
+ public DeletedEdge(){
+ dir = 0;
+ edge = new VKmerBytesWritable(kmerSize);
}
- public void setCreatedVertexId(KmerBytesWritable createdVertexId) {
- this.createdVertexId = createdVertexId;
+ public byte getDir() {
+ return dir;
}
- public String getIncomingDir() {
- return incomingDir;
+ public void setDir(byte dir) {
+ this.dir = dir;
}
- public void setIncomingDir(String incomingDir) {
- this.incomingDir = incomingDir;
+ public VKmerBytesWritable getEdge() {
+ return edge;
}
- public String getOutgoingDir() {
- return outgoingDir;
- }
-
- public void setOutgoingDir(String outgoingDir) {
- this.outgoingDir = outgoingDir;
- }
-
- public VKmerBytesWritable getIncomingEdge() {
- return incomingEdge;
- }
-
- public void setIncomingEdge(KmerBytesWritable incomingEdge) {
- this.incomingEdge.set(incomingEdge);
- }
-
- public VKmerBytesWritable getOutgoingEdge() {
- return outgoingEdge;
- }
-
- public void setOutgoingEdge(KmerBytesWritable outgoingEdge) {
- this.outgoingEdge.set(outgoingEdge);
+ public void setEdge(VKmerBytesWritable edge) {
+ this.edge.setAsCopy(edge);
}
}
- private String[][] connectedTable = new String[][]{
- {"FF", "RF"},
- {"FF", "RR"},
- {"FR", "RF"},
- {"FR", "RR"}
+ private byte[][] connectedTable = new byte[][]{
+ {EdgeDir.DIR_RF, EdgeDir.DIR_FF},
+ {EdgeDir.DIR_RF, EdgeDir.DIR_FR},
+ {EdgeDir.DIR_RR, EdgeDir.DIR_FF},
+ {EdgeDir.DIR_RR, EdgeDir.DIR_FR}
};
public static Set<String> existKmerString = new HashSet<String>();
private Set<Long> readIdSet;
private Set<Long> incomingReadIdSet = new HashSet<Long>();
private Set<Long> outgoingReadIdSet = new HashSet<Long>();
private Set<Long> selfReadIdSet = new HashSet<Long>();
- private Set<Long> incomingEdgeIntersection = new HashSet<Long>();
- private Set<Long> outgoingEdgeIntersection = new HashSet<Long>();
private Set<Long> neighborEdgeIntersection = new HashSet<Long>();
- private Map<KmerBytesWritable, Set<Long>> kmerMap = new HashMap<KmerBytesWritable, Set<Long>>();
+ private Map<VKmerBytesWritable, Set<Long>> kmerMap = new HashMap<VKmerBytesWritable, Set<Long>>();
private VKmerListWritable incomingEdgeList = null;
private VKmerListWritable outgoingEdgeList = null;
private byte incomingEdgeDir = 0;
private byte outgoingEdgeDir = 0;
- protected KmerBytesWritable createdVertexId = null;
- private CreatedVertex createdVertex = new CreatedVertex();
- public static Set<CreatedVertex> createdVertexSet = new HashSet<CreatedVertex>();
+ protected VKmerBytesWritable createdVertexId = null;
/**
* initiate kmerSize, maxIteration
@@ -124,11 +92,13 @@
else
outgoingMsg.reset(kmerSize);
if(incomingEdgeList == null)
- incomingEdgeList = new VKmerListWritable(kmerSize);
+ incomingEdgeList = new VKmerListWritable();
if(outgoingEdgeList == null)
- outgoingEdgeList = new VKmerListWritable(kmerSize);
+ outgoingEdgeList = new VKmerListWritable();
if(createdVertexId == null)
- createdVertexId = new VKmerBytesWritable(kmerSize + 1);
+ createdVertexId = new VKmerBytesWritable(kmerSize);//kmerSize + 1
+ if(destVertexId == null)
+ destVertexId = new VKmerBytesWritable(kmerSize);
}
/**
@@ -150,7 +120,184 @@
return sb.toString();
}
+ /**
+ * Test-only helper: returns "AAA" (and records it in existKmerString) while that set is empty, otherwise returns "GGG"
+ */
+ public String generateString(){
+ if(existKmerString.isEmpty()){
+ existKmerString.add("AAA");
+ return "AAA";
+ }
+ else
+ return "GGG";
+ }
+
+ public void generateKmerMap(Iterator<MessageWritable> msgIterator){
+ kmerMap.clear();
+ while(msgIterator.hasNext()){
+ incomingMsg = msgIterator.next();
+ readIdSet = new HashSet<Long>();
+ for(PositionWritable nodeId : incomingMsg.getNodeIdList()){
+ readIdSet.add(nodeId.getReadId());
+ }
+ kmerMap.put(incomingMsg.getSourceVertexId(), readIdSet);
+ }
+ }
+
+ public void setSelfReadIdSet(){
+ selfReadIdSet.clear();
+ for(PositionWritable nodeId : getVertexValue().getNodeIdList()){
+ selfReadIdSet.add(nodeId.getReadId());
+ }
+ }
+
@SuppressWarnings({ "rawtypes", "unchecked" })
+ public void createNewVertex(int i, VKmerBytesWritable incomingEdge, VKmerBytesWritable outgoingEdge){
+ Vertex vertex = (Vertex) BspUtils.createVertex(getContext().getConfiguration());
+ vertex.getMsgList().clear();
+ vertex.getEdges().clear();
+ VKmerBytesWritable vertexId = new VKmerBytesWritable(kmerSize);
+ VertexValueWritable vertexValue = new VertexValueWritable(kmerSize);
+ //add the corresponding edge to new vertex
+ switch(connectedTable[i][0]){
+ case EdgeDir.DIR_RF:
+ vertexValue.getRFList().append(incomingEdge);
+ break;
+ case EdgeDir.DIR_RR:
+ vertexValue.getRRList().append(incomingEdge);
+ break;
+ }
+ switch(connectedTable[i][1]){
+ case EdgeDir.DIR_FF:
+ vertexValue.getFFList().append(outgoingEdge);
+ break;
+ case EdgeDir.DIR_FR:
+ vertexValue.getFRList().append(outgoingEdge);
+ break;
+ }
+ vertexId.setAsCopy(createdVertexId);
+ vertex.setVertexId(vertexId);
+ vertex.setVertexValue(vertexValue);
+
+ addVertex(vertexId, vertex);
+ }
+
+ public void sendMsgToUpdateEdge(VKmerBytesWritable incomingEdge, VKmerBytesWritable outgoingEdge){
+ outgoingMsg.setCreatedVertexId(createdVertexId);
+ outgoingMsg.setSourceVertexId(getVertexId());
+
+ outgoingMsg.setFlag(incomingEdgeDir);
+ destVertexId.setAsCopy(incomingEdge);
+ sendMsg(destVertexId, outgoingMsg);
+
+ outgoingMsg.setFlag(outgoingEdgeDir);
+ destVertexId.setAsCopy(outgoingEdge);
+ sendMsg(destVertexId, outgoingMsg);
+ }
+
+ public void storeDeletedEdge(Set<DeletedEdge> deletedEdges, int i, VKmerBytesWritable incomingEdge, VKmerBytesWritable outgoingEdge){
+ DeletedEdge deletedIncomingEdge = new DeletedEdge();
+ DeletedEdge deletedOutgoingEdge = new DeletedEdge();
+ switch(connectedTable[i][0]){
+ case EdgeDir.DIR_RF:
+ deletedIncomingEdge.setDir(EdgeDir.DIR_RF);
+ deletedIncomingEdge.setEdge(incomingEdge);
+ break;
+ case EdgeDir.DIR_RR:
+ deletedIncomingEdge.setDir(EdgeDir.DIR_RR);
+ deletedIncomingEdge.setEdge(incomingEdge);
+ break;
+ }
+ switch(connectedTable[i][1]){
+ case EdgeDir.DIR_FF:
+ deletedOutgoingEdge.setDir(EdgeDir.DIR_FF);
+ deletedOutgoingEdge.setEdge(outgoingEdge);
+ break;
+ case EdgeDir.DIR_FR:
+ deletedOutgoingEdge.setDir(EdgeDir.DIR_FR);
+ deletedOutgoingEdge.setEdge(outgoingEdge);
+ break;
+ }
+ deletedEdges.add(deletedIncomingEdge);
+ deletedEdges.add(deletedOutgoingEdge);
+ }
+ public void deleteEdgeFromOldVertex(DeletedEdge deleteEdge){
+ switch(deleteEdge.dir){
+ case EdgeDir.DIR_RF:
+ getVertexValue().getRFList().remove(deleteEdge.getEdge());
+ break;
+ case EdgeDir.DIR_RR:
+ getVertexValue().getRRList().remove(deleteEdge.getEdge());
+ break;
+ case EdgeDir.DIR_FF:
+ getVertexValue().getFFList().remove(deleteEdge.getEdge());
+ break;
+ case EdgeDir.DIR_FR:
+ getVertexValue().getFRList().remove(deleteEdge.getEdge());
+ break;
+ }
+ }
+
+ public void setEdgeListAndEdgeDir(int i){
+ switch(connectedTable[i][0]){
+ case EdgeDir.DIR_RF:
+ incomingEdgeList.setCopy(getVertexValue().getRFList());
+ incomingEdgeDir = MessageFlag.DIR_RF;
+ break;
+ case EdgeDir.DIR_RR:
+ incomingEdgeList.setCopy(getVertexValue().getRRList());
+ incomingEdgeDir = MessageFlag.DIR_RR;
+ break;
+ }
+ switch(connectedTable[i][1]){
+ case EdgeDir.DIR_FF:
+ outgoingEdgeList.setCopy(getVertexValue().getFFList());
+ outgoingEdgeDir = MessageFlag.DIR_FF;
+ break;
+ case EdgeDir.DIR_FR:
+ outgoingEdgeList.setCopy(getVertexValue().getFRList());
+ outgoingEdgeDir = MessageFlag.DIR_FR;
+ break;
+ }
+ }
+
+ public void setNeighborEdgeIntersection(VKmerBytesWritable incomingEdge, VKmerBytesWritable outgoingEdge){
+ outgoingReadIdSet.clear();
+ incomingReadIdSet.clear();
+ tmpKmer.setAsCopy(incomingEdge);
+ incomingReadIdSet.addAll(kmerMap.get(tmpKmer));
+ tmpKmer.setAsCopy(outgoingEdge);
+ outgoingReadIdSet.addAll(kmerMap.get(tmpKmer));
+
+ //set the readId intersection across all neighbor edges
+ neighborEdgeIntersection.addAll(selfReadIdSet);
+ neighborEdgeIntersection.retainAll(incomingReadIdSet);
+ neighborEdgeIntersection.retainAll(outgoingReadIdSet);
+ }
+
+ public void updateEdgeListPointToNewVertex(){
+ byte meToNeighborDir = incomingMsg.getFlag();
+ byte neighborToMeDir = mirrorDirection(meToNeighborDir);
+ switch(neighborToMeDir){
+ case MessageFlag.DIR_FF:
+ getVertexValue().getFFList().remove(incomingMsg.getSourceVertexId());
+ getVertexValue().getFFList().append(incomingMsg.getCreatedVertexId());
+ break;
+ case MessageFlag.DIR_FR:
+ getVertexValue().getFRList().remove(incomingMsg.getSourceVertexId());
+ getVertexValue().getFRList().append(incomingMsg.getCreatedVertexId());
+ break;
+ case MessageFlag.DIR_RF:
+ getVertexValue().getRFList().remove(incomingMsg.getSourceVertexId());
+ getVertexValue().getRFList().append(incomingMsg.getCreatedVertexId());
+ break;
+ case MessageFlag.DIR_RR:
+ getVertexValue().getRRList().remove(incomingMsg.getSourceVertexId());
+ getVertexValue().getRRList().append(incomingMsg.getCreatedVertexId());
+ break;
+ }
+ }
+
@Override
public void compute(Iterator<MessageWritable> msgIterator) {
initVertex();
@@ -169,161 +316,103 @@
}
voteToHalt();
} else if(getSuperstep() == 3){
- kmerMap.clear();
- createdVertexSet.clear();
- while(msgIterator.hasNext()){
- incomingMsg = msgIterator.next();
- readIdSet = new HashSet<Long>();
- for(PositionWritable nodeId : incomingMsg.getNodeIdList()){
- readIdSet.add(nodeId.getReadId());
- }
- kmerMap.put(incomingMsg.getSourceVertexId(), readIdSet);
- }
+ /** generate kmerMap, which maps kmer (key) to readIdSet (value) **/
+ generateKmerMap(msgIterator);
+
+ /** set self readId set **/
+ setSelfReadIdSet();
+
+ int count = 0;
+ //A set storing deleted edges
+ Set<DeletedEdge> deletedEdges = new HashSet<DeletedEdge>();
/** process connectedTable **/
for(int i = 0; i < 4; i++){
- switch(connectedTable[i][0]){
- case "FF":
- outgoingEdgeList.set(getVertexValue().getFFList());
- outgoingEdgeDir = MessageFlag.DIR_FF;
- break;
- case "FR":
- outgoingEdgeList.set(getVertexValue().getFRList());
- outgoingEdgeDir = MessageFlag.DIR_FR;
- break;
- }
- switch(connectedTable[i][1]){
- case "RF":
- incomingEdgeList.set(getVertexValue().getRFList());
- incomingEdgeDir = MessageFlag.DIR_RF;
- break;
- case "RR":
- incomingEdgeList.set(getVertexValue().getRRList());
- incomingEdgeDir = MessageFlag.DIR_RR;
- break;
- }
- selfReadIdSet.clear();
- for(PositionWritable nodeId : getVertexValue().getNodeIdList()){
- selfReadIdSet.add(nodeId.getReadId());
- }
- for(KmerBytesWritable outgoingEdge : outgoingEdgeList){
- for(KmerBytesWritable incomingEdge : incomingEdgeList){
- outgoingReadIdSet.clear();
- incomingReadIdSet.clear();
- outgoingReadIdSet.addAll(kmerMap.get(outgoingEdge));
- incomingReadIdSet.addAll(kmerMap.get(incomingEdge));
-
- //set all neighberEdge readId intersection
- neighborEdgeIntersection.addAll(selfReadIdSet);
- neighborEdgeIntersection.retainAll(outgoingReadIdSet);
- neighborEdgeIntersection.retainAll(incomingReadIdSet);
- //set outgoingEdge readId intersection
- outgoingEdgeIntersection.addAll(selfReadIdSet);
- outgoingEdgeIntersection.retainAll(outgoingReadIdSet);
- outgoingEdgeIntersection.removeAll(neighborEdgeIntersection);
- //set incomingEdge readId intersection
- incomingEdgeIntersection.addAll(selfReadIdSet);
- incomingEdgeIntersection.retainAll(incomingReadIdSet);
- incomingEdgeIntersection.removeAll(neighborEdgeIntersection);
+ /** set edgeList and edgeDir based on connectedTable **/
+ setEdgeListAndEdgeDir(i);
+
+ VKmerBytesWritable incomingEdge = new VKmerBytesWritable(kmerSize);
+ VKmerBytesWritable outgoingEdge = new VKmerBytesWritable(kmerSize);
+ for(int x = 0; x < incomingEdgeList.getCountOfPosition(); x++){
+ for(int y = 0; y < outgoingEdgeList.getCountOfPosition(); y++){
+ incomingEdge.setAsCopy(incomingEdgeList.getPosition(x));
+ outgoingEdge.setAsCopy(outgoingEdgeList.getPosition(y));
+ /** set neighborEdge readId intersection **/
+ setNeighborEdgeIntersection(incomingEdge, outgoingEdge);
if(!neighborEdgeIntersection.isEmpty()){
- createdVertex.clear();
- createdVertexId.setByRead(generaterRandomString(kmerSize + 1).getBytes(), 0);
- createdVertex.setCreatedVertexId(createdVertexId);
- createdVertex.setIncomingDir(connectedTable[i][1]);
- createdVertex.setOutgoingDir(connectedTable[i][0]);
- createdVertex.setIncomingEdge(incomingEdge);
- createdVertex.setOutgoingEdge(outgoingEdge);
- createdVertexSet.add(createdVertex);
+ if(count == 0)
+ createdVertexId.setByRead("AAA".getBytes(), 0);//kmerSize + 1 generaterRandomString(kmerSize).getBytes()
+ else
+ createdVertexId.setByRead("GGG".getBytes(), 0);
+ count++;
- outgoingMsg.setCreatedVertexId(createdVertex.getCreatedVertexId());
- outgoingMsg.setSourceVertexId(getVertexId());
- outgoingMsg.setFlag(incomingEdgeDir);
- sendMsg(incomingEdge, outgoingMsg);
- outgoingMsg.setFlag(outgoingEdgeDir);
- sendMsg(outgoingEdge, outgoingMsg);
- }
-
- if(!incomingEdgeIntersection.isEmpty()){
- createdVertex.clear();
- createdVertexId.setByRead(generaterRandomString(kmerSize + 1).getBytes(), 0);
- createdVertex.setCreatedVertexId(createdVertexId);
- createdVertex.setIncomingDir(connectedTable[i][1]);
- createdVertex.setIncomingEdge(incomingEdge);
- createdVertexSet.add(createdVertex);
+ /** create new/created vertex **/
+ createNewVertex(i, incomingEdge, outgoingEdge);
- outgoingMsg.setCreatedVertexId(createdVertex.getCreatedVertexId());
- outgoingMsg.setSourceVertexId(getVertexId());
- outgoingMsg.setFlag(incomingEdgeDir);
- sendMsg(incomingEdge, outgoingMsg);
- }
-
- if(!outgoingEdgeIntersection.isEmpty()){
- createdVertex.clear();
- createdVertexId.setByRead(generaterRandomString(kmerSize + 1).getBytes(), 0);
- createdVertex.setCreatedVertexId(createdVertexId);
- createdVertex.setOutgoingDir(connectedTable[i][0]);
- createdVertex.setOutgoingEdge(outgoingEdge);
- createdVertexSet.add(createdVertex);
+ /** send msg to neighbors to update their edges to new vertex **/
+ sendMsgToUpdateEdge(incomingEdge, outgoingEdge);
- outgoingMsg.setCreatedVertexId(createdVertex.getCreatedVertexId());
- outgoingMsg.setSourceVertexId(getVertexId());
- outgoingMsg.setFlag(outgoingEdgeDir);
- sendMsg(outgoingEdge, outgoingMsg);
+ /** store deleted edge **/
+ storeDeletedEdge(deletedEdges, i, incomingEdge, outgoingEdge);
}
}
}
+
+// for(KmerBytesWritable incomingEdge : incomingEdgeList){
+// for(KmerBytesWritable outgoingEdge : outgoingEdgeList){
+// /** set neighborEdge readId intersection **/
+// setNeighborEdgeIntersection(incomingEdge, outgoingEdge);
+//
+// if(!neighborEdgeIntersection.isEmpty()){
+// if(count == 0)
+// createdVertexId.setByRead("AAA".getBytes(), 0);//kmerSize + 1 generaterRandomString(kmerSize).getBytes()
+// else
+// createdVertexId.setByRead("GGG".getBytes(), 0);
+// count++;
+//
+// /** create new/created vertex **/
+// createNewVertex(i, incomingEdge, outgoingEdge);
+//
+// /** send msg to neighbors to update their edges to new vertex **/
+// sendMsgToUpdateEdge(incomingEdge, outgoingEdge);
+//
+// /** store deleted edge **/
+// storeDeletedEdge(deletedEdges, i, incomingEdge, outgoingEdge);
+// }
+// }
+// }
}
+ /** delete extra edges from old vertex **/
+ for(DeletedEdge deletedEdge : deletedEdges){
+ deleteEdgeFromOldVertex(deletedEdge);
+ }
+
+ /** delete the old vertex, or voteToHalt **/
+ if(getVertexValue().getDegree() == 0)//if no any edge, delete
+ deleteVertex(getVertexId());
+ else
+ voteToHalt();
} else if(getSuperstep() == 4){
while(msgIterator.hasNext()){
incomingMsg = msgIterator.next();
/** update edgelist to new/created vertex **/
- byte meToNeighborDir = incomingMsg.getFlag();
- byte neighborToMeDir = mirrorDirection(meToNeighborDir);
- switch(neighborToMeDir){
- case MessageFlag.DIR_FF:
- getVertexValue().getFFList().remove(incomingMsg.getSourceVertexId());
- getVertexValue().getFFList().append(incomingMsg.getCreatedVertexId());
- break;
- case MessageFlag.DIR_FR:
- getVertexValue().getFRList().remove(incomingMsg.getSourceVertexId());
- getVertexValue().getFRList().append(incomingMsg.getCreatedVertexId());
- break;
- case MessageFlag.DIR_RF:
- getVertexValue().getRFList().remove(incomingMsg.getSourceVertexId());
- getVertexValue().getRFList().append(incomingMsg.getCreatedVertexId());
- break;
- case MessageFlag.DIR_RR:
- getVertexValue().getRRList().remove(incomingMsg.getSourceVertexId());
- getVertexValue().getRRList().append(incomingMsg.getCreatedVertexId());
- break;
- }
- /** add new/created vertex **/
- for(CreatedVertex v : createdVertexSet){
- Vertex vertex = (Vertex) BspUtils.createVertex(getContext().getConfiguration());
- vertex.getMsgList().clear();
- vertex.getEdges().clear();
- VertexValueWritable vertexValue = new VertexValueWritable();
- switch(v.incomingDir){
- case "RF":
- vertexValue.getRFList().append(v.incomingEdge);
- break;
- case "RR":
- vertexValue.getRRList().append(v.incomingEdge);
- break;
- }
- switch(v.outgoingDir){
- case "FF":
- vertexValue.getFFList().append(v.outgoingEdge);
- break;
- case "FR":
- vertexValue.getFRList().append(v.outgoingEdge);
- break;
- }
- vertex.setVertexId(v.getCreatedVertexId());
- vertex.setVertexValue(vertexValue);
- }
- createdVertexSet.clear();
+ updateEdgeListPointToNewVertex();
}
+ voteToHalt();
}
}
+
+ public static void main(String[] args) throws Exception {
+ PregelixJob job = new PregelixJob(SplitRepeatVertex.class.getSimpleName());
+ job.setVertexClass(SplitRepeatVertex.class);
+ /**
+ * BinaryInput and BinaryOutput
+ */
+ job.setVertexInputFormatClass(InitialGraphCleanInputFormat.class);
+ job.setVertexOutputFormatClass(GraphCleanOutputFormat.class);
+ job.setDynamicVertexValueSize(true);
+ job.setOutputKeyClass(VKmerBytesWritable.class);
+ job.setOutputValueClass(VertexValueWritable.class);
+ Client.run(args, job);
+ }
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/tipremove/TipAddVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/tipremove/TipAddVertex.java
index 519ffb9..4a540b8 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/tipremove/TipAddVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/tipremove/TipAddVertex.java
@@ -3,12 +3,11 @@
import java.util.Iterator;
import org.apache.hadoop.io.NullWritable;
-import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.VKmerBytesWritable;
+import edu.uci.ics.genomix.type.VKmerListWritable;
import edu.uci.ics.pregelix.api.graph.Vertex;
import edu.uci.ics.pregelix.api.job.PregelixJob;
import edu.uci.ics.pregelix.api.util.BspUtils;
-import edu.uci.ics.genomix.oldtype.PositionListWritable;
-import edu.uci.ics.genomix.oldtype.PositionWritable;
import edu.uci.ics.genomix.pregelix.client.Client;
import edu.uci.ics.genomix.pregelix.format.GraphCleanInputFormat;
import edu.uci.ics.genomix.pregelix.format.GraphCleanOutputFormat;
@@ -47,7 +46,7 @@
* Remove tip or single node when l > constant
*/
public class TipAddVertex extends
- Vertex<PositionWritable, VertexValueWritable, NullWritable, MessageWritable> {
+ Vertex<VKmerBytesWritable, VertexValueWritable, NullWritable, MessageWritable> {
public static final String KMER_SIZE = "TipAddVertex.kmerSize";
public static int kmerSize = -1;
@@ -58,37 +57,39 @@
if (kmerSize == -1)
kmerSize = getContext().getConfiguration().getInt(KMER_SIZE, 5);
}
-
+
+ /**
+ * create a new vertex that points to the split node
+ */
+
@SuppressWarnings("unchecked")
@Override
public void compute(Iterator<MessageWritable> msgIterator) {
initVertex();
if(getSuperstep() == 1){
- if(getVertexId().getReadID() == 1 && getVertexId().getPosInRead() == 4){
- getVertexValue().getFFList().append(2, (byte)1);
+ if(getVertexId().toString().equals("CTA")){
+ VKmerBytesWritable vertexId = new VKmerBytesWritable(kmerSize);
+ vertexId.setByRead("AGC".getBytes(), 0);
+ getVertexValue().getRFList().append(vertexId);
//add tip vertex
@SuppressWarnings("rawtypes")
Vertex vertex = (Vertex) BspUtils.createVertex(getContext().getConfiguration());
vertex.getMsgList().clear();
vertex.getEdges().clear();
- PositionWritable vertexId = new PositionWritable();
- VertexValueWritable vertexValue = new VertexValueWritable();
+
+ VertexValueWritable vertexValue = new VertexValueWritable(kmerSize);
/**
* set the src vertex id
*/
- vertexId.set(2, (byte)1);
vertex.setVertexId(vertexId);
/**
* set the vertex value
*/
- byte[] array = { 'G', 'A', 'A'};
- KmerBytesWritable kmer = new KmerBytesWritable(array.length);
- kmer.setByRead(array, 0);
- vertexValue.setKmer(kmer);
- PositionListWritable plist = new PositionListWritable();
- plist.append(new PositionWritable(1, (byte)4));
- vertexValue.setRRList(plist);
+ VKmerListWritable kmerList = new VKmerListWritable();
+ kmerList.append(getVertexId());
+ vertexValue.setRFList(kmerList);
+ vertexValue.setKmer(vertexId);
vertex.setVertexValue(vertexValue);
addVertex(vertexId, vertex);
@@ -106,7 +107,7 @@
job.setVertexInputFormatClass(GraphCleanInputFormat.class);
job.setVertexOutputFormatClass(GraphCleanOutputFormat.class);
job.setDynamicVertexValueSize(true);
- job.setOutputKeyClass(PositionWritable.class);
+ job.setOutputKeyClass(VKmerBytesWritable.class);
job.setOutputValueClass(VertexValueWritable.class);
Client.run(args, job);
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/tipremove/TipRemoveVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/tipremove/TipRemoveVertex.java
index b4f2407..527fb66 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/tipremove/TipRemoveVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/tipremove/TipRemoveVertex.java
@@ -3,15 +3,14 @@
import java.util.Iterator;
import edu.uci.ics.pregelix.api.job.PregelixJob;
-import edu.uci.ics.genomix.oldtype.PositionWritable;
import edu.uci.ics.genomix.pregelix.client.Client;
import edu.uci.ics.genomix.pregelix.format.GraphCleanInputFormat;
import edu.uci.ics.genomix.pregelix.format.GraphCleanOutputFormat;
import edu.uci.ics.genomix.pregelix.io.MessageWritable;
import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
import edu.uci.ics.genomix.pregelix.operator.pathmerge.BasicGraphCleanVertex;
-import edu.uci.ics.genomix.pregelix.type.AdjMessage;
import edu.uci.ics.genomix.pregelix.util.VertexUtil;
+import edu.uci.ics.genomix.type.VKmerBytesWritable;
/*
* vertexId: BytesWritable
@@ -57,7 +56,14 @@
kmerSize = getContext().getConfiguration().getInt(KMER_SIZE, 5);
if(length == -1)
length = getContext().getConfiguration().getInt(LENGTH, kmerSize); //kmerSize + 5
- outgoingMsg.reset();
+ if(incomingMsg == null)
+ incomingMsg = new MessageWritable(kmerSize);
+ if(outgoingMsg == null)
+ outgoingMsg = new MessageWritable(kmerSize);
+ else
+ outgoingMsg.reset(kmerSize);
+ if(destVertexId == null)
+ destVertexId = new VKmerBytesWritable(kmerSize);
}
@Override
@@ -66,35 +72,26 @@
if(getSuperstep() == 1){
if(VertexUtil.isIncomingTipVertex(getVertexValue())){
if(getVertexValue().getLengthOfKmer() <= length){
- if(getVertexValue().getFFList().getCountOfPosition() > 0)
- outgoingMsg.setFlag(AdjMessage.FROMFF);
- else if(getVertexValue().getFRList().getCountOfPosition() > 0)
- outgoingMsg.setFlag(AdjMessage.FROMFR);
- outgoingMsg.setSourceVertexId(getVertexId());
- destVertexId.setAsCopy(getNextDestVertexId(getVertexValue()));
- sendMsg(destVertexId, outgoingMsg);
+ sendSettledMsgToPreviousNode();
deleteVertex(getVertexId());
}
}
else if(VertexUtil.isOutgoingTipVertex(getVertexValue())){
if(getVertexValue().getLengthOfKmer() <= length){
- if(getVertexValue().getRFList().getCountOfPosition() > 0)
- outgoingMsg.setFlag(AdjMessage.FROMRF);
- else if(getVertexValue().getRRList().getCountOfPosition() > 0)
- outgoingMsg.setFlag(AdjMessage.FROMRR);
- outgoingMsg.setSourceVertexId(getVertexId());
- destVertexId.setAsCopy(getPreDestVertexId(getVertexValue()));
- sendMsg(destVertexId, outgoingMsg);
+ sendSettledMsgToNextNode();
deleteVertex(getVertexId());
}
}
else if(VertexUtil.isSingleVertex(getVertexValue())){
- if(getVertexValue().getLengthOfKmer() > length)
+ if(getVertexValue().getLengthOfKmer() <= length)
deleteVertex(getVertexId());
}
}
else if(getSuperstep() == 2){
- responseToDeadVertex(msgIterator);
+ while(msgIterator.hasNext()){
+ incomingMsg = msgIterator.next();
+ responseToDeadVertex();
+ }
}
voteToHalt();
}
@@ -108,7 +105,7 @@
job.setVertexInputFormatClass(GraphCleanInputFormat.class);
job.setVertexOutputFormatClass(GraphCleanOutputFormat.class);
job.setDynamicVertexValueSize(true);
- job.setOutputKeyClass(PositionWritable.class);
+ job.setOutputKeyClass(VKmerBytesWritable.class);
job.setOutputValueClass(VertexValueWritable.class);
Client.run(args, job);
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/CheckMessage.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/CheckMessage.java
index 73ab533..4b32a51 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/CheckMessage.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/CheckMessage.java
@@ -3,7 +3,7 @@
public class CheckMessage {
public static final byte SOURCE = 1 << 0;
- public static final byte CHAIN = 1 << 1;
+ public static final byte ACUTUALKMER = 1 << 1;
public static final byte NEIGHBER = 1 << 2;
public static final byte MESSAGE = 1 << 3;
public static final byte NODEIDLIST = 1 << 4;
@@ -18,8 +18,8 @@
case SOURCE:
r = "SOURCE";
break;
- case CHAIN:
- r = "CHAIN";
+ case ACUTUALKMER:
+ r = "ACUTUALKMER";
break;
case NEIGHBER:
r = "NEIGHBER";
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/DirectionFlag.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/DirectionFlag.java
index 2ed36e9..51a73f1 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/DirectionFlag.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/DirectionFlag.java
@@ -11,5 +11,5 @@
public static final byte DIR_MASK = 0b111 << 0;
public static final byte DIR_CLEAR = 0b1111000 << 0;
- public static final byte DIR_FROM_DEADVERTEX = 0b101 << 0;
+// public static final byte DIR_FROM_DEADVERTEX = 0b101 << 0;
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/util/VertexUtil.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/util/VertexUtil.java
index b7f6a4f..4582557 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/util/VertexUtil.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/util/VertexUtil.java
@@ -1,9 +1,8 @@
package edu.uci.ics.genomix.pregelix.util;
-import edu.uci.ics.genomix.oldtype.PositionWritable;
import edu.uci.ics.genomix.pregelix.io.AdjacencyListWritable;
import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
-import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.VKmerBytesWritable;
public class VertexUtil {
/**
@@ -68,7 +67,7 @@
/**
* check if mergeChain is cycle
*/
- public static boolean isCycle(KmerBytesWritable kmer, KmerBytesWritable mergeChain, int kmerSize) {
+ public static boolean isCycle(VKmerBytesWritable kmer, VKmerBytesWritable mergeChain, int kmerSize) {
String chain = mergeChain.toString().substring(1);
return chain.contains(kmer.toString());
@@ -117,7 +116,7 @@
/**
* get nodeId from Ad
*/
- public static KmerBytesWritable getNodeIdFromAdjacencyList(AdjacencyListWritable adj){
+ public static VKmerBytesWritable getNodeIdFromAdjacencyList(AdjacencyListWritable adj){
if(adj.getForwardList().getCountOfPosition() > 0)
return adj.getForwardList().getPosition(0);
else if(adj.getReverseList().getCountOfPosition() > 0)
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
index 630b5aa..ff5f404 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
@@ -128,90 +128,90 @@
private static void genSplitRepeatGraph() throws IOException {
generateSplitRepeatGraphJob("SplitRepeatGraph", outputBase + "SplitRepeatGraph.xml");
}
-// private static void generateTipAddGraphJob(String jobName, String outputPath) throws IOException {
-// PregelixJob job = new PregelixJob(jobName);
-// job.setVertexClass(TipAddVertex.class);
-// job.setVertexInputFormatClass(GraphCleanOutputFormat.class);
-// job.setVertexOutputFormatClass(GraphCleanOutputFormat.class);
-// job.setDynamicVertexValueSize(true);
-// job.setOutputKeyClass(PositionWritable.class);
-// job.setOutputValueClass(VertexValueWritable.class);
-// job.getConfiguration().setInt(TipAddVertex.KMER_SIZE, 3);
-// job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
-// }
-//
-// private static void genTipAddGraph() throws IOException {
-// generateTipAddGraphJob("TipAddGraph", outputBase
-// + "TipAddGraph.xml");
-// }
-//
-// private static void generateTipRemoveGraphJob(String jobName, String outputPath) throws IOException {
-// PregelixJob job = new PregelixJob(jobName);
-// job.setVertexClass(TipRemoveVertex.class);
-// job.setVertexInputFormatClass(NaiveAlgorithmForPathMergeInputFormat.class);
-// job.setVertexOutputFormatClass(GraphCleanOutputFormat.class);
-// job.setDynamicVertexValueSize(true);
-// job.setOutputKeyClass(PositionWritable.class);
-// job.setOutputValueClass(VertexValueWritable.class);
-// job.getConfiguration().setInt(TipRemoveVertex.KMER_SIZE, 5);
-// job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
-// }
-//
-// private static void genTipRemoveGraph() throws IOException {
-// generateTipRemoveGraphJob("TipRemoveGraph", outputBase
-// + "TipRemoveGraph.xml");
-// }
-//
-// private static void generateBridgeAddGraphJob(String jobName, String outputPath) throws IOException {
-// PregelixJob job = new PregelixJob(jobName);
-// job.setVertexClass(BridgeAddVertex.class);
-// job.setVertexInputFormatClass(NaiveAlgorithmForPathMergeInputFormat.class);
-// job.setVertexOutputFormatClass(GraphCleanOutputFormat.class);
-// job.setDynamicVertexValueSize(true);
-// job.setOutputKeyClass(PositionWritable.class);
-// job.setOutputValueClass(VertexValueWritable.class);
-// job.getConfiguration().setInt(BridgeAddVertex.KMER_SIZE, 3);
-// job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
-// }
-//
-// private static void genBridgeAddGraph() throws IOException {
-// generateBridgeAddGraphJob("BridgeAddGraph", outputBase
-// + "BridgeAddGraph.xml");
-// }
-//
-// private static void generateBridgeRemoveGraphJob(String jobName, String outputPath) throws IOException {
-// PregelixJob job = new PregelixJob(jobName);
-// job.setVertexClass(BridgeRemoveVertex.class);
-// job.setVertexInputFormatClass(GraphCleanInputFormat.class);
-// job.setVertexOutputFormatClass(GraphCleanOutputFormat.class);
-// job.setDynamicVertexValueSize(true);
-// job.setOutputKeyClass(PositionWritable.class);
-// job.setOutputValueClass(VertexValueWritable.class);
-// job.getConfiguration().setInt(TipRemoveVertex.KMER_SIZE, 5);
-// job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
-// }
-//
-// private static void genBridgeRemoveGraph() throws IOException {
-// generateBridgeRemoveGraphJob("BridgeRemoveGraph", outputBase
-// + "BridgeRemoveGraph.xml");
-// }
-//
-// private static void generateBubbleAddGraphJob(String jobName, String outputPath) throws IOException {
-// PregelixJob job = new PregelixJob(jobName);
-// job.setVertexClass(BubbleAddVertex.class);
-// job.setVertexInputFormatClass(NaiveAlgorithmForPathMergeInputFormat.class);
-// job.setVertexOutputFormatClass(GraphCleanOutputFormat.class);
-// job.setDynamicVertexValueSize(true);
-// job.setOutputKeyClass(PositionWritable.class);
-// job.setOutputValueClass(VertexValueWritable.class);
-// job.getConfiguration().setInt(BubbleAddVertex.KMER_SIZE, 3);
-// job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
-// }
-//
-// private static void genBubbleAddGraph() throws IOException {
-// generateBubbleAddGraphJob("BubbleAddGraph", outputBase
-// + "BubbleAddGraph.xml");
-// }
+ private static void generateTipAddGraphJob(String jobName, String outputPath) throws IOException {
+ PregelixJob job = new PregelixJob(jobName);
+ job.setVertexClass(TipAddVertex.class);
+ job.setVertexInputFormatClass(InitialGraphCleanInputFormat.class);
+ job.setVertexOutputFormatClass(GraphCleanOutputFormat.class);
+ job.setDynamicVertexValueSize(true);
+ job.setOutputKeyClass(KmerBytesWritable.class);
+ job.setOutputValueClass(VertexValueWritable.class);
+ job.getConfiguration().setInt(TipAddVertex.KMER_SIZE, 3);
+ job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
+ }
+
+ private static void genTipAddGraph() throws IOException {
+ generateTipAddGraphJob("TipAddGraph", outputBase
+ + "TipAddGraph.xml");
+ }
+
+ private static void generateTipRemoveGraphJob(String jobName, String outputPath) throws IOException {
+ PregelixJob job = new PregelixJob(jobName);
+ job.setVertexClass(TipRemoveVertex.class);
+ job.setVertexInputFormatClass(GraphCleanInputFormat.class);
+ job.setVertexOutputFormatClass(GraphCleanOutputFormat.class);
+ job.setDynamicVertexValueSize(true);
+ job.setOutputKeyClass(KmerBytesWritable.class);
+ job.setOutputValueClass(VertexValueWritable.class);
+ job.getConfiguration().setInt(TipRemoveVertex.KMER_SIZE, 3);
+ job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
+ }
+
+ private static void genTipRemoveGraph() throws IOException {
+ generateTipRemoveGraphJob("TipRemoveGraph", outputBase
+ + "TipRemoveGraph.xml");
+ }
+
+ private static void generateBridgeAddGraphJob(String jobName, String outputPath) throws IOException {
+ PregelixJob job = new PregelixJob(jobName);
+ job.setVertexClass(BridgeAddVertex.class);
+ job.setVertexInputFormatClass(InitialGraphCleanInputFormat.class);
+ job.setVertexOutputFormatClass(GraphCleanOutputFormat.class);
+ job.setDynamicVertexValueSize(true);
+ job.setOutputKeyClass(KmerBytesWritable.class);
+ job.setOutputValueClass(VertexValueWritable.class);
+ job.getConfiguration().setInt(BridgeAddVertex.KMER_SIZE, 3);
+ job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
+ }
+
+ private static void genBridgeAddGraph() throws IOException {
+ generateBridgeAddGraphJob("BridgeAddGraph", outputBase
+ + "BridgeAddGraph.xml");
+ }
+
+ private static void generateBridgeRemoveGraphJob(String jobName, String outputPath) throws IOException {
+ PregelixJob job = new PregelixJob(jobName);
+ job.setVertexClass(BridgeRemoveVertex.class);
+ job.setVertexInputFormatClass(GraphCleanInputFormat.class);
+ job.setVertexOutputFormatClass(GraphCleanOutputFormat.class);
+ job.setDynamicVertexValueSize(true);
+ job.setOutputKeyClass(KmerBytesWritable.class);
+ job.setOutputValueClass(VertexValueWritable.class);
+ job.getConfiguration().setInt(TipRemoveVertex.KMER_SIZE, 3);
+ job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
+ }
+
+ private static void genBridgeRemoveGraph() throws IOException {
+ generateBridgeRemoveGraphJob("BridgeRemoveGraph", outputBase
+ + "BridgeRemoveGraph.xml");
+ }
+
+ private static void generateBubbleAddGraphJob(String jobName, String outputPath) throws IOException {
+ PregelixJob job = new PregelixJob(jobName);
+ job.setVertexClass(BubbleAddVertex.class);
+ job.setVertexInputFormatClass(InitialGraphCleanInputFormat.class);
+ job.setVertexOutputFormatClass(GraphCleanOutputFormat.class);
+ job.setDynamicVertexValueSize(true);
+ job.setOutputKeyClass(KmerBytesWritable.class);
+ job.setOutputValueClass(VertexValueWritable.class);
+ job.getConfiguration().setInt(BubbleAddVertex.KMER_SIZE, 3);
+ job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
+ }
+
+ private static void genBubbleAddGraph() throws IOException {
+ generateBubbleAddGraphJob("BubbleAddGraph", outputBase
+ + "BubbleAddGraph.xml");
+ }
//
// private static void generateBubbleMergeGraphJob(String jobName, String outputPath) throws IOException {
// PregelixJob job = new PregelixJob(jobName);
@@ -243,6 +243,11 @@
// genP4ForMergeGraph();
// genMapReduceGraph();
genSplitRepeatGraph();
+ genTipAddGraph();
+ genBridgeAddGraph();
+ genTipRemoveGraph();
+ genBridgeRemoveGraph();
+ genBubbleAddGraph();
}
}
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/BridgeAddSmallTestSuite.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/BridgeAddSmallTestSuite.java
index 221a76a..c4f0963 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/BridgeAddSmallTestSuite.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/BridgeAddSmallTestSuite.java
@@ -43,20 +43,9 @@
public class BridgeAddSmallTestSuite extends TestSuite {
private static final Logger LOGGER = Logger.getLogger(BridgeAddSmallTestSuite.class.getName());
- public static final String PreFix = "data/input"; //"graphbuildresult";
+ public static final String PreFix = "data/AddBridge";
public static final String[] TestDir = { PreFix + File.separator
- + "tworeads"};/*, PreFix + File.separator
- /*+ "CyclePath"};, PreFix + File.separator
- + "SimplePath", PreFix + File.separator
- + "SinglePath", PreFix + File.separator
- + "TreePath"};*/
- /* + "2", PreFix + File.separator + "3", PreFix + File.separator + "4", PreFix + File.separator + "5",
- PreFix + File.separator + "6", PreFix + File.separator + "7", PreFix + File.separator + "8",
- PreFix + File.separator + "9", PreFix + File.separator + "TwoKmer", PreFix + File.separator + "ThreeKmer",
- PreFix + File.separator + "SinglePath", PreFix + File.separator + "SimplePath",
- PreFix + File.separator + "Path", PreFix + File.separator + "BridgePath",
- PreFix + File.separator + "CyclePath", PreFix + File.separator + "RingPath",
- PreFix + File.separator + "LongPath", PreFix + File.separator + "TreePath" };*/
+ + "SimpleTest"};
private static final String ACTUAL_RESULT_DIR = "data/actual/bridgeadd";
private static final String PATH_TO_HADOOP_CONF = "src/test/resources/hadoop/conf";
private static final String PATH_TO_CLUSTER_STORE = "src/test/resources/cluster/stores.properties";
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/BridgeRemoveSmallTestSuite.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/BridgeRemoveSmallTestSuite.java
index 397c514..171a6ca 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/BridgeRemoveSmallTestSuite.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/BridgeRemoveSmallTestSuite.java
@@ -43,9 +43,9 @@
public class BridgeRemoveSmallTestSuite extends TestSuite {
private static final Logger LOGGER = Logger.getLogger(BridgeRemoveSmallTestSuite.class.getName());
- public static final String PreFix = "data/actual"; //"graphbuildresult";
+ public static final String PreFix = "data/actual/bridgeadd/BridgeAddGraph/bin";
public static final String[] TestDir = { PreFix + File.separator
- + "pathmerge/P4ForMergeGraph/bin/tworeads"};
+ + "SimpleTest"};
private static final String ACTUAL_RESULT_DIR = "data/actual/bridgeremove";
private static final String PATH_TO_HADOOP_CONF = "src/test/resources/hadoop/conf";
private static final String PATH_TO_CLUSTER_STORE = "src/test/resources/cluster/stores.properties";
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/BubbleAddSmallTestSuite.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/BubbleAddSmallTestSuite.java
new file mode 100644
index 0000000..a9c2774
--- /dev/null
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/BubbleAddSmallTestSuite.java
@@ -0,0 +1,263 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.genomix.pregelix.JobRun;
+
+import java.io.BufferedReader;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.logging.Logger;
+
+import junit.framework.Test;
+import junit.framework.TestResult;
+import junit.framework.TestSuite;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.mapred.JobConf;
+
+import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
+import edu.uci.ics.pregelix.core.util.PregelixHyracksIntegrationUtil;
+
+/**
+ * JUnit 3 suite that schedules one {@code BasicSmallTestCase} for every job file under
+ * {@code src/test/resources/jobs/} and every input directory in {@link #TestDir}.
+ * <p>
+ * {@link #suite()} starts a Hyracks mini-cluster and a {@link MiniDFSCluster}; {@link #run(TestResult)}
+ * shuts both down once the scheduled tests have been run.
+ */
+@SuppressWarnings("deprecation")
+public class BubbleAddSmallTestSuite extends TestSuite {
+    private static final Logger LOGGER = Logger.getLogger(BubbleAddSmallTestSuite.class.getName());
+
+    /** Local directory that contains the input data sets. */
+    public static final String PreFix = "data/PathMergeTestSet";
+    /** Local input directories; each is uploaded to HDFS and paired with every selected job. */
+    public static final String[] TestDir = { PreFix + File.separator + "5" };
+    private static final String ACTUAL_RESULT_DIR = "data/actual/bubbleadd";
+    private static final String PATH_TO_HADOOP_CONF = "src/test/resources/hadoop/conf";
+    private static final String PATH_TO_CLUSTER_STORE = "src/test/resources/cluster/stores.properties";
+    private static final String PATH_TO_CLUSTER_PROPERTIES = "src/test/resources/cluster/cluster.properties";
+    private static final String PATH_TO_JOBS = "src/test/resources/jobs/";
+    /** Text file whose lines are matched as substrings against job file names to select jobs. */
+    private static final String PATH_TO_ONLY = "src/test/resources/only_bubbleadd.txt";
+
+    /** HDFS directory below which each input directory is uploaded. */
+    public static final String HDFS_INPUTPATH = "/PathTestSet";
+
+    private static final String HADOOP_CONF_PATH = ACTUAL_RESULT_DIR + File.separator + "conf.xml";
+    private MiniDFSCluster dfsCluster;
+
+    private JobConf conf = new JobConf();
+    // Node count handed to the MiniDFSCluster constructor.
+    private int numberOfNC = 2;
+
+    /**
+     * Prepares the test environment: sets the cluster configuration paths, empties the local
+     * store and build directories, starts the Hyracks mini-cluster, recreates an empty result
+     * directory and starts HDFS.
+     */
+    public void setUp() throws Exception {
+        ClusterConfig.setStorePath(PATH_TO_CLUSTER_STORE);
+        ClusterConfig.setClusterPropertiesPath(PATH_TO_CLUSTER_PROPERTIES);
+        cleanupStores();
+        PregelixHyracksIntegrationUtil.init("src/test/resources/topology.xml");
+        LOGGER.info("Hyracks mini-cluster started");
+        FileUtils.forceMkdir(new File(ACTUAL_RESULT_DIR));
+        FileUtils.cleanDirectory(new File(ACTUAL_RESULT_DIR));
+        startHDFS();
+    }
+
+    /**
+     * Starts the mini DFS cluster, uploads every directory in {@link #TestDir} below
+     * {@link #HDFS_INPUTPATH} and writes the resulting configuration to {@code HADOOP_CONF_PATH}.
+     *
+     * @throws IOException if an input directory is missing or a file system operation fails
+     */
+    private void startHDFS() throws IOException {
+        conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/core-site.xml"));
+        conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/mapred-site.xml"));
+        conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/hdfs-site.xml"));
+        FileSystem lfs = FileSystem.getLocal(new Configuration());
+        lfs.delete(new Path("build"), true);
+        System.setProperty("hadoop.log.dir", "logs");
+        dfsCluster = new MiniDFSCluster(conf, numberOfNC, true, null);
+        FileSystem dfs = FileSystem.get(conf);
+
+        for (String testDir : TestDir) {
+            File src = new File(testDir);
+            File[] inputFiles = src.listFiles();
+            if (inputFiles == null) {
+                // listFiles() yields null for a missing or non-directory path; fail with a clear message.
+                throw new FileNotFoundException("Test input directory not found: " + src.getAbsolutePath());
+            }
+            Path dest = new Path(HDFS_INPUTPATH + File.separator + src.getName());
+            dfs.mkdirs(dest);
+            for (File f : inputFiles) {
+                dfs.copyFromLocalFile(new Path(f.getAbsolutePath()), dest);
+            }
+        }
+
+        DataOutputStream confOutput = new DataOutputStream(new FileOutputStream(new File(HADOOP_CONF_PATH)));
+        try {
+            conf.writeXml(confOutput);
+            confOutput.flush();
+        } finally {
+            // Close the stream even when writing the configuration fails.
+            confOutput.close();
+        }
+    }
+
+    /** Creates the local {@code teststore} and {@code build} directories if needed and empties them. */
+    private void cleanupStores() throws IOException {
+        FileUtils.forceMkdir(new File("teststore"));
+        FileUtils.forceMkdir(new File("build"));
+        FileUtils.cleanDirectory(new File("teststore"));
+        FileUtils.cleanDirectory(new File("build"));
+    }
+
+    /**
+     * Shuts down the mini DFS cluster if it was started.
+     */
+    private void cleanupHDFS() throws Exception {
+        if (dfsCluster != null) {
+            dfsCluster.shutdown();
+        }
+    }
+
+    /**
+     * Shuts down the Hyracks mini-cluster and then HDFS; HDFS is shut down even when stopping
+     * Hyracks fails.
+     */
+    public void tearDown() throws Exception {
+        try {
+            PregelixHyracksIntegrationUtil.deinit();
+            LOGGER.info("Hyracks mini-cluster shut down");
+        } finally {
+            cleanupHDFS();
+        }
+    }
+
+    /**
+     * Builds the suite: starts the test environment and adds one {@code BasicSmallTestCase} per
+     * (job file, input directory) pair. When the "only" file lists any entries, job files whose
+     * names contain none of them are skipped.
+     *
+     * @return the populated suite
+     * @throws Exception if the job directory is missing or the environment cannot be started
+     */
+    public static Test suite() throws Exception {
+        List<String> onlys = getFileList(PATH_TO_ONLY);
+        File testData = new File(PATH_TO_JOBS);
+        File[] queries = testData.listFiles();
+        if (queries == null) {
+            // Checked before setUp() so that no cluster is started when there is nothing to run.
+            throw new FileNotFoundException("Job directory not found: " + testData.getAbsolutePath());
+        }
+        BubbleAddSmallTestSuite testSuite = new BubbleAddSmallTestSuite();
+        testSuite.setUp();
+        boolean onlyEnabled = !onlys.isEmpty();
+        FileSystem dfs = FileSystem.get(testSuite.conf);
+
+        for (File qFile : queries) {
+            if (!qFile.isFile() || (onlyEnabled && !isInList(onlys, qFile.getName()))) {
+                continue;
+            }
+            String resultBase = ACTUAL_RESULT_DIR + File.separator + jobExtToResExt(qFile.getName());
+            for (String testPathStr : TestDir) {
+                File testDir = new File(testPathStr);
+                String resultFileName = resultBase + File.separator + "bin" + File.separator + testDir.getName();
+                String textFileName = resultBase + File.separator + "txt" + File.separator + testDir.getName();
+                testSuite.addTest(new BasicSmallTestCase(HADOOP_CONF_PATH, qFile.getName(),
+                        qFile.getAbsolutePath(), dfs, HDFS_INPUTPATH + File.separator + testDir.getName(),
+                        resultFileName, textFileName));
+            }
+        }
+        return testSuite;
+    }
+
+    /**
+     * Runs the tests and collects their result in a TestResult, then tears the environment down.
+     *
+     * @throws IllegalStateException wrapping any exception raised while running or tearing down
+     */
+    @Override
+    public void run(TestResult result) {
+        try {
+            try {
+                // testCount() is the number of direct children, which is what testAt() indexes.
+                int testCount = testCount();
+                for (int i = 0; i < testCount; i++) {
+                    if (result.shouldStop()) {
+                        break;
+                    }
+                    runTest(testAt(i), result);
+                }
+            } finally {
+                tearDown();
+            }
+        } catch (Exception e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
+    /**
+     * Reads the non-blank lines of a text file.
+     *
+     * @param ignorePath path of the file to read
+     * @return the non-blank lines in file order; empty when the file has none
+     */
+    protected static List<String> getFileList(String ignorePath) throws FileNotFoundException, IOException {
+        List<String> ignores = new ArrayList<String>();
+        BufferedReader reader = new BufferedReader(new FileReader(ignorePath));
+        try {
+            String s;
+            while ((s = reader.readLine()) != null) {
+                // A blank entry would match every name in isInList(), so skip it.
+                if (s.trim().length() > 0) {
+                    ignores.add(s);
+                }
+            }
+        } finally {
+            reader.close();
+        }
+        return ignores;
+    }
+
+    /** Strips the last extension from a file name; names without a dot are returned unchanged. */
+    private static String jobExtToResExt(String fname) {
+        int dot = fname.lastIndexOf('.');
+        return dot < 0 ? fname : fname.substring(0, dot);
+    }
+
+    /** Returns true when {@code name} contains at least one entry of {@code onlys} as a substring. */
+    private static boolean isInList(List<String> onlys, String name) {
+        for (String only : onlys) {
+            if (name.indexOf(only) >= 0) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+}
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/SplitRepeatSmallTestSuite.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/SplitRepeatSmallTestSuite.java
index 21ffe34..1948acd 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/SplitRepeatSmallTestSuite.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/SplitRepeatSmallTestSuite.java
@@ -45,7 +45,7 @@
//P4ForMergeGraph/bin/read
public static final String PreFix = "data/SplitRepeat"; //"graphbuildresult";
public static final String[] TestDir = { PreFix + File.separator
- + "SimpleTest"};
+ + "AdjSplitRepeat"};
private static final String ACTUAL_RESULT_DIR = "data/actual/splitrepeat";
private static final String PATH_TO_HADOOP_CONF = "src/test/resources/hadoop/conf";
private static final String PATH_TO_CLUSTER_STORE = "src/test/resources/cluster/stores.properties";
@@ -59,7 +59,7 @@
private MiniDFSCluster dfsCluster;
private JobConf conf = new JobConf();
- private int numberOfNC = 2;
+ private int numberOfNC = 1;
public void setUp() throws Exception {
ClusterConfig.setStorePath(PATH_TO_CLUSTER_STORE);
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/TipAddSmallTestSuite.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/TipAddSmallTestSuite.java
index f4456ce..57f4ea5 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/TipAddSmallTestSuite.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/TipAddSmallTestSuite.java
@@ -43,20 +43,9 @@
public class TipAddSmallTestSuite extends TestSuite {
private static final Logger LOGGER = Logger.getLogger(TipAddSmallTestSuite.class.getName());
- public static final String PreFix = "data/input"; //"graphbuildresult";
+ public static final String PreFix = "data/PathMergeTestSet"; //"graphbuildresult";
public static final String[] TestDir = { PreFix + File.separator
- + "read"};/*, PreFix + File.separator
- /*+ "CyclePath"};, PreFix + File.separator
- + "SimplePath", PreFix + File.separator
- + "SinglePath", PreFix + File.separator
- + "TreePath"};*/
- /* + "2", PreFix + File.separator + "3", PreFix + File.separator + "4", PreFix + File.separator + "5",
- PreFix + File.separator + "6", PreFix + File.separator + "7", PreFix + File.separator + "8",
- PreFix + File.separator + "9", PreFix + File.separator + "TwoKmer", PreFix + File.separator + "ThreeKmer",
- PreFix + File.separator + "SinglePath", PreFix + File.separator + "SimplePath",
- PreFix + File.separator + "Path", PreFix + File.separator + "BridgePath",
- PreFix + File.separator + "CyclePath", PreFix + File.separator + "RingPath",
- PreFix + File.separator + "LongPath", PreFix + File.separator + "TreePath" };*/
+ + "5"};
private static final String ACTUAL_RESULT_DIR = "data/actual/tipadd";
private static final String PATH_TO_HADOOP_CONF = "src/test/resources/hadoop/conf";
private static final String PATH_TO_CLUSTER_STORE = "src/test/resources/cluster/stores.properties";
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/TipRemoveSmallTestSuite.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/TipRemoveSmallTestSuite.java
index def24e4..e8ca43f 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/TipRemoveSmallTestSuite.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/TipRemoveSmallTestSuite.java
@@ -43,10 +43,9 @@
public class TipRemoveSmallTestSuite extends TestSuite {
private static final Logger LOGGER = Logger.getLogger(TipRemoveSmallTestSuite.class.getName());
//P4ForMergeGraph/bin/read
- public static final String PreFix = "data/input"; //"graphbuildresult";
+ public static final String PreFix = "data/actual/tipadd/TipAddGraph/bin";
public static final String[] TestDir = { PreFix + File.separator
- + "graphs/tipremove/fr_with_tip"};
- //+ "bridgeadd/BridgeAddGraph/bin/tworeads"};
+ + "5"};
private static final String ACTUAL_RESULT_DIR = "data/actual/tipremove";
private static final String PATH_TO_HADOOP_CONF = "src/test/resources/hadoop/conf";
private static final String PATH_TO_CLUSTER_STORE = "src/test/resources/cluster/stores.properties";