Merge commit '07acebba2c4470834919a82092faec0c982411e5' into nanzhang/hyracks_genomix
Conflicts:
genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerListWritable.java
genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/KmerBytesWritableTest.java
genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/AdjacencyListWritable.java
genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MergeBubbleMessageWritable.java
genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MessageWritable.java
genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bridgeremove/BridgeAddVertex.java
genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleAddVertex.java
genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicGraphCleanVertex.java
genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P5ForPathMergeVertex.java
genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/splitrepeat/SplitRepeatVertex.java
genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/tipremove/TipRemoveVertex.java
genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/ConvertNodeToIdValue.java
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerListWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerListWritable.java
index f7caebb..5b10fdc 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerListWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerListWritable.java
@@ -8,14 +8,14 @@
import java.util.Iterator;
import java.util.List;
+import org.apache.hadoop.io.BinaryComparable;
import org.apache.hadoop.io.Writable;
import edu.uci.ics.genomix.data.Marshal;
-/**
- * A list of fixed-length kmers. The length of this list is stored internally.
- */
-public class KmerListWritable implements Writable, Iterable<KmerBytesWritable>, Serializable {
+
+public class KmerListWritable extends BinaryComparable
+ implements Writable, Iterable<KmerBytesWritable>, Serializable{
private static final long serialVersionUID = 1L;
protected static final byte[] EMPTY_BYTES = { 0, 0, 0, 0 };
protected static final int HEADER_SIZE = 4;
@@ -236,7 +236,6 @@
public int getLength() {
return valueCount * KmerBytesWritable.getBytesPerKmer() + HEADER_SIZE;
}
-
@Override
public String toString() {
StringBuilder sbuilder = new StringBuilder();
@@ -257,4 +256,10 @@
public int hashCode() {
return Marshal.hashBytes(getByteArray(), getStartOffset(), getLength());
}
+
+ @Override
+ public byte[] getBytes() {
+ // BinaryComparable reads getBytes() from index 0 for getLength() bytes; return this list's own span, never null.
+ return java.util.Arrays.copyOfRange(getByteArray(), getStartOffset(), getStartOffset() + getLength());
+ }
}
diff --git a/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/KmerBytesWritableTest.java b/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/KmerBytesWritableTest.java
index 81565a3..efeff71 100644
--- a/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/KmerBytesWritableTest.java
+++ b/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/KmerBytesWritableTest.java
@@ -26,6 +26,7 @@
import edu.uci.ics.genomix.type.GeneCode;
import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.KmerListWritable;
public class KmerBytesWritableTest {
static byte[] array = { 'A', 'A', 'T', 'A', 'G', 'A', 'A', 'G' };
@@ -124,4 +125,312 @@
Assert.assertEquals(kmer.toString(), kmerAppend.toString());
}
}
+
+
+ @Test
+ public void TestMergeFFKmer() {
+ byte[] array = { 'A', 'G', 'C', 'T', 'G', 'A', 'C', 'C', 'G', 'T' };
+ String text = "AGCTGACCGT";
+ KmerBytesWritable kmer1 = new KmerBytesWritable(8);
+ kmer1.setByRead(array, 0);
+ String text1 = "AGCTGACC";
+ KmerBytesWritable kmer2 = new KmerBytesWritable(8);
+ kmer2.setByRead(array, 1);
+ String text2 = "GCTGACCG";
+ Assert.assertEquals(text2, kmer2.toString());
+ KmerBytesWritable merge = new KmerBytesWritable(kmer1);
+ int kmerSize = 8;
+ merge.mergeWithFFKmer(kmerSize, kmer2);
+ Assert.assertEquals(text1 + text2.substring(kmerSize - 1), merge.toString());
+
+ for (int i = 1; i < 8; i++) {
+ merge.set(kmer1);
+ merge.mergeWithFFKmer(i, kmer2);
+ Assert.assertEquals(text1 + text2.substring(i - 1), merge.toString());
+ }
+
+ for (int ik = 1; ik <= 10; ik++) {
+ for (int jk = 1; jk <= 10; jk++) {
+ kmer1 = new KmerBytesWritable(ik);
+ kmer2 = new KmerBytesWritable(jk);
+ kmer1.setByRead(array, 0);
+ kmer2.setByRead(array, 0);
+ text1 = text.substring(0, ik);
+ text2 = text.substring(0, jk);
+ Assert.assertEquals(text1, kmer1.toString());
+ Assert.assertEquals(text2, kmer2.toString());
+ for (int x = 1; x < jk; x++) {
+ merge.set(kmer1);
+ merge.mergeWithFFKmer(x, kmer2);
+ Assert.assertEquals(text1 + text2.substring(x - 1), merge.toString());
+ }
+ }
+ }
+ }
+
+ @Test
+ public void TestMergeFRKmer() {
+ int kmerSize = 3;
+ String result = "AAGCTAACAACC";
+ byte[] resultArray = result.getBytes();
+
+ String text1 = "AAGCTAA";
+ KmerBytesWritable kmer1 = new KmerBytesWritable(text1.length());
+ kmer1.setByRead(resultArray, 0);
+ Assert.assertEquals(text1, kmer1.toString());
+
+ // kmer2 is the rc of the end of the read
+ String text2 = "GGTTGTT";
+ KmerBytesWritable kmer2 = new KmerBytesWritable(text2.length());
+ kmer2.setByReadReverse(resultArray, result.length() - text2.length());
+ Assert.assertEquals(text2, kmer2.toString());
+
+ KmerBytesWritable merge = new KmerBytesWritable(kmer1);
+ merge.mergeWithFRKmer(kmerSize, kmer2);
+ Assert.assertEquals(result, merge.toString());
+
+ int i = 1;
+ merge.set(kmer1);
+ merge.mergeWithFRKmer(i, kmer2);
+ Assert.assertEquals("AAGCTAAAACAACC", merge.toString());
+
+ i = 2;
+ merge.set(kmer1);
+ merge.mergeWithFRKmer(i, kmer2);
+ Assert.assertEquals("AAGCTAAACAACC", merge.toString());
+
+ i = 3;
+ merge.set(kmer1);
+ merge.mergeWithFRKmer(i, kmer2);
+ Assert.assertEquals("AAGCTAACAACC", merge.toString());
+ }
+
+
+ @Test
+ public void TestMergeRFKmer() {
+ int kmerSize = 3;
+ String result = "GGCACAACAACCC";
+ byte[] resultArray = result.getBytes();
+
+ String text1 = "AACAACCC";
+ KmerBytesWritable kmer1 = new KmerBytesWritable(text1.length());
+ kmer1.setByRead(resultArray, 5);
+ Assert.assertEquals(text1, kmer1.toString());
+
+ // kmer2 is the rc of the start of the read
+ String text2 = "TTGTGCC";
+ KmerBytesWritable kmer2 = new KmerBytesWritable(text2.length());
+ kmer2.setByReadReverse(resultArray, 0);
+ Assert.assertEquals(text2, kmer2.toString());
+
+ KmerBytesWritable merge = new KmerBytesWritable(kmer1);
+ merge.mergeWithRFKmer(kmerSize, kmer2);
+ Assert.assertEquals(result, merge.toString());
+
+ int i = 1;
+ merge.set(kmer1);
+ merge.mergeWithRFKmer(i, kmer2);
+ Assert.assertEquals("GGCACAAAACAACCC", merge.toString());
+
+ i = 2;
+ merge.set(kmer1);
+ merge.mergeWithRFKmer(i, kmer2);
+ Assert.assertEquals("GGCACAAACAACCC", merge.toString());
+
+ i = 3;
+ merge.set(kmer1);
+ merge.mergeWithRFKmer(i, kmer2);
+ Assert.assertEquals("GGCACAACAACCC", merge.toString());
+
+ String test1;
+ String test2;
+ test1 = "CTA";
+ test2 = "AGA";
+ KmerBytesWritable k1 = new KmerBytesWritable(3);
+ KmerBytesWritable k2 = new KmerBytesWritable(3);
+ k1.setByRead(test1.getBytes(), 0);
+ k2.setByRead(test2.getBytes(), 0);
+ k1.mergeWithRFKmer(3, k2);
+ Assert.assertEquals("TCTA", k1.toString());
+
+ test1 = "CTA";
+ test2 = "ATA"; //TAT
+ k1 = new KmerBytesWritable(3);
+ k2 = new KmerBytesWritable(3);
+ k1.setByRead(test1.getBytes(), 0);
+ k2.setByRead(test2.getBytes(), 0);
+ k1.mergeWithFRKmer(3, k2);
+ Assert.assertEquals("CTAT", k1.toString());
+
+ test1 = "ATA";
+ test2 = "CTA"; //TAG
+ k1 = new KmerBytesWritable(3);
+ k2 = new KmerBytesWritable(3);
+ k1.setByRead(test1.getBytes(), 0);
+ k2.setByRead(test2.getBytes(), 0);
+ k1.mergeWithFRKmer(3, k2);
+ Assert.assertEquals("ATAG", k1.toString());
+
+ test1 = "TCTAT";
+ test2 = "GAAC";
+ k1 = new KmerBytesWritable(5);
+ k2 = new KmerBytesWritable(4);
+ k1.setByRead(test1.getBytes(), 0);
+ k2.setByRead(test2.getBytes(), 0);
+ k1.mergeWithRFKmer(3, k2);
+ Assert.assertEquals("GTTCTAT", k1.toString());
+ }
+
+
+
+ @Test
+ public void TestMergeRRKmer() {
+ byte[] array = { 'A', 'G', 'C', 'T', 'G', 'A', 'C', 'C', 'G', 'T' };
+ String text = "AGCTGACCGT";
+ KmerBytesWritable kmer1 = new KmerBytesWritable(8);
+ kmer1.setByRead(array, 0);
+ String text1 = "AGCTGACC";
+ KmerBytesWritable kmer2 = new KmerBytesWritable(8);
+ kmer2.setByRead(array, 1);
+ String text2 = "GCTGACCG";
+ Assert.assertEquals(text2, kmer2.toString());
+ KmerBytesWritable merge = new KmerBytesWritable(kmer2);
+ int kmerSize = 8;
+ merge.mergeWithRRKmer(kmerSize, kmer1);
+ Assert.assertEquals(text1 + text2.substring(kmerSize - 1), merge.toString());
+
+ for (int i = 1; i < 8; i++) {
+ merge.set(kmer2);
+ merge.mergeWithRRKmer(i, kmer1);
+ Assert.assertEquals(text1.substring(0, text1.length() - i + 1) + text2, merge.toString());
+ }
+
+ for (int ik = 1; ik <= 10; ik++) {
+ for (int jk = 1; jk <= 10; jk++) {
+ kmer1 = new KmerBytesWritable(ik);
+ kmer2 = new KmerBytesWritable(jk);
+ kmer1.setByRead(array, 0);
+ kmer2.setByRead(array, 0);
+ text1 = text.substring(0, ik);
+ text2 = text.substring(0, jk);
+ Assert.assertEquals(text1, kmer1.toString());
+ Assert.assertEquals(text2, kmer2.toString());
+ for (int x = 1; x < ik; x++) {
+ merge.set(kmer2);
+ merge.mergeWithRRKmer(x, kmer1);
+ Assert.assertEquals(text1.substring(0, text1.length() - x + 1) + text2, merge.toString());
+ }
+ }
+ }
+ }
+
+ @Test
+ public void TestFinalMerge() {
+ String selfString;
+ String match;
+ String msgString;
+ int index;
+ KmerBytesWritable kmer = new KmerBytesWritable();
+ int kmerSize = 3;
+
+ String F1 = "AATAG";
+ String F2 = "TAGAA";
+ String R1 = "CTATT";
+ String R2 = "TTCTA";
+
+ //FF test
+ selfString = F1;
+ match = selfString.substring(selfString.length() - kmerSize + 1,selfString.length());
+ msgString = F2;
+ index = msgString.indexOf(match);
+ kmer.reset(msgString.length() - index);
+ kmer.setByRead(msgString.substring(index).getBytes(), 0);
+ System.out.println(kmer.toString());
+
+ //FR test
+ selfString = F1;
+ match = selfString.substring(selfString.length() - kmerSize + 1,selfString.length());
+ msgString = GeneCode.reverseComplement(R2);
+ index = msgString.indexOf(match);
+ kmer.reset(msgString.length() - index);
+ kmer.setByRead(msgString.substring(index).getBytes(), 0);
+ System.out.println(kmer.toString());
+
+ //RF test
+ selfString = R1;
+ match = selfString.substring(0,kmerSize - 1);
+ msgString = GeneCode.reverseComplement(F2);
+ index = msgString.lastIndexOf(match) + kmerSize - 2;
+ kmer.reset(index + 1);
+ kmer.setByReadReverse(msgString.substring(0, index + 1).getBytes(), 0);
+ System.out.println(kmer.toString());
+
+ //RR test
+ selfString = R1;
+ match = selfString.substring(0,kmerSize - 1);
+ msgString = R2;
+ index = msgString.lastIndexOf(match) + kmerSize - 2;
+ kmer.reset(index + 1);
+ kmer.setByRead(msgString.substring(0, index + 1).getBytes(), 0);
+ System.out.println(kmer.toString());
+
+ String[][] connectedTable = new String[][]{
+ {"FF", "RF"},
+ {"FF", "RR"},
+ {"FR", "RF"},
+ {"FR", "RR"}
+ };
+ System.out.println(connectedTable[0][1]);
+
+ Set<Long> s1 = new HashSet<Long>();
+ Set<Long> s2 = new HashSet<Long>();
+ s1.add((long) 1);
+ s1.add((long) 2);
+ s2.add((long) 2);
+ s2.add((long) 3);
+ Set<Long> intersection = new HashSet<Long>();
+ intersection.addAll(s1);
+ intersection.retainAll(s2);
+ System.out.println(intersection.toString());
+ Set<Long> difference = new HashSet<Long>();
+ difference.addAll(s1);
+ difference.removeAll(s2);
+ System.out.println(difference.toString());
+
+ Map<KmerBytesWritable, Set<Long>> map = new HashMap<KmerBytesWritable, Set<Long>>();
+ KmerBytesWritable k1 = new KmerBytesWritable(3);
+ Set<Long> set1 = new HashSet<Long>();
+ k1.setByRead(("CTA").getBytes(), 0);
+ set1.add((long)1);
+ map.put(k1, set1);
+ KmerBytesWritable k2 = new KmerBytesWritable(3);
+ k2.setByRead(("GTA").getBytes(), 0);
+ Set<Long> set2 = new HashSet<Long>();
+ set2.add((long) 2);
+ map.put(k2, set2);
+ KmerBytesWritable k3 = new KmerBytesWritable(3);
+ k3.setByRead(("ATG").getBytes(), 0);
+ Set<Long> set3 = new HashSet<Long>();
+ set3.add((long) 2);
+ map.put(k3, set3);
+ KmerBytesWritable k4 = new KmerBytesWritable(3);
+ k4.setByRead(("AAT").getBytes(), 0);
+ Set<Long> set4 = new HashSet<Long>();
+ set4.add((long) 1);
+ map.put(k4, set4);
+ KmerListWritable kmerList = new KmerListWritable(3);
+ kmerList.append(k1);
+ kmerList.append(k2);
+ System.out.println("CTA = " + map.get(k1).toString());
+ System.out.println("GTA = " + map.get(k2).toString());
+ System.out.println("ATG = " + map.get(k3).toString());
+ System.out.println("AAT = " + map.get(k4).toString());
+ System.out.println(k1.compareTo(k2));
+ System.out.println(k2.compareTo(k1));
+
+ System.out.println("CTA = " + kmerList.getPosition(0).toString());
+ System.out.println("GTA = " + kmerList.getPosition(1).toString());
+ System.out.println("CTA = " + map.get(kmerList.getPosition(0)).toString());
+ System.out.println("GTA = " + map.get(kmerList.getPosition(1)).toString());
+ }
}
diff --git a/genomix/genomix-hadoop/data/webmap/AdjSplitRepeat.txt b/genomix/genomix-hadoop/data/webmap/AdjSplitRepeat.txt
new file mode 100644
index 0000000..f2e3942
--- /dev/null
+++ b/genomix/genomix-hadoop/data/webmap/AdjSplitRepeat.txt
@@ -0,0 +1,3 @@
+1 AATAG
+2 GCATA
+3 ATAGC
diff --git a/genomix/genomix-hadoop/data/webmap/MergeBubble.txt b/genomix/genomix-hadoop/data/webmap/MergeBubble.txt
new file mode 100644
index 0000000..087f43e
--- /dev/null
+++ b/genomix/genomix-hadoop/data/webmap/MergeBubble.txt
@@ -0,0 +1,2 @@
+1 AATAGAA
+2 AATACAA
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/contrailgraphbuilding/GenomixReducer.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/contrailgraphbuilding/GenomixReducer.java
index f39cdcb..a0eb7c8 100644
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/contrailgraphbuilding/GenomixReducer.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/contrailgraphbuilding/GenomixReducer.java
@@ -37,7 +37,7 @@
while (values.hasNext()) {
tmpNode.set(values.next());
outputNode.getNodeIdList().appendList(tmpNode.getNodeIdList());
- outputNode.getFFList().appendList(tmpNode.getFFList());
+ outputNode.getFFList().appendList(tmpNode.getFFList()); //appendList needs to check whether the inserted node already exists
outputNode.getFRList().appendList(tmpNode.getFRList());
outputNode.getRFList().appendList(tmpNode.getRFList());
outputNode.getRRList().appendList(tmpNode.getRRList());
diff --git a/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/contrailgraphbuilding/GraphBuildingTest.java b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/contrailgraphbuilding/GraphBuildingTest.java
index 127ab3e..cc7e0ac 100644
--- a/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/contrailgraphbuilding/GraphBuildingTest.java
+++ b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/contrailgraphbuilding/GraphBuildingTest.java
@@ -22,7 +22,7 @@
private JobConf conf = new JobConf();
private static final String ACTUAL_RESULT_DIR = "actual";
private static final String HADOOP_CONF_PATH = ACTUAL_RESULT_DIR + File.separator + "conf.xml";
- private static final String DATA_PATH = "data/webmap/SplitRepeat.txt";
+ private static final String DATA_PATH = "data/webmap/RemoveBridge.txt";
private static final String HDFS_PATH = "/webmap";
private static final String RESULT_PATH = "/result";
diff --git a/genomix/genomix-pregelix/data/AddBridge/SimpleTest/part-00000 b/genomix/genomix-pregelix/data/AddBridge/SimpleTest/part-00000
new file mode 100755
index 0000000..bc255a4
--- /dev/null
+++ b/genomix/genomix-pregelix/data/AddBridge/SimpleTest/part-00000
Binary files differ
diff --git a/genomix/genomix-pregelix/data/SplitRepeat/1/part-00000 b/genomix/genomix-pregelix/data/SplitRepeat/1/part-00000
new file mode 100755
index 0000000..a983577
--- /dev/null
+++ b/genomix/genomix-pregelix/data/SplitRepeat/1/part-00000
Binary files differ
diff --git a/genomix/genomix-pregelix/data/SplitRepeat/AdjSplitRepeat/part-00000 b/genomix/genomix-pregelix/data/SplitRepeat/AdjSplitRepeat/part-00000
new file mode 100755
index 0000000..a187c64
--- /dev/null
+++ b/genomix/genomix-pregelix/data/SplitRepeat/AdjSplitRepeat/part-00000
Binary files differ
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/GraphCleanInputFormat.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/GraphCleanInputFormat.java
index e8a72ce..859ddba 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/GraphCleanInputFormat.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/GraphCleanInputFormat.java
@@ -66,6 +66,7 @@
* set the vertex value
*/
vertexValue.set(getRecordReader().getCurrentValue());
+ vertexValue.setKmerlength(getRecordReader().getCurrentValue().getKmerlength());
vertex.setVertexValue(vertexValue);
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/AdjacencyListWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/AdjacencyListWritable.java
index 09e98fa..d5fe843 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/AdjacencyListWritable.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/AdjacencyListWritable.java
@@ -6,8 +6,8 @@
import org.apache.hadoop.io.WritableComparable;
-import edu.uci.ics.genomix.oldtype.PositionListWritable;
-import edu.uci.ics.genomix.type.VKmerListWritable;
+
+import edu.uci.ics.genomix.type.KmerListWritable;
public class AdjacencyListWritable implements WritableComparable<AdjacencyListWritable>{
private VKmerListWritable forwardList;
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MergeBubbleMessageWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MergeBubbleMessageWritable.java
index 9fd15dd..95fa865 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MergeBubbleMessageWritable.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MergeBubbleMessageWritable.java
@@ -41,8 +41,9 @@
this.sourceVertexId.set(msg.getSourceVertexId());
}
if (chainVertexId != null) {
- checkMessage |= CheckMessage.CHAIN;
- this.chainVertexId.setAsCopy(msg.getChainVertexId());
+
+ checkMessage |= CheckMessage.ACUTUALKMER;
+ this.chainVertexId.set(msg.getChainVertexId());
}
if (neighberNode != null) {
checkMessage |= CheckMessage.NEIGHBER;
@@ -62,8 +63,8 @@
this.sourceVertexId.set(sourceVertexId.getReadID(),sourceVertexId.getPosInRead());
}
if (chainVertexId != null) {
- checkMessage |= CheckMessage.CHAIN;
- this.chainVertexId.setAsCopy(chainVertexId);
+ checkMessage |= CheckMessage.ACUTUALKMER;
+ this.chainVertexId.set(chainVertexId);
}
if (neighberNode != null) {
checkMessage |= CheckMessage.NEIGHBER;
@@ -100,8 +101,8 @@
public void setChainVertexId(KmerBytesWritable chainVertexId) {
if (chainVertexId != null) {
- checkMessage |= CheckMessage.CHAIN;
- this.chainVertexId.setAsCopy(chainVertexId);
+ checkMessage |= CheckMessage.ACUTUALKMER;
+ this.chainVertexId.set(chainVertexId);
}
}
@@ -144,7 +145,7 @@
out.writeByte(checkMessage);
if ((checkMessage & CheckMessage.SOURCE) != 0)
sourceVertexId.write(out);
- if ((checkMessage & CheckMessage.CHAIN) != 0)
+ if ((checkMessage & CheckMessage.ACUTUALKMER) != 0)
chainVertexId.write(out);
if ((checkMessage & CheckMessage.NEIGHBER) != 0)
neighberNode.write(out);
@@ -159,7 +160,7 @@
checkMessage = in.readByte();
if ((checkMessage & CheckMessage.SOURCE) != 0)
sourceVertexId.readFields(in);
- if ((checkMessage & CheckMessage.CHAIN) != 0)
+ if ((checkMessage & CheckMessage.ACUTUALKMER) != 0)
chainVertexId.readFields(in);
if ((checkMessage & CheckMessage.NEIGHBER) != 0)
neighberNode.readFields(in);
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MessageWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MessageWritable.java
index e3cd345..92f8464 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MessageWritable.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MessageWritable.java
@@ -26,13 +26,15 @@
private boolean isFlip;
private int kmerlength = 0;
private boolean updateMsg = false;
-
+ private KmerBytesWritable startVertexId;
+
private byte checkMessage;
public MessageWritable() {
sourceVertexId = new KmerBytesWritable();
kmer = new VKmerBytesWritable();
neighberNode = new AdjacencyListWritable();
+ startVertexId = new KmerBytesWritable();
flag = Message.NON;
isFlip = false;
checkMessage = (byte) 0;
@@ -40,9 +42,11 @@
public MessageWritable(int kmerSize) {
kmerlength = kmerSize;
- sourceVertexId = new KmerBytesWritable();
- kmer = new VKmerBytesWritable();
+
+ sourceVertexId = new KmerBytesWritable(kmerSize);
+ kmer = new KmerBytesWritable(0);
neighberNode = new AdjacencyListWritable(kmerSize);
+ startVertexId = new KmerBytesWritable(kmerSize);
flag = Message.NON;
isFlip = false;
checkMessage = (byte) 0;
@@ -56,13 +60,17 @@
this.sourceVertexId.setAsCopy(msg.getSourceVertexId());
}
if (kmer != null) {
- checkMessage |= CheckMessage.CHAIN;
- this.kmer.setAsCopy(msg.getActualKmer());
+ checkMessage |= CheckMessage.ACUTUALKMER;
+ this.kmer.set(msg.getActualKmer());
}
if (neighberNode != null) {
checkMessage |= CheckMessage.NEIGHBER;
this.neighberNode.set(msg.getNeighberNode());
}
+ if (startVertexId != null) {
+ checkMessage |= CheckMessage.START;
+ this.startVertexId.set(msg.getStartVertexId());
+ }
checkMessage |= CheckMessage.ADJMSG;
this.flag = msg.getFlag();
updateMsg = msg.isUpdateMsg();
@@ -76,8 +84,8 @@
this.sourceVertexId.setAsCopy(sourceVertexId);
}
if (chainVertexId != null) {
- checkMessage |= CheckMessage.CHAIN;
- this.kmer.setAsCopy(new VKmerBytesWritable(chainVertexId.toString())); // TODO Vkmer
+ checkMessage |= CheckMessage.ACUTUALKMER;
+ this.kmer.set(chainVertexId);
}
if (neighberNode != null) {
checkMessage |= CheckMessage.NEIGHBER;
@@ -95,6 +103,7 @@
kmerlength = kmerSize;
// kmer.reset();
neighberNode.reset(kmerSize);
+ startVertexId.reset(kmerSize);
flag = Message.NON;
isFlip = false;
}
@@ -116,8 +125,8 @@
public void setActualKmer(VKmerBytesWritable actualKmer) {
if (actualKmer != null) {
- checkMessage |= CheckMessage.CHAIN;
- this.kmer.setAsCopy(new VKmerBytesWritable(actualKmer.toString()));
+ checkMessage |= CheckMessage.ACUTUALKMER;
+ this.kmer.set(actualKmer);
}
}
@@ -127,8 +136,8 @@
public void setCreatedVertexId(KmerBytesWritable actualKmer) {
if (actualKmer != null) {
- checkMessage |= CheckMessage.CHAIN;
- this.kmer.setAsCopy(new VKmerBytesWritable(actualKmer.toString()));
+ checkMessage |= CheckMessage.ACUTUALKMER;
+ this.kmer.set(actualKmer);
}
}
@@ -143,6 +152,17 @@
}
}
+ public KmerBytesWritable getStartVertexId() {
+ return startVertexId;
+ }
+
+ public void setStartVertexId(KmerBytesWritable startVertexId) {
+ if(startVertexId != null){
+ checkMessage |= CheckMessage.START;
+ this.startVertexId.set(startVertexId);
+ }
+ }
+
public int getLengthOfChain() {
return kmer.getKmerLetterLength();
}
@@ -189,12 +209,14 @@
out.writeByte(checkMessage);
if ((checkMessage & CheckMessage.SOURCE) != 0)
sourceVertexId.write(out);
- if ((checkMessage & CheckMessage.CHAIN) != 0)
+ if ((checkMessage & CheckMessage.ACUTUALKMER) != 0)
kmer.write(out);
if ((checkMessage & CheckMessage.NEIGHBER) != 0)
neighberNode.write(out);
if ((checkMessage & CheckMessage.NODEIDLIST) != 0)
nodeIdList.write(out);
+ if ((checkMessage & CheckMessage.START) != 0)
+ startVertexId.write(out);
out.writeBoolean(isFlip);
out.writeByte(flag);
out.writeBoolean(updateMsg);
@@ -207,12 +229,14 @@
checkMessage = in.readByte();
if ((checkMessage & CheckMessage.SOURCE) != 0)
sourceVertexId.readFields(in);
- if ((checkMessage & CheckMessage.CHAIN) != 0)
+ if ((checkMessage & CheckMessage.ACUTUALKMER) != 0)
kmer.readFields(in);
if ((checkMessage & CheckMessage.NEIGHBER) != 0)
neighberNode.readFields(in);
if ((checkMessage & CheckMessage.NODEIDLIST) != 0)
nodeIdList.readFields(in);
+ if ((checkMessage & CheckMessage.START) != 0)
+ startVertexId.readFields(in);
isFlip = in.readBoolean();
flag = in.readByte();
updateMsg = in.readBoolean();
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/VertexValueWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/VertexValueWritable.java
index 5e7a856..27fa846 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/VertexValueWritable.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/VertexValueWritable.java
@@ -12,18 +12,17 @@
public class VertexValueWritable implements WritableComparable<VertexValueWritable> {
- public static class State extends VertexStateFlag{
- public static final byte HEAD_SHOULD_MERGEWITHPREV = 0b101 << 0;
- public static final byte HEAD_SHOULD_MERGEWITHNEXT = 0b111 << 0;
-
+ public static class State extends VertexStateFlag{
public static final byte NO_MERGE = 0b00 << 3;
public static final byte SHOULD_MERGEWITHNEXT = 0b01 << 3;
public static final byte SHOULD_MERGEWITHPREV = 0b10 << 3;
public static final byte SHOULD_MERGE_MASK = 0b11 << 3;
public static final byte SHOULD_MERGE_CLEAR = 0b1100111;
- public static final byte KILL = 0b11 << 3;
- public static final byte KILL_MASK = 0b11 << 3;
+ public static final byte KILL = 0b1 << 3;
+ public static final byte KILL_MASK = 0b1 << 3;
+
+ public static final byte DIR_FROM_DEADVERTEX = 0b10 << 3;
}
public static class VertexStateFlag extends FakeFlag {
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bridgeremove/BridgeAddVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bridgeremove/BridgeAddVertex.java
index 89b66e6..3c7ae8a 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bridgeremove/BridgeAddVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bridgeremove/BridgeAddVertex.java
@@ -3,12 +3,11 @@
import java.util.Iterator;
import org.apache.hadoop.io.NullWritable;
-//import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.KmerListWritable;
import edu.uci.ics.pregelix.api.graph.Vertex;
import edu.uci.ics.pregelix.api.job.PregelixJob;
import edu.uci.ics.pregelix.api.util.BspUtils;
-import edu.uci.ics.genomix.oldtype.PositionListWritable;
-import edu.uci.ics.genomix.oldtype.PositionWritable;
import edu.uci.ics.genomix.pregelix.client.Client;
import edu.uci.ics.genomix.pregelix.format.GraphCleanInputFormat;
import edu.uci.ics.genomix.pregelix.format.GraphCleanOutputFormat;
@@ -48,7 +47,7 @@
* Naive Algorithm for path merge graph
*/
public class BridgeAddVertex extends
- Vertex<PositionWritable, VertexValueWritable, NullWritable, MessageWritable> {
+ Vertex<KmerBytesWritable, VertexValueWritable, NullWritable, MessageWritable> {
public static final String KMER_SIZE = "BridgeRemoveVertex.kmerSize";
public static final String LENGTH = "BridgeRemoveVertex.length";
public static int kmerSize = -1;
@@ -60,7 +59,7 @@
public void initVertex() {
if (kmerSize == -1)
kmerSize = getContext().getConfiguration().getInt(KMER_SIZE, 5);
- if(length == -1)
+ if (length == -1)
length = getContext().getConfiguration().getInt(LENGTH, kmerSize + 5);
}
@@ -69,40 +68,42 @@
public void compute(Iterator<MessageWritable> msgIterator) {
initVertex();
if(getSuperstep() == 1){
- if(getVertexId().getReadID() == 1 && getVertexId().getPosInRead() == 2){
- getVertexValue().getFFList().append(3, (byte)1);
+ if(getVertexId().toString().equals("ATA")){
+ KmerBytesWritable vertexId = new KmerBytesWritable(kmerSize);
+ vertexId.setByRead("GTA".getBytes(), 0);
+ getVertexValue().getFRList().append(vertexId);
//add bridge vertex
@SuppressWarnings("rawtypes")
Vertex vertex = (Vertex) BspUtils.createVertex(getContext().getConfiguration());
vertex.getMsgList().clear();
vertex.getEdges().clear();
- PositionWritable vertexId = new PositionWritable();
- VertexValueWritable vertexValue = new VertexValueWritable();
+ VertexValueWritable vertexValue = new VertexValueWritable(kmerSize);
/**
* set the src vertex id
*/
- vertexId.set(3, (byte)1);
vertex.setVertexId(vertexId);
/**
* set the vertex value
*/
- byte[] array = { 'T', 'A', 'G', 'C', 'C'};
- VKmerBytesWritable kmer = new VKmerBytesWritable(array.length);
- kmer.setByRead(array, 0);
- vertexValue.setKmer(kmer);
- PositionListWritable plist = new PositionListWritable();
- plist.append(new PositionWritable(1, (byte)2));
- vertexValue.setRRList(plist);
- PositionListWritable plist2 = new PositionListWritable();
- plist2.append(new PositionWritable(2, (byte)2));
- vertexValue.setFFList(plist2);
+ KmerListWritable kmerFRList = new KmerListWritable(kmerSize);
+ kmerFRList.append(getVertexId());
+ vertexValue.setFRList(kmerFRList);
+ KmerBytesWritable otherVertexId = new KmerBytesWritable(kmerSize);
+ otherVertexId.setByRead("ACG".getBytes(), 0);
+ KmerListWritable kmerRFList = new KmerListWritable(kmerSize);
+ kmerRFList.append(otherVertexId);
+ vertexValue.setRFList(kmerRFList);
+ vertexValue.setKmer(vertexId);
vertex.setVertexValue(vertexValue);
addVertex(vertexId, vertex);
+ }
+ else if(getVertexId().toString().equals("ACG")){
+ KmerBytesWritable brdgeVertexId = new KmerBytesWritable(kmerSize);
+ brdgeVertexId.setByRead("GTA".getBytes(), 0);
+ getVertexValue().getRFList().append(brdgeVertexId);
}
- if(getVertexId().getReadID() == 2 && getVertexId().getPosInRead() == 2)
- getVertexValue().getRRList().append(3, (byte)1);
}
voteToHalt();
}
@@ -116,7 +117,7 @@
job.setVertexInputFormatClass(GraphCleanInputFormat.class);
job.setVertexOutputFormatClass(GraphCleanOutputFormat.class);
job.setDynamicVertexValueSize(true);
- job.setOutputKeyClass(PositionWritable.class);
+ job.setOutputKeyClass(KmerBytesWritable.class);
job.setOutputValueClass(VertexValueWritable.class);
Client.run(args, job);
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bridgeremove/BridgeRemoveVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bridgeremove/BridgeRemoveVertex.java
index 7f85fc7..caf7a36 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bridgeremove/BridgeRemoveVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bridgeremove/BridgeRemoveVertex.java
@@ -11,8 +11,8 @@
import edu.uci.ics.genomix.pregelix.io.MessageWritable;
import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
import edu.uci.ics.genomix.pregelix.operator.pathmerge.BasicGraphCleanVertex;
-import edu.uci.ics.genomix.pregelix.type.AdjMessage;
import edu.uci.ics.genomix.pregelix.util.VertexUtil;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
/*
* vertexId: BytesWritable
@@ -59,76 +59,17 @@
if (kmerSize == -1)
kmerSize = getContext().getConfiguration().getInt(KMER_SIZE, 5);
if(length == -1)
- length = getContext().getConfiguration().getInt(LENGTH, kmerSize + 5);
- outgoingMsg.reset();
+ length = getContext().getConfiguration().getInt(LENGTH, kmerSize);
+ if(incomingMsg == null)
+ incomingMsg = new MessageWritable(kmerSize);
+ if(outgoingMsg == null)
+ outgoingMsg = new MessageWritable(kmerSize);
+ else
+ outgoingMsg.reset(kmerSize);
+ if(destVertexId == null)
+ destVertexId = new KmerBytesWritable(kmerSize);
receivedMsgList.clear();
}
-
- /**
- * broadcast kill self to all neighbers
- */
- public void broadcaseKillself(){
- outgoingMsg.setSourceVertexId(getVertexId());
- if(receivedMsgList.get(0).getFlag() == AdjMessage.FROMFF
- && receivedMsgList.get(1).getFlag() == AdjMessage.FROMRR){
- outgoingMsg.setFlag(AdjMessage.FROMRR);
- sendMsg(receivedMsgList.get(0).getSourceVertexId(), outgoingMsg);
- outgoingMsg.setFlag(AdjMessage.FROMFF);
- sendMsg(receivedMsgList.get(1).getSourceVertexId(), outgoingMsg);
- deleteVertex(getVertexId());
- } else if (receivedMsgList.get(0).getFlag() == AdjMessage.FROMFF
- && receivedMsgList.get(1).getFlag() == AdjMessage.FROMRF) {
- outgoingMsg.setFlag(AdjMessage.FROMRR);
- sendMsg(receivedMsgList.get(0).getSourceVertexId(), outgoingMsg);
- outgoingMsg.setFlag(AdjMessage.FROMRF);
- sendMsg(receivedMsgList.get(1).getSourceVertexId(), outgoingMsg);
- deleteVertex(getVertexId());
- } else if (receivedMsgList.get(0).getFlag() == AdjMessage.FROMFR
- && receivedMsgList.get(1).getFlag() == AdjMessage.FROMRR) {
- outgoingMsg.setFlag(AdjMessage.FROMFR);
- sendMsg(receivedMsgList.get(0).getSourceVertexId(), outgoingMsg);
- outgoingMsg.setFlag(AdjMessage.FROMFF);
- sendMsg(receivedMsgList.get(1).getSourceVertexId(), outgoingMsg);
- deleteVertex(getVertexId());
- } else if (receivedMsgList.get(0).getFlag() == AdjMessage.FROMFR
- && receivedMsgList.get(1).getFlag() == AdjMessage.FROMRF) {
- outgoingMsg.setFlag(AdjMessage.FROMFR);
- sendMsg(receivedMsgList.get(0).getSourceVertexId(), outgoingMsg);
- outgoingMsg.setFlag(AdjMessage.FROMRF);
- sendMsg(receivedMsgList.get(1).getSourceVertexId(), outgoingMsg);
- deleteVertex(getVertexId());
- } // RR
- else if(receivedMsgList.get(1).getFlag() == AdjMessage.FROMFF
- && receivedMsgList.get(0).getFlag() == AdjMessage.FROMRR){
- outgoingMsg.setFlag(AdjMessage.FROMRR);
- sendMsg(receivedMsgList.get(1).getSourceVertexId(), outgoingMsg);
- outgoingMsg.setFlag(AdjMessage.FROMFF);
- sendMsg(receivedMsgList.get(0).getSourceVertexId(), outgoingMsg);
- deleteVertex(getVertexId());
- } else if (receivedMsgList.get(1).getFlag() == AdjMessage.FROMFF
- && receivedMsgList.get(0).getFlag() == AdjMessage.FROMRF) {
- outgoingMsg.setFlag(AdjMessage.FROMRR);
- sendMsg(receivedMsgList.get(1).getSourceVertexId(), outgoingMsg);
- outgoingMsg.setFlag(AdjMessage.FROMRF);
- sendMsg(receivedMsgList.get(0).getSourceVertexId(), outgoingMsg);
- deleteVertex(getVertexId());
- } else if (receivedMsgList.get(1).getFlag() == AdjMessage.FROMFR
- && receivedMsgList.get(0).getFlag() == AdjMessage.FROMRR) {
- outgoingMsg.setFlag(AdjMessage.FROMFR);
- sendMsg(receivedMsgList.get(1).getSourceVertexId(), outgoingMsg);
- outgoingMsg.setFlag(AdjMessage.FROMFF);
- sendMsg(receivedMsgList.get(0).getSourceVertexId(), outgoingMsg);
- deleteVertex(getVertexId());
- } else if (receivedMsgList.get(1).getFlag() == AdjMessage.FROMFR
- && receivedMsgList.get(0).getFlag() == AdjMessage.FROMRF) {
- outgoingMsg.setFlag(AdjMessage.FROMFR);
- sendMsg(receivedMsgList.get(1).getSourceVertexId(), outgoingMsg);
- outgoingMsg.setFlag(AdjMessage.FROMRF);
- sendMsg(receivedMsgList.get(0).getSourceVertexId(), outgoingMsg);
- deleteVertex(getVertexId());
- } else
- voteToHalt();
- }
@Override
public void compute(Iterator<MessageWritable> msgIterator) {
@@ -150,13 +91,17 @@
i++;
}
if(receivedMsgList.size() == 2){
- if(getVertexValue().getLengthOfKmer() <= length){
+ if(getVertexValue().getLengthOfKmer() <= length
+ && getVertexValue().getDegree() == 2){
broadcaseKillself();
}
}
}
else if(getSuperstep() == 3){
- responseToDeadVertex(msgIterator);
+ while(msgIterator.hasNext()){
+ incomingMsg = msgIterator.next();
+ responseToDeadVertex();
+ }
}
voteToHalt();
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleAddVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleAddVertex.java
index c0ba1a9..f5aa3a1 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleAddVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleAddVertex.java
@@ -4,12 +4,10 @@
import org.apache.hadoop.io.NullWritable;
import edu.uci.ics.genomix.type.KmerBytesWritable;
-import edu.uci.ics.genomix.type.VKmerBytesWritable;
+import edu.uci.ics.genomix.type.KmerListWritable;
import edu.uci.ics.pregelix.api.graph.Vertex;
import edu.uci.ics.pregelix.api.job.PregelixJob;
import edu.uci.ics.pregelix.api.util.BspUtils;
-import edu.uci.ics.genomix.oldtype.PositionListWritable;
-import edu.uci.ics.genomix.oldtype.PositionWritable;
import edu.uci.ics.genomix.pregelix.client.Client;
import edu.uci.ics.genomix.pregelix.format.GraphCleanInputFormat;
import edu.uci.ics.genomix.pregelix.format.GraphCleanOutputFormat;
@@ -48,7 +46,7 @@
* Remove tip or single node when l > constant
*/
public class BubbleAddVertex extends
- Vertex<PositionWritable, VertexValueWritable, NullWritable, MessageWritable> {
+ Vertex<KmerBytesWritable, VertexValueWritable, NullWritable, MessageWritable> {
public static final String KMER_SIZE = "BubbleAddVertex.kmerSize";
public static int kmerSize = -1;
@@ -65,40 +63,42 @@
public void compute(Iterator<MessageWritable> msgIterator) {
initVertex();
if(getSuperstep() == 1){
- if(getVertexId().getReadID() == 1 && getVertexId().getPosInRead() == 2){
- getVertexValue().getFFList().append(2, (byte)1);
+ if(getVertexId().toString().equals("ATA")){
+ KmerBytesWritable vertexId = new KmerBytesWritable(kmerSize);
+ vertexId.setByRead("GTA".getBytes(), 0);
+ getVertexValue().getFRList().append(vertexId);
- //add tip vertex
+ //add bridge vertex
@SuppressWarnings("rawtypes")
Vertex vertex = (Vertex) BspUtils.createVertex(getContext().getConfiguration());
vertex.getMsgList().clear();
vertex.getEdges().clear();
- PositionWritable vertexId = new PositionWritable();
- VertexValueWritable vertexValue = new VertexValueWritable();
+ VertexValueWritable vertexValue = new VertexValueWritable(kmerSize);
/**
* set the src vertex id
*/
- vertexId.set(2, (byte)1);
vertex.setVertexId(vertexId);
/**
* set the vertex value
*/
- byte[] array = { 'T', 'A', 'G', 'C', 'C', 'A', 'G'}; //TAGCCAG
- VKmerBytesWritable kmer = new VKmerBytesWritable(array.length);
- kmer.setByRead(array, 0);
- vertexValue.setKmer(kmer);
- PositionListWritable plist = new PositionListWritable();
- plist.append(new PositionWritable(1, (byte)2));
- vertexValue.setRRList(plist);
- PositionListWritable plist2 = new PositionListWritable();
- plist2.append(new PositionWritable(1, (byte)4));
- vertexValue.setFFList(plist2);
+ KmerListWritable kmerFRList = new KmerListWritable(kmerSize);
+ kmerFRList.append(getVertexId());
+ vertexValue.setFRList(kmerFRList);
+ KmerBytesWritable otherVertexId = new KmerBytesWritable(kmerSize);
+ otherVertexId.setByRead("AGA".getBytes(), 0);
+ KmerListWritable kmerRFList = new KmerListWritable(kmerSize);
+ kmerRFList.append(otherVertexId);
+ vertexValue.setRFList(kmerRFList);
+ vertexValue.setKmer(vertexId);
vertex.setVertexValue(vertexValue);
addVertex(vertexId, vertex);
+ }
+ else if(getVertexId().toString().equals("AGA")){
+ KmerBytesWritable brdgeVertexId = new KmerBytesWritable(kmerSize);
+ brdgeVertexId.setByRead("GTA".getBytes(), 0);
+ getVertexValue().getRFList().append(brdgeVertexId);
}
- if(getVertexId().getReadID() == 1 && getVertexId().getPosInRead() == 4)
- getVertexValue().getRRList().append(2, (byte)1);
}
voteToHalt();
}
@@ -112,7 +112,7 @@
job.setVertexInputFormatClass(GraphCleanInputFormat.class);
job.setVertexOutputFormatClass(GraphCleanOutputFormat.class);
job.setDynamicVertexValueSize(true);
- job.setOutputKeyClass(PositionWritable.class);
+ job.setOutputKeyClass(KmerBytesWritable.class);
job.setOutputValueClass(VertexValueWritable.class);
Client.run(args, job);
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleMergeVertex.java
index b782294..d630b5f 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleMergeVertex.java
@@ -7,6 +7,7 @@
import org.apache.hadoop.io.NullWritable;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
import edu.uci.ics.genomix.type.KmerBytesWritableFactory;
import edu.uci.ics.pregelix.api.graph.Vertex;
import edu.uci.ics.pregelix.api.job.PregelixJob;
@@ -51,7 +52,7 @@
* Naive Algorithm for path merge graph
*/
public class BubbleMergeVertex extends
- Vertex<PositionWritable, VertexValueWritable, NullWritable, MergeBubbleMessageWritable> {
+ Vertex<KmerBytesWritable, VertexValueWritable, NullWritable, MergeBubbleMessageWritable> {
public static final String KMER_SIZE = "BubbleMergeVertex.kmerSize";
public static final String ITERATIONS = "BubbleMergeVertex.iteration";
public static int kmerSize = -1;
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicGraphCleanVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicGraphCleanVertex.java
index fc0cd4b..4ae7231 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicGraphCleanVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicGraphCleanVertex.java
@@ -5,12 +5,10 @@
import org.apache.hadoop.io.NullWritable;
-
import edu.uci.ics.pregelix.api.graph.Vertex;
import edu.uci.ics.genomix.pregelix.io.MessageWritable;
import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
import edu.uci.ics.genomix.pregelix.io.VertexValueWritable.State;
-import edu.uci.ics.genomix.pregelix.type.AdjMessage;
import edu.uci.ics.genomix.pregelix.type.MessageFlag;
import edu.uci.ics.genomix.pregelix.util.VertexUtil;
import edu.uci.ics.genomix.type.GeneCode;
@@ -29,9 +27,10 @@
protected MessageWritable incomingMsg = null;
protected MessageWritable outgoingMsg = null;
- protected VKmerBytesWritable destVertexId = new VKmerBytesWritable();
- protected Iterator<VKmerBytesWritable> posIterator;
- protected VKmerBytesWritable tmpKmer = new VKmerBytesWritable();
+
+ protected KmerBytesWritable destVertexId = null;
+ protected Iterator<KmerBytesWritable> kmerIterator;
+ protected KmerBytesWritable tmpKmer = new KmerBytesWritable(kmerSize);
byte headFlag;
protected byte outFlag;
protected byte inFlag;
@@ -70,11 +69,11 @@
*/
public VKmerBytesWritable getNextDestVertexId(VertexValueWritable value) {
if (value.getFFList().getCountOfPosition() > 0){ // #FFList() > 0
- posIterator = value.getFFList().iterator();
- return posIterator.next();
+ kmerIterator = value.getFFList().iterator();
+ return kmerIterator.next();
} else if (value.getFRList().getCountOfPosition() > 0){ // #FRList() > 0
- posIterator = value.getFRList().iterator();
- return posIterator.next();
+ kmerIterator = value.getFRList().iterator();
+ return kmerIterator.next();
} else {
return null;
}
@@ -82,11 +81,11 @@
public VKmerBytesWritable getPreDestVertexId(VertexValueWritable value) {
if (value.getRFList().getCountOfPosition() > 0){ // #RFList() > 0
- posIterator = value.getRFList().iterator();
- return posIterator.next();
+ kmerIterator = value.getRFList().iterator();
+ return kmerIterator.next();
} else if (value.getRRList().getCountOfPosition() > 0){ // #RRList() > 0
- posIterator = value.getRRList().iterator();
- return posIterator.next();
+ kmerIterator = value.getRRList().iterator();
+ return kmerIterator.next();
} else {
return null;
}
@@ -97,15 +96,15 @@
*/
public VKmerBytesWritable getNextDestVertexIdAndSetFlag(VertexValueWritable value) {
if (value.getFFList().getCountOfPosition() > 0){ // #FFList() > 0
- posIterator = value.getFFList().iterator();
+ kmerIterator = value.getFFList().iterator();
outFlag &= MessageFlag.DIR_CLEAR;
outFlag |= MessageFlag.DIR_FF;
- return posIterator.next();
+ return kmerIterator.next();
} else if (value.getFRList().getCountOfPosition() > 0){ // #FRList() > 0
- posIterator = value.getFRList().iterator();
+ kmerIterator = value.getFRList().iterator();
outFlag &= MessageFlag.DIR_CLEAR;
outFlag |= MessageFlag.DIR_FR;
- return posIterator.next();
+ return kmerIterator.next();
} else {
return null;
}
@@ -114,48 +113,48 @@
public VKmerBytesWritable getPreDestVertexIdAndSetFlag(VertexValueWritable value) {
if (value.getRFList().getCountOfPosition() > 0){ // #RFList() > 0
- posIterator = value.getRFList().iterator();
+ kmerIterator = value.getRFList().iterator();
outFlag &= MessageFlag.DIR_CLEAR;
outFlag |= MessageFlag.DIR_RF;
- return posIterator.next();
+ return kmerIterator.next();
} else if (value.getRRList().getCountOfPosition() > 0){ // #RRList() > 0
- posIterator = value.getRRList().iterator();
+ kmerIterator = value.getRRList().iterator();
outFlag &= MessageFlag.DIR_CLEAR;
outFlag |= MessageFlag.DIR_RR;
- return posIterator.next();
+ return kmerIterator.next();
} else {
return null;
}
}
-
- /**
- * head send message to all next nodes
- */
- public void sendMsgToAllNextNodes(VertexValueWritable value) {
- posIterator = value.getFFList().iterator(); // FFList
- while(posIterator.hasNext()){
- destVertexId.setAsCopy(posIterator.next());
- sendMsg(destVertexId, outgoingMsg);
- }
- posIterator = value.getFRList().iterator(); // FRList
- while(posIterator.hasNext()){
- destVertexId.setAsCopy(posIterator.next());
- sendMsg(destVertexId, outgoingMsg);
- }
- }
-
+
/**
* head send message to all previous nodes
*/
public void sendMsgToAllPreviousNodes(VertexValueWritable value) {
- posIterator = value.getRFList().iterator(); // RFList
- while(posIterator.hasNext()){
- destVertexId.setAsCopy(posIterator.next());
+ kmerIterator = value.getRFList().iterator(); // RFList
+ while(kmerIterator.hasNext()){
+ destVertexId.set(kmerIterator.next());
sendMsg(destVertexId, outgoingMsg);
}
- posIterator = value.getRRList().iterator(); // RRList
- while(posIterator.hasNext()){
- destVertexId.setAsCopy(posIterator.next());
+ kmerIterator = value.getRRList().iterator(); // RRList
+ while(kmerIterator.hasNext()){
+ destVertexId.set(kmerIterator.next());
+ sendMsg(destVertexId, outgoingMsg);
+ }
+ }
+
+ /**
+ * The head sends outgoingMsg to all next nodes, i.e. every kmer in the FF and FR lists of value
+ */
+ public void sendMsgToAllNextNodes(VertexValueWritable value) {
+ kmerIterator = value.getFFList().iterator(); // FFList
+ while(kmerIterator.hasNext()){
+ destVertexId.set(kmerIterator.next());
+ sendMsg(destVertexId, outgoingMsg);
+ }
+ kmerIterator = value.getFRList().iterator(); // FRList
+ while(kmerIterator.hasNext()){
+ destVertexId.set(kmerIterator.next());
sendMsg(destVertexId, outgoingMsg);
}
}
@@ -169,21 +168,49 @@
}
/**
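+ * The tip sends a message carrying its own vertex id and edge direction to the previous node
+ * (a tip has only one incoming edge). NOTE(review): the body reads the FF/FR lists and getNextDestVertexId() - confirm the direction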
+ */
+ public void sendSettledMsgToPreviousNode(){
+ if(getVertexValue().getFFList().getCountOfPosition() > 0)
+ outgoingMsg.setFlag(MessageFlag.DIR_FF);
+ else if(getVertexValue().getFRList().getCountOfPosition() > 0)
+ outgoingMsg.setFlag(MessageFlag.DIR_FR);
+ outgoingMsg.setSourceVertexId(getVertexId());
+ destVertexId.set(getNextDestVertexId(getVertexValue()));
+ sendMsg(destVertexId, outgoingMsg);
+ }
+
+ /**
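+ * The tip sends a message carrying its own vertex id and edge direction to the next node
+ * (a tip has only one outgoing edge). NOTE(review): the body reads the RF/RR lists and getPreDestVertexId() - confirm the direction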
+ */
+ public void sendSettledMsgToNextNode(){
+ if(getVertexValue().getRFList().getCountOfPosition() > 0)
+ outgoingMsg.setFlag(MessageFlag.DIR_RF);
+ else if(getVertexValue().getRRList().getCountOfPosition() > 0)
+ outgoingMsg.setFlag(MessageFlag.DIR_RR);
+ outgoingMsg.setSourceVertexId(getVertexId());
+ destVertexId.set(getPreDestVertexId(getVertexValue()));
+ sendMsg(destVertexId, outgoingMsg);
+ }
+
+ /**
* head send message to all previous nodes
*/
public void sendSettledMsgToAllPreviousNodes(VertexValueWritable value) {
- posIterator = value.getRFList().iterator(); // RFList
- while(posIterator.hasNext()){
- outgoingMsg.setFlag(AdjMessage.FROMRF);
+ kmerIterator = value.getRFList().iterator(); // RFList
+ while(kmerIterator.hasNext()){
+ outgoingMsg.setFlag(MessageFlag.DIR_RF);
outgoingMsg.setSourceVertexId(getVertexId());
- destVertexId.setAsCopy(posIterator.next());
+ destVertexId.set(kmerIterator.next());
sendMsg(destVertexId, outgoingMsg);
}
- posIterator = value.getRRList().iterator(); // RRList
- while(posIterator.hasNext()){
- outgoingMsg.setFlag(AdjMessage.FROMRR);
+ kmerIterator = value.getRRList().iterator(); // RRList
+ while(kmerIterator.hasNext()){
+ outgoingMsg.setFlag(MessageFlag.DIR_RR);
outgoingMsg.setSourceVertexId(getVertexId());
- destVertexId.setAsCopy(posIterator.next());
+ destVertexId.set(kmerIterator.next());
sendMsg(destVertexId, outgoingMsg);
}
}
@@ -192,18 +219,18 @@
* head send message to all next nodes
*/
public void sendSettledMsgToAllNextNodes(VertexValueWritable value) {
- posIterator = value.getFFList().iterator(); // FFList
- while(posIterator.hasNext()){
- outgoingMsg.setFlag(AdjMessage.FROMFF);
+ kmerIterator = value.getFFList().iterator(); // FFList
+ while(kmerIterator.hasNext()){
+ outgoingMsg.setFlag(MessageFlag.DIR_FF);
outgoingMsg.setSourceVertexId(getVertexId());
- destVertexId.setAsCopy(posIterator.next());
+ destVertexId.set(kmerIterator.next());
sendMsg(destVertexId, outgoingMsg);
}
- posIterator = value.getFRList().iterator(); // FRList
- while(posIterator.hasNext()){
- outgoingMsg.setFlag(AdjMessage.FROMFR);
+ kmerIterator = value.getFRList().iterator(); // FRList
+ while(kmerIterator.hasNext()){
+ outgoingMsg.setFlag(MessageFlag.DIR_FR);
outgoingMsg.setSourceVertexId(getVertexId());
- destVertexId.setAsCopy(posIterator.next());
+ destVertexId.set(kmerIterator.next());
sendMsg(destVertexId, outgoingMsg);
}
}
@@ -748,44 +775,44 @@
switch(incomingMsg.getFlag() & MessageFlag.DIR_MASK){
case MessageFlag.DIR_FF:
//remove incomingMsg.getSourceId from RR positionList
- posIterator = getVertexValue().getRRList().iterator();
- while(posIterator.hasNext()){
- tmpKmer = posIterator.next();
+ kmerIterator = getVertexValue().getRRList().iterator();
+ while(kmerIterator.hasNext()){
+ tmpKmer = kmerIterator.next();
if(tmpKmer.equals(incomingMsg.getSourceVertexId())){
- posIterator.remove();
+ kmerIterator.remove();
break;
}
}
break;
case MessageFlag.DIR_FR:
//remove incomingMsg.getSourceId from FR positionList
- posIterator = getVertexValue().getFRList().iterator();
- while(posIterator.hasNext()){
- tmpKmer = posIterator.next();
+ kmerIterator = getVertexValue().getFRList().iterator();
+ while(kmerIterator.hasNext()){
+ tmpKmer = kmerIterator.next();
if(tmpKmer.equals(incomingMsg.getSourceVertexId())){
- posIterator.remove();
+ kmerIterator.remove();
break;
}
}
break;
case MessageFlag.DIR_RF:
//remove incomingMsg.getSourceId from RF positionList
- posIterator = getVertexValue().getRFList().iterator();
- while(posIterator.hasNext()){
- tmpKmer = posIterator.next();
+ kmerIterator = getVertexValue().getRFList().iterator();
+ while(kmerIterator.hasNext()){
+ tmpKmer = kmerIterator.next();
if(tmpKmer.equals(incomingMsg.getSourceVertexId())){
- posIterator.remove();
+ kmerIterator.remove();
break;
}
}
break;
case MessageFlag.DIR_RR:
//remove incomingMsg.getSourceId from FF positionList
- posIterator = getVertexValue().getFFList().iterator();
- while(posIterator.hasNext()){
- tmpKmer = posIterator.next();
+ kmerIterator = getVertexValue().getFFList().iterator();
+ while(kmerIterator.hasNext()){
+ tmpKmer = kmerIterator.next();
if(tmpKmer.equals(incomingMsg.getSourceVertexId())){
- posIterator.remove();
+ kmerIterator.remove();
break;
}
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/MapReduceVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/MapReduceVertex.java
index c1b1dda..51100e5 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/MapReduceVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/MapReduceVertex.java
@@ -55,6 +55,8 @@
String random = generaterRandomString(kmerSize + 1);
fakeVertex.setByRead(random.getBytes(), 0);
}
+ if(destVertexId == null)
+ destVertexId = new KmerBytesWritable(kmerSize);
}
/**
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P2ForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P2ForPathMergeVertex.java
index c9942a2..d3f21f3 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P2ForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P2ForPathMergeVertex.java
@@ -80,6 +80,8 @@
fakeVertex.setByRead(random.getBytes(), 0);
}
isFakeVertex = ((byte)getVertexValue().getState() & State.FAKEFLAG_MASK) > 0 ? true : false;
+ if(destVertexId == null)
+ destVertexId = new KmerBytesWritable(kmerSize);
}
/**
@@ -266,7 +268,7 @@
mapKeyByActualKmer(msgIterator);
/** Reducer **/
reduceKeyByActualKmer();
- voteToHalt();
+ voteToHalt();
}
} else if (getSuperstep() % 3 == 1 && getSuperstep() <= maxIteration) {
if(!isFakeVertex){
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java
index ecfafa7..02151fa 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java
@@ -80,6 +80,8 @@
outgoingMsg = new MessageWritable(kmerSize);
else
outgoingMsg.reset(kmerSize);
+ if(destVertexId == null)
+ destVertexId = new KmerBytesWritable(kmerSize);
randSeed = getSuperstep();
randGenerator = new Random(randSeed);
if (probBeingRandomHead < 0)
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P5ForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P5ForPathMergeVertex.java
deleted file mode 100644
index 3f91ac1..0000000
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P5ForPathMergeVertex.java
+++ /dev/null
@@ -1,501 +0,0 @@
-package edu.uci.ics.genomix.pregelix.operator.pathmerge;
-
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.Random;
-
-import org.apache.hadoop.io.NullWritable;
-
-
-import edu.uci.ics.pregelix.api.graph.Vertex;
-import edu.uci.ics.pregelix.api.job.PregelixJob;
-import edu.uci.ics.genomix.oldtype.PositionWritable;
-import edu.uci.ics.genomix.pregelix.client.Client;
-import edu.uci.ics.genomix.pregelix.format.NaiveAlgorithmForPathMergeInputFormat;
-import edu.uci.ics.genomix.pregelix.format.NaiveAlgorithmForPathMergeOutputFormat;
-import edu.uci.ics.genomix.pregelix.io.MessageWritable;
-import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
-import edu.uci.ics.genomix.pregelix.io.VertexValueWritable.State;
-import edu.uci.ics.genomix.pregelix.type.Message;
-import edu.uci.ics.genomix.pregelix.type.MessageFlag;
-import edu.uci.ics.genomix.pregelix.util.VertexUtil;
-
-/*
- * vertexId: BytesWritable
- * vertexValue: ByteWritable
- * edgeValue: NullWritable
- * message: MessageWritable
- *
- * DNA:
- * A: 00
- * C: 01
- * G: 10
- * T: 11
- *
- * succeed node
- * A 00000001 1
- * G 00000010 2
- * C 00000100 4
- * T 00001000 8
- * precursor node
- * A 00010000 16
- * G 00100000 32
- * C 01000000 64
- * T 10000000 128
- *
- * For example, ONE LINE in input file: 00,01,10 0001,0010,
- * That means that vertexId is ACG, its succeed node is A and its precursor node is C.
- * The succeed node and precursor node will be stored in vertexValue and we don't use edgeValue.
- * The details about message are in edu.uci.ics.pregelix.example.io.MessageWritable.
- */
-/**
- * Naive Algorithm for path merge graph
- */
-public class P5ForPathMergeVertex extends
- Vertex<PositionWritable, VertexValueWritable, NullWritable, MessageWritable> {
- public static final String KMER_SIZE = "P5ForPathMergeVertex.kmerSize";
- public static final String ITERATIONS = "P5ForPathMergeVertex.iteration";
- public static final String RANDSEED = "P5ForPathMergeVertex.randSeed";
- public static final String PROBBEINGRANDOMHEAD = "P4ForPathMergeVertex.probBeingRandomHead";
- public static int kmerSize = -1;
- private int maxIteration = -1;
-
- private static long randSeed = -1;
- private float probBeingRandomHead = -1;
- private Random randGenerator;
-
- private PositionWritable curID = new PositionWritable();
- private PositionWritable nextID = new PositionWritable();
- private PositionWritable prevID = new PositionWritable();
- private boolean hasNext;
- private boolean hasPrev;
- private boolean curHead;
- private boolean nextHead;
- private boolean prevHead;
- private byte headFlag;
- private byte tailFlag;
- private byte outFlag;
-
- private MessageWritable incomingMsg = new MessageWritable();
- private MessageWritable outgoingMsg = new MessageWritable();
- private PositionWritable destVertexId = new PositionWritable();
- private Iterator<PositionWritable> posIterator;
-
- /**
- * initiate kmerSize, maxIteration
- */
- public void initVertex() {
- if (kmerSize == -1)
- kmerSize = getContext().getConfiguration().getInt(KMER_SIZE, 5);
- if (maxIteration < 0)
- maxIteration = getContext().getConfiguration().getInt(ITERATIONS, 1000000);
- if (randSeed < 0)
- randSeed = getContext().getConfiguration().getLong("randomSeed", 0);
- randGenerator = new Random(randSeed);
- if (probBeingRandomHead < 0)
- probBeingRandomHead = getContext().getConfiguration().getFloat("probBeingRandomHead", 0.5f);
- hasNext = false;
- hasPrev = false;
- curHead = false;
- nextHead = false;
- prevHead = false;
- outgoingMsg.reset();
- }
-
- protected boolean isNodeRandomHead(PositionWritable nodeID) {
- // "deterministically random", based on node id
- randGenerator.setSeed(randSeed ^ nodeID.hashCode());
- return randGenerator.nextFloat() < probBeingRandomHead;
- }
-
- /**
- * set nextID to the element that's next (in the node's FF or FR list), returning true when there is a next neighbor
- */
- protected boolean setNextInfo(VertexValueWritable value) {
- if (value.getFFList().getCountOfPosition() > 0) {
- nextID.set(value.getFFList().getPosition(0));
- nextHead = isNodeRandomHead(nextID);
- return true;
- }
- if (value.getFRList().getCountOfPosition() > 0) {
- nextID.setAsCopy(value.getFRList().getPosition(0));
- nextHead = isNodeRandomHead(nextID);
- return true;
- }
- return false;
- }
-
- /**
- * set prevID to the element that's previous (in the node's RR or RF list), returning true when there is a previous neighbor
- */
- protected boolean setPrevInfo(VertexValueWritable value) {
- if (value.getRRList().getCountOfPosition() > 0) {
- prevID.setAsCopy(value.getRRList().getPosition(0));
- prevHead = isNodeRandomHead(prevID);
- return true;
- }
- if (value.getRFList().getCountOfPosition() > 0) {
- prevID.setAsCopy(value.getRFList().getPosition(0));
- prevHead = isNodeRandomHead(prevID);
- return true;
- }
- return false;
- }
-
- /**
- * get destination vertex
- */
- public PositionWritable getNextDestVertexId(VertexValueWritable value) {
- if(value.getFFList().getCountOfPosition() > 0) // #FFList() > 0
- posIterator = value.getFFList().iterator();
- else // #FRList() > 0
- posIterator = value.getFRList().iterator();
- return posIterator.next();
- }
-
- public PositionWritable getPreDestVertexId(VertexValueWritable value) {
- if(value.getRFList().getCountOfPosition() > 0) // #RFList() > 0
- posIterator = value.getRFList().iterator();
- else // #RRList() > 0
- posIterator = value.getRRList().iterator();
- return posIterator.next();
- }
-
- /**
- * head send message to all next nodes
- */
- public void sendMsgToAllNextNodes(VertexValueWritable value) {
- posIterator = value.getFFList().iterator(); // FFList
- while(posIterator.hasNext()){
- destVertexId.set(posIterator.next());
- sendMsg(destVertexId, outgoingMsg);
- }
- posIterator = value.getFRList().iterator(); // FRList
- while(posIterator.hasNext()){
- destVertexId.set(posIterator.next());
- sendMsg(destVertexId, outgoingMsg);
- }
- }
-
- /**
- * head send message to all previous nodes
- */
- public void sendMsgToAllPreviousNodes(VertexValueWritable value) {
- posIterator = value.getRFList().iterator(); // RFList
- while(posIterator.hasNext()){
- destVertexId.set(posIterator.next());
- sendMsg(destVertexId, outgoingMsg);
- }
- posIterator = value.getRRList().iterator(); // RRList
- while(posIterator.hasNext()){
- destVertexId.set(posIterator.next());
- sendMsg(destVertexId, outgoingMsg);
- }
- }
-
- /**
- * start sending message
- */
- public void startSendMsg() {
- if (VertexUtil.isHeadVertexWithIndegree(getVertexValue())) {
- outgoingMsg.setFlag(Message.START);
- sendMsgToAllNextNodes(getVertexValue());
- voteToHalt();
- }
- if (VertexUtil.isRearVertexWithOutdegree(getVertexValue())) {
- outgoingMsg.setFlag(Message.END);
- sendMsgToAllPreviousNodes(getVertexValue());
- voteToHalt();
- }
- if (VertexUtil.isHeadWithoutIndegree(getVertexValue())){
- outgoingMsg.setFlag(Message.START);
- sendMsg(getVertexId(), outgoingMsg); //send to itself
- voteToHalt();
- }
- if (VertexUtil.isRearWithoutOutdegree(getVertexValue())){
- outgoingMsg.setFlag(Message.END);
- sendMsg(getVertexId(), outgoingMsg); //send to itself
- voteToHalt();
- }
- }
-
- /**
- * initiate head, rear and path node
- */
- public void initState(Iterator<MessageWritable> msgIterator) {
- while (msgIterator.hasNext()) {
- if (!VertexUtil.isPathVertex(getVertexValue())
- && !VertexUtil.isHeadWithoutIndegree(getVertexValue())
- && !VertexUtil.isRearWithoutOutdegree(getVertexValue())) {
- msgIterator.next();
- voteToHalt();
- } else {
- incomingMsg = msgIterator.next();
- setState();
- }
- }
- }
-
- /**
- * set vertex state
- */
- public void setState() {
- if (incomingMsg.getFlag() == Message.START) {
- getVertexValue().setState(MessageFlag.IS_HEAD); //State.START_VERTEX
- } else if (incomingMsg.getFlag() == Message.END && getVertexValue().getState() != State.IS_HEAD) {
- getVertexValue().setState(MessageFlag.IS_HEAD);
- getVertexValue().setKmer(getVertexValue().getKmer());
- //voteToHalt();
- } //else
- //voteToHalt();
- }
-
- /**
- * check if A need to be flipped with successor
- */
- public boolean ifFilpWithSuccessor(){
- if(getVertexValue().getFRList().getLength() > 0)
- return true;
- else
- return false;
- }
-
- /**
- * check if A need to be filpped with predecessor
- */
- public boolean ifFlipWithPredecessor(){
- if(getVertexValue().getRFList().getLength() > 0)
- return true;
- else
- return false;
- }
-
- /**
- * set adjMessage to successor(from predecessor)
- */
- public void setSuccessorAdjMsg(){
- if(getVertexValue().getFFList().getLength() > 0)
- outFlag |= MessageFlag.DIR_FF;
- else
- outFlag |= MessageFlag.DIR_FR;
- }
-
- /**
- * set adjMessage to predecessor(from successor)
- */
- public void setPredecessorAdjMsg(){
- if(getVertexValue().getRFList().getLength() > 0)
- outFlag |= MessageFlag.DIR_RF;
- else
- outFlag |= MessageFlag.DIR_RF;
- }
-
- /**
- * send update message to neighber
- * @throws IOException
- */
- public void broadcastUpdateMsg(){
- /* switch(getVertexValue().getState() & 0b0001){
- case MessageFlag.SHOULD_MERGEWITHPREV:
- setSuccessorAdjMsg();
- if(ifFlipWithPredecessor())
- outFlag |= MessageFlag.FLIP;
- outgoingMsg.setFlag(outFlag);
- outgoingMsg.setNeighberNode(getVertexValue().getIncomingList());
- outgoingMsg.setSourceVertexId(getVertexId());
- sendMsg(getNextDestVertexId(getVertexValue()), outgoingMsg);
- break;
- case MessageFlag.SHOULD_MERGEWITHNEXT:
- setPredecessorAdjMsg();
- if(ifFilpWithSuccessor())
- outFlag |= MessageFlag.FLIP;
- outgoingMsg.setFlag(outFlag);
- outgoingMsg.setNeighberNode(getVertexValue().getOutgoingList());
- outgoingMsg.setSourceVertexId(getVertexId());
- sendMsg(getPreDestVertexId(getVertexValue()), outgoingMsg);
- break;
- }*/
- }
-
- /**
- * This vertex tries to merge with next vertex and send update msg to neighber
- * @throws IOException
- */
- public void sendUpMsgFromPredecessor(){
- byte state = getVertexValue().getState();
- state |= MessageFlag.SHOULD_MERGEWITHNEXT;
- getVertexValue().setState(state);
- if(getVertexValue().getFFList().getLength() > 0)
- getVertexValue().setMergeDest(getVertexValue().getFFList().getPosition(0));
- else
- getVertexValue().setMergeDest(getVertexValue().getFRList().getPosition(0));
- broadcastUpdateMsg();
- }
-
- /**
- * This vertex tries to merge with next vertex and send update msg to neighber
- * @throws IOException
- */
- public void sendUpMsgFromSuccessor(){
- byte state = getVertexValue().getState();
- state |= MessageFlag.SHOULD_MERGEWITHPREV;
- getVertexValue().setState(state);
- if(getVertexValue().getRFList().getLength() > 0)
- getVertexValue().setMergeDest(getVertexValue().getRFList().getPosition(0));
- else
- getVertexValue().setMergeDest(getVertexValue().getRRList().getPosition(0));
- broadcastUpdateMsg();
- }
-
- /**
- * Returns the edge dir for B->A when the A->B edge is type @dir
- */
- public byte mirrorDirection(byte dir) {
- switch (dir) {
- case MessageFlag.DIR_FF:
- return MessageFlag.DIR_RR;
- case MessageFlag.DIR_FR:
- return MessageFlag.DIR_FR;
- case MessageFlag.DIR_RF:
- return MessageFlag.DIR_RF;
- case MessageFlag.DIR_RR:
- return MessageFlag.DIR_FF;
- default:
- throw new RuntimeException("Unrecognized direction in flipDirection: " + dir);
- }
- }
-
- /**
- * check if need filp
- */
- public byte flipDirection(byte neighborDir, boolean flip){
- if(flip){
- switch (neighborDir) {
- case MessageFlag.DIR_FF:
- return MessageFlag.DIR_FR;
- case MessageFlag.DIR_FR:
- return MessageFlag.DIR_FF;
- case MessageFlag.DIR_RF:
- return MessageFlag.DIR_RR;
- case MessageFlag.DIR_RR:
- return MessageFlag.DIR_RF;
- default:
- throw new RuntimeException("Unrecognized direction for neighborDir: " + neighborDir);
- }
- } else
- return neighborDir;
- }
-
- /**
- * updateAdjList
- */
- public void processUpdate(){
- /*byte meToNeighborDir = (byte) (incomingMsg.getFlag() & MessageFlag.DIR_MASK);
- byte neighborToMeDir = mirrorDirection(meToNeighborDir);
-
- boolean flip;
- if((outFlag & MessageFlag.FLIP) > 0)
- flip = true;
- else
- flip = false;
- byte neighborToMergeDir = flipDirection(neighborToMeDir, flip);
-
- getVertexValue().processUpdates(neighborToMeDir, incomingMsg.getSourceVertexId(),
- neighborToMergeDir, VertexUtil.getNodeIdFromAdjacencyList(incomingMsg.getNeighberNode()));*/
- }
-
- /**
- * merge and updateAdjList
- */
- public void processMerge(){
- /*byte meToNeighborDir = (byte) (incomingMsg.getFlag() & MessageFlag.DIR_MASK);
- byte neighborToMeDir = mirrorDirection(meToNeighborDir);
-
- boolean flip;
- if((outFlag & MessageFlag.FLIP) > 0)
- flip = true;
- else
- flip = false;
- byte neighborToMergeDir = flipDirection(neighborToMeDir, flip);
-
- getVertexValue().processMerges(neighborToMeDir, incomingMsg.getSourceVertexId(),
- neighborToMergeDir, VertexUtil.getNodeIdFromAdjacencyList(incomingMsg.getNeighberNode()),
- kmerSize, incomingMsg.getKmer());*/
- }
-
- @Override
- public void compute(Iterator<MessageWritable> msgIterator) {
- initVertex();
- if (getSuperstep() == 1)
- startSendMsg();
- else if (getSuperstep() == 2)
- initState(msgIterator);
- else if (getSuperstep() % 4 == 3){
- // Node may be marked as head b/c it's a real head or a real tail
- headFlag = (byte) (State.IS_HEAD & getVertexValue().getState());
- tailFlag = (byte) (State.IS_HEAD & getVertexValue().getState()); //is_tail
- outFlag = (byte) (headFlag | tailFlag);
-
- // only PATH vertices are present. Find the ID's for my neighbors
- curID.set(getVertexId());
-
- curHead = isNodeRandomHead(curID);
-
- // the headFlag and tailFlag's indicate if the node is at the beginning or end of a simple path.
- // We prevent merging towards non-path nodes
- hasNext = setNextInfo(getVertexValue()) && tailFlag == 0;
- hasPrev = setPrevInfo(getVertexValue()) && headFlag == 0;
- if ((outFlag & MessageFlag.IS_HEAD) > 0 && (outFlag & MessageFlag.IS_HEAD) > 0) {
- getVertexValue().setState(outFlag);
- voteToHalt();
- }
- if (hasNext || hasPrev) {
- if (curHead) {
- if (hasNext && !nextHead) {
- // compress this head to the forward tail
- sendUpMsgFromPredecessor();
- } else if (hasPrev && !prevHead) {
- // compress this head to the reverse tail
- sendUpMsgFromSuccessor();
- }
- } else {
- // I'm a tail
- if (hasNext && hasPrev) {
- if ((!nextHead && !prevHead) && (curID.compareTo(nextID) < 0 && curID.compareTo(prevID) < 0)) {
- // tails on both sides, and I'm the "local minimum"
- // compress me towards the tail in forward dir
- sendUpMsgFromPredecessor();
- }
- } else if (!hasPrev) {
- // no previous node
- if (!nextHead && curID.compareTo(nextID) < 0) {
- // merge towards tail in forward dir
- sendUpMsgFromPredecessor();
- }
- } else if (!hasNext) {
- // no next node
- if (!prevHead && curID.compareTo(prevID) < 0) {
- // merge towards tail in reverse dir
- sendUpMsgFromSuccessor();
- }
- }
- }
- }
- }
-
- }
-
- public static void main(String[] args) throws Exception {
- PregelixJob job = new PregelixJob(P5ForPathMergeVertex.class.getSimpleName());
- job.setVertexClass(P5ForPathMergeVertex.class);
- /**
- * BinaryInput and BinaryOutput
- */
- job.setVertexInputFormatClass(NaiveAlgorithmForPathMergeInputFormat.class);
- job.setVertexOutputFormatClass(NaiveAlgorithmForPathMergeOutputFormat.class);
- job.setDynamicVertexValueSize(true);
- job.setOutputKeyClass(PositionWritable.class);
- job.setOutputValueClass(VertexValueWritable.class);
- Client.run(args, job);
- }
-}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/splitrepeat/SplitRepeatVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/splitrepeat/SplitRepeatVertex.java
index 33b62c3..fa50e66 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/splitrepeat/SplitRepeatVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/splitrepeat/SplitRepeatVertex.java
@@ -7,6 +7,9 @@
import java.util.Random;
import java.util.Set;
+import edu.uci.ics.genomix.pregelix.client.Client;
+import edu.uci.ics.genomix.pregelix.format.GraphCleanOutputFormat;
+import edu.uci.ics.genomix.pregelix.format.InitialGraphCleanInputFormat;
import edu.uci.ics.genomix.pregelix.io.MessageWritable;
import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
import edu.uci.ics.genomix.pregelix.operator.pathmerge.BasicGraphCleanVertex;
@@ -16,88 +19,57 @@
import edu.uci.ics.genomix.type.PositionWritable;
import edu.uci.ics.genomix.type.VKmerBytesWritable;
import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.job.PregelixJob;
import edu.uci.ics.pregelix.api.util.BspUtils;
public class SplitRepeatVertex extends
BasicGraphCleanVertex{
- public class CreatedVertex{
- VKmerBytesWritable createdVertexId;
- String incomingDir;
- String outgoingDir;
- VKmerBytesWritable incomingEdge;
- VKmerBytesWritable outgoingEdge;
+
+ public class EdgeDir{
+ public static final byte DIR_FF = 0 << 0;
+ public static final byte DIR_FR = 1 << 0;
+ public static final byte DIR_RF = 2 << 0;
+ public static final byte DIR_RR = 3 << 0;
+ }
+
+ public class DeletedEdge{
+ private byte dir;
+ private KmerBytesWritable edge;
- public CreatedVertex(){
- createdVertexId = new VKmerBytesWritable(kmerSize);
- incomingDir = "";
- outgoingDir = "";
- incomingEdge = new VKmerBytesWritable(kmerSize);
- outgoingEdge = new VKmerBytesWritable(kmerSize);
- }
-
- public void clear(){
- createdVertexId.reset(kmerSize);
- incomingDir = "";
- outgoingDir = "";
- incomingEdge.reset(kmerSize);
- outgoingEdge.reset(kmerSize);
- }
-
- public VKmerBytesWritable getCreatedVertexId() {
- return createdVertexId;
+ public DeletedEdge(){
+ dir = 0;
+ edge = new KmerBytesWritable(kmerSize);
}
- public void setCreatedVertexId(KmerBytesWritable createdVertexId) {
- this.createdVertexId = createdVertexId;
+ public byte getDir() {
+ return dir;
}
- public String getIncomingDir() {
- return incomingDir;
+ public void setDir(byte dir) {
+ this.dir = dir;
}
- public void setIncomingDir(String incomingDir) {
- this.incomingDir = incomingDir;
+ public KmerBytesWritable getEdge() {
+ return edge;
}
- public String getOutgoingDir() {
- return outgoingDir;
- }
-
- public void setOutgoingDir(String outgoingDir) {
- this.outgoingDir = outgoingDir;
- }
-
- public VKmerBytesWritable getIncomingEdge() {
- return incomingEdge;
- }
-
- public void setIncomingEdge(KmerBytesWritable incomingEdge) {
- this.incomingEdge.set(incomingEdge);
- }
-
- public VKmerBytesWritable getOutgoingEdge() {
- return outgoingEdge;
- }
-
- public void setOutgoingEdge(KmerBytesWritable outgoingEdge) {
- this.outgoingEdge.set(outgoingEdge);
+ public void setEdge(KmerBytesWritable edge) {
+ this.edge.set(edge);
}
}
- private String[][] connectedTable = new String[][]{
- {"FF", "RF"},
- {"FF", "RR"},
- {"FR", "RF"},
- {"FR", "RR"}
+ private byte[][] connectedTable = new byte[][]{
+ {EdgeDir.DIR_RF, EdgeDir.DIR_FF},
+ {EdgeDir.DIR_RF, EdgeDir.DIR_FR},
+ {EdgeDir.DIR_RR, EdgeDir.DIR_FF},
+ {EdgeDir.DIR_RR, EdgeDir.DIR_FR}
};
public static Set<String> existKmerString = new HashSet<String>();
private Set<Long> readIdSet;
private Set<Long> incomingReadIdSet = new HashSet<Long>();
private Set<Long> outgoingReadIdSet = new HashSet<Long>();
private Set<Long> selfReadIdSet = new HashSet<Long>();
- private Set<Long> incomingEdgeIntersection = new HashSet<Long>();
- private Set<Long> outgoingEdgeIntersection = new HashSet<Long>();
private Set<Long> neighborEdgeIntersection = new HashSet<Long>();
private Map<KmerBytesWritable, Set<Long>> kmerMap = new HashMap<KmerBytesWritable, Set<Long>>();
private VKmerListWritable incomingEdgeList = null;
@@ -106,8 +78,6 @@
private byte outgoingEdgeDir = 0;
protected KmerBytesWritable createdVertexId = null;
- private CreatedVertex createdVertex = new CreatedVertex();
- public static Set<CreatedVertex> createdVertexSet = new HashSet<CreatedVertex>();
/**
* initiate kmerSize, maxIteration
@@ -128,7 +98,9 @@
if(outgoingEdgeList == null)
outgoingEdgeList = new VKmerListWritable(kmerSize);
if(createdVertexId == null)
- createdVertexId = new VKmerBytesWritable(kmerSize + 1);
+ createdVertexId = new KmerBytesWritable(kmerSize);//kmerSize + 1
+ if(destVertexId == null)
+ destVertexId = new KmerBytesWritable(kmerSize);
}
/**
@@ -150,7 +122,184 @@
return sb.toString();
}
+ /**
+ * GenerateString only for test
+ */
+ public String generateString(){
+ if(existKmerString.isEmpty()){
+ existKmerString.add("AAA");
+ return "AAA";
+ }
+ else
+ return "GGG";
+ }
+
+ public void generateKmerMap(Iterator<MessageWritable> msgIterator){
+ kmerMap.clear();
+ while(msgIterator.hasNext()){
+ incomingMsg = msgIterator.next();
+ readIdSet = new HashSet<Long>();
+ for(PositionWritable nodeId : incomingMsg.getNodeIdList()){
+ readIdSet.add(nodeId.getReadId());
+ }
+ kmerMap.put(incomingMsg.getSourceVertexId(), readIdSet);
+ }
+ }
+
+ public void setSelfReadIdSet(){
+ selfReadIdSet.clear();
+ for(PositionWritable nodeId : getVertexValue().getNodeIdList()){
+ selfReadIdSet.add(nodeId.getReadId());
+ }
+ }
+
@SuppressWarnings({ "rawtypes", "unchecked" })
+ public void createNewVertex(int i, KmerBytesWritable incomingEdge, KmerBytesWritable outgoingEdge){
+ Vertex vertex = (Vertex) BspUtils.createVertex(getContext().getConfiguration());
+ vertex.getMsgList().clear();
+ vertex.getEdges().clear();
+ KmerBytesWritable vertexId = new KmerBytesWritable(kmerSize);
+ VertexValueWritable vertexValue = new VertexValueWritable(kmerSize);
+ //add the corresponding edge to new vertex
+ switch(connectedTable[i][0]){
+ case EdgeDir.DIR_RF:
+ vertexValue.getRFList().append(incomingEdge);
+ break;
+ case EdgeDir.DIR_RR:
+ vertexValue.getRRList().append(incomingEdge);
+ break;
+ }
+ switch(connectedTable[i][1]){
+ case EdgeDir.DIR_FF:
+ vertexValue.getFFList().append(outgoingEdge);
+ break;
+ case EdgeDir.DIR_FR:
+ vertexValue.getFRList().append(outgoingEdge);
+ break;
+ }
+ vertexId.set(createdVertexId);
+ vertex.setVertexId(vertexId);
+ vertex.setVertexValue(vertexValue);
+
+ addVertex(vertexId, vertex);
+ }
+
+ public void sendMsgToUpdateEdge(KmerBytesWritable incomingEdge, KmerBytesWritable outgoingEdge){
+ outgoingMsg.setCreatedVertexId(createdVertexId);
+ outgoingMsg.setSourceVertexId(getVertexId());
+
+ outgoingMsg.setFlag(incomingEdgeDir);
+ destVertexId.set(incomingEdge);
+ sendMsg(destVertexId, outgoingMsg);
+
+ outgoingMsg.setFlag(outgoingEdgeDir);
+ destVertexId.set(outgoingEdge);
+ sendMsg(destVertexId, outgoingMsg);
+ }
+
+ public void storeDeletedEdge(Set<DeletedEdge> deletedEdges, int i, KmerBytesWritable incomingEdge, KmerBytesWritable outgoingEdge){
+ DeletedEdge deletedIncomingEdge = new DeletedEdge();
+ DeletedEdge deletedOutgoingEdge = new DeletedEdge();
+ switch(connectedTable[i][0]){
+ case EdgeDir.DIR_RF:
+ deletedIncomingEdge.setDir(EdgeDir.DIR_RF);
+ deletedIncomingEdge.setEdge(incomingEdge);
+ break;
+ case EdgeDir.DIR_RR:
+ deletedIncomingEdge.setDir(EdgeDir.DIR_RR);
+ deletedIncomingEdge.setEdge(incomingEdge);
+ break;
+ }
+ switch(connectedTable[i][1]){
+ case EdgeDir.DIR_FF:
+ deletedOutgoingEdge.setDir(EdgeDir.DIR_FF);
+ deletedOutgoingEdge.setEdge(outgoingEdge);
+ break;
+ case EdgeDir.DIR_FR:
+ deletedOutgoingEdge.setDir(EdgeDir.DIR_FR);
+ deletedOutgoingEdge.setEdge(outgoingEdge);
+ break;
+ }
+ deletedEdges.add(deletedIncomingEdge);
+ deletedEdges.add(deletedOutgoingEdge);
+ }
+ public void deleteEdgeFromOldVertex(DeletedEdge deleteEdge){
+ switch(deleteEdge.dir){
+ case EdgeDir.DIR_RF:
+ getVertexValue().getRFList().remove(deleteEdge.getEdge());
+ break;
+ case EdgeDir.DIR_RR:
+ getVertexValue().getRRList().remove(deleteEdge.getEdge());
+ break;
+ case EdgeDir.DIR_FF:
+ getVertexValue().getFFList().remove(deleteEdge.getEdge());
+ break;
+ case EdgeDir.DIR_FR:
+ getVertexValue().getFRList().remove(deleteEdge.getEdge());
+ break;
+ }
+ }
+
+ public void setEdgeListAndEdgeDir(int i){
+ switch(connectedTable[i][0]){
+ case EdgeDir.DIR_RF:
+ incomingEdgeList.set(getVertexValue().getRFList());
+ incomingEdgeDir = MessageFlag.DIR_RF;
+ break;
+ case EdgeDir.DIR_RR:
+ incomingEdgeList.set(getVertexValue().getRRList());
+ incomingEdgeDir = MessageFlag.DIR_RR;
+ break;
+ }
+ switch(connectedTable[i][1]){
+ case EdgeDir.DIR_FF:
+ outgoingEdgeList.set(getVertexValue().getFFList());
+ outgoingEdgeDir = MessageFlag.DIR_FF;
+ break;
+ case EdgeDir.DIR_FR:
+ outgoingEdgeList.set(getVertexValue().getFRList());
+ outgoingEdgeDir = MessageFlag.DIR_FR;
+ break;
+ }
+ }
+
+ public void setNeighborEdgeIntersection(KmerBytesWritable incomingEdge, KmerBytesWritable outgoingEdge){
+ outgoingReadIdSet.clear();
+ incomingReadIdSet.clear();
+ tmpKmer.set(incomingEdge);
+ incomingReadIdSet.addAll(kmerMap.get(tmpKmer));
+ tmpKmer.set(outgoingEdge);
+ outgoingReadIdSet.addAll(kmerMap.get(tmpKmer));
+
+ //set all neighberEdge readId intersection
+ neighborEdgeIntersection.addAll(selfReadIdSet);
+ neighborEdgeIntersection.retainAll(incomingReadIdSet);
+ neighborEdgeIntersection.retainAll(outgoingReadIdSet);
+ }
+
+ public void updateEdgeListPointToNewVertex(){
+ byte meToNeighborDir = incomingMsg.getFlag();
+ byte neighborToMeDir = mirrorDirection(meToNeighborDir);
+ switch(neighborToMeDir){
+ case MessageFlag.DIR_FF:
+ getVertexValue().getFFList().remove(incomingMsg.getSourceVertexId());
+ getVertexValue().getFFList().append(incomingMsg.getCreatedVertexId());
+ break;
+ case MessageFlag.DIR_FR:
+ getVertexValue().getFRList().remove(incomingMsg.getSourceVertexId());
+ getVertexValue().getFRList().append(incomingMsg.getCreatedVertexId());
+ break;
+ case MessageFlag.DIR_RF:
+ getVertexValue().getRFList().remove(incomingMsg.getSourceVertexId());
+ getVertexValue().getRFList().append(incomingMsg.getCreatedVertexId());
+ break;
+ case MessageFlag.DIR_RR:
+ getVertexValue().getRRList().remove(incomingMsg.getSourceVertexId());
+ getVertexValue().getRRList().append(incomingMsg.getCreatedVertexId());
+ break;
+ }
+ }
+
@Override
public void compute(Iterator<MessageWritable> msgIterator) {
initVertex();
@@ -169,161 +318,103 @@
}
voteToHalt();
} else if(getSuperstep() == 3){
- kmerMap.clear();
- createdVertexSet.clear();
- while(msgIterator.hasNext()){
- incomingMsg = msgIterator.next();
- readIdSet = new HashSet<Long>();
- for(PositionWritable nodeId : incomingMsg.getNodeIdList()){
- readIdSet.add(nodeId.getReadId());
- }
- kmerMap.put(incomingMsg.getSourceVertexId(), readIdSet);
- }
+ /** generate KmerMap map kmer(key) to readIdSet(value) **/
+ generateKmerMap(msgIterator);
+
+ /** set self readId set **/
+ setSelfReadIdSet();
+
+ int count = 0;
+ //A set storing deleted edges
+ Set<DeletedEdge> deletedEdges = new HashSet<DeletedEdge>();
/** process connectedTable **/
for(int i = 0; i < 4; i++){
- switch(connectedTable[i][0]){
- case "FF":
- outgoingEdgeList.set(getVertexValue().getFFList());
- outgoingEdgeDir = MessageFlag.DIR_FF;
- break;
- case "FR":
- outgoingEdgeList.set(getVertexValue().getFRList());
- outgoingEdgeDir = MessageFlag.DIR_FR;
- break;
- }
- switch(connectedTable[i][1]){
- case "RF":
- incomingEdgeList.set(getVertexValue().getRFList());
- incomingEdgeDir = MessageFlag.DIR_RF;
- break;
- case "RR":
- incomingEdgeList.set(getVertexValue().getRRList());
- incomingEdgeDir = MessageFlag.DIR_RR;
- break;
- }
- selfReadIdSet.clear();
- for(PositionWritable nodeId : getVertexValue().getNodeIdList()){
- selfReadIdSet.add(nodeId.getReadId());
- }
- for(KmerBytesWritable outgoingEdge : outgoingEdgeList){
- for(KmerBytesWritable incomingEdge : incomingEdgeList){
- outgoingReadIdSet.clear();
- incomingReadIdSet.clear();
- outgoingReadIdSet.addAll(kmerMap.get(outgoingEdge));
- incomingReadIdSet.addAll(kmerMap.get(incomingEdge));
-
- //set all neighberEdge readId intersection
- neighborEdgeIntersection.addAll(selfReadIdSet);
- neighborEdgeIntersection.retainAll(outgoingReadIdSet);
- neighborEdgeIntersection.retainAll(incomingReadIdSet);
- //set outgoingEdge readId intersection
- outgoingEdgeIntersection.addAll(selfReadIdSet);
- outgoingEdgeIntersection.retainAll(outgoingReadIdSet);
- outgoingEdgeIntersection.removeAll(neighborEdgeIntersection);
- //set incomingEdge readId intersection
- incomingEdgeIntersection.addAll(selfReadIdSet);
- incomingEdgeIntersection.retainAll(incomingReadIdSet);
- incomingEdgeIntersection.removeAll(neighborEdgeIntersection);
+ /** set edgeList and edgeDir based on connectedTable **/
+ setEdgeListAndEdgeDir(i);
+
+ KmerBytesWritable incomingEdge = new KmerBytesWritable(kmerSize);
+ KmerBytesWritable outgoingEdge = new KmerBytesWritable(kmerSize);
+ for(int x = 0; x < incomingEdgeList.getCountOfPosition(); x++){
+ for(int y = 0; y < outgoingEdgeList.getCountOfPosition(); y++){
+ incomingEdge.set(incomingEdgeList.getPosition(x));
+ outgoingEdge.set(outgoingEdgeList.getPosition(y));
+ /** set neighborEdge readId intersection **/
+ setNeighborEdgeIntersection(incomingEdge, outgoingEdge);
if(!neighborEdgeIntersection.isEmpty()){
- createdVertex.clear();
- createdVertexId.setByRead(generaterRandomString(kmerSize + 1).getBytes(), 0);
- createdVertex.setCreatedVertexId(createdVertexId);
- createdVertex.setIncomingDir(connectedTable[i][1]);
- createdVertex.setOutgoingDir(connectedTable[i][0]);
- createdVertex.setIncomingEdge(incomingEdge);
- createdVertex.setOutgoingEdge(outgoingEdge);
- createdVertexSet.add(createdVertex);
+ if(count == 0)
+ createdVertexId.setByRead("AAA".getBytes(), 0);//kmerSize + 1 generaterRandomString(kmerSize).getBytes()
+ else
+ createdVertexId.setByRead("GGG".getBytes(), 0);
+ count++;
- outgoingMsg.setCreatedVertexId(createdVertex.getCreatedVertexId());
- outgoingMsg.setSourceVertexId(getVertexId());
- outgoingMsg.setFlag(incomingEdgeDir);
- sendMsg(incomingEdge, outgoingMsg);
- outgoingMsg.setFlag(outgoingEdgeDir);
- sendMsg(outgoingEdge, outgoingMsg);
- }
-
- if(!incomingEdgeIntersection.isEmpty()){
- createdVertex.clear();
- createdVertexId.setByRead(generaterRandomString(kmerSize + 1).getBytes(), 0);
- createdVertex.setCreatedVertexId(createdVertexId);
- createdVertex.setIncomingDir(connectedTable[i][1]);
- createdVertex.setIncomingEdge(incomingEdge);
- createdVertexSet.add(createdVertex);
+ /** create new/created vertex **/
+ createNewVertex(i, incomingEdge, outgoingEdge);
- outgoingMsg.setCreatedVertexId(createdVertex.getCreatedVertexId());
- outgoingMsg.setSourceVertexId(getVertexId());
- outgoingMsg.setFlag(incomingEdgeDir);
- sendMsg(incomingEdge, outgoingMsg);
- }
-
- if(!outgoingEdgeIntersection.isEmpty()){
- createdVertex.clear();
- createdVertexId.setByRead(generaterRandomString(kmerSize + 1).getBytes(), 0);
- createdVertex.setCreatedVertexId(createdVertexId);
- createdVertex.setOutgoingDir(connectedTable[i][0]);
- createdVertex.setOutgoingEdge(outgoingEdge);
- createdVertexSet.add(createdVertex);
+ /** send msg to neighbors to update their edges to new vertex **/
+ sendMsgToUpdateEdge(incomingEdge, outgoingEdge);
- outgoingMsg.setCreatedVertexId(createdVertex.getCreatedVertexId());
- outgoingMsg.setSourceVertexId(getVertexId());
- outgoingMsg.setFlag(outgoingEdgeDir);
- sendMsg(outgoingEdge, outgoingMsg);
+ /** store deleted edge **/
+ storeDeletedEdge(deletedEdges, i, incomingEdge, outgoingEdge);
}
}
}
+
+// for(KmerBytesWritable incomingEdge : incomingEdgeList){
+// for(KmerBytesWritable outgoingEdge : outgoingEdgeList){
+// /** set neighborEdge readId intersection **/
+// setNeighborEdgeIntersection(incomingEdge, outgoingEdge);
+//
+// if(!neighborEdgeIntersection.isEmpty()){
+// if(count == 0)
+// createdVertexId.setByRead("AAA".getBytes(), 0);//kmerSize + 1 generaterRandomString(kmerSize).getBytes()
+// else
+// createdVertexId.setByRead("GGG".getBytes(), 0);
+// count++;
+//
+// /** create new/created vertex **/
+// createNewVertex(i, incomingEdge, outgoingEdge);
+//
+// /** send msg to neighbors to update their edges to new vertex **/
+// sendMsgToUpdateEdge(incomingEdge, outgoingEdge);
+//
+// /** store deleted edge **/
+// storeDeletedEdge(deletedEdges, i, incomingEdge, outgoingEdge);
+// }
+// }
+// }
}
+ /** delete extra edges from old vertex **/
+ for(DeletedEdge deletedEdge : deletedEdges){
+ deleteEdgeFromOldVertex(deletedEdge);
+ }
+
+ /** Old vertex delete or voteToHalt **/
+ if(getVertexValue().getDegree() == 0)//if no any edge, delete
+ deleteVertex(getVertexId());
+ else
+ voteToHalt();
} else if(getSuperstep() == 4){
while(msgIterator.hasNext()){
incomingMsg = msgIterator.next();
/** update edgelist to new/created vertex **/
- byte meToNeighborDir = incomingMsg.getFlag();
- byte neighborToMeDir = mirrorDirection(meToNeighborDir);
- switch(neighborToMeDir){
- case MessageFlag.DIR_FF:
- getVertexValue().getFFList().remove(incomingMsg.getSourceVertexId());
- getVertexValue().getFFList().append(incomingMsg.getCreatedVertexId());
- break;
- case MessageFlag.DIR_FR:
- getVertexValue().getFRList().remove(incomingMsg.getSourceVertexId());
- getVertexValue().getFRList().append(incomingMsg.getCreatedVertexId());
- break;
- case MessageFlag.DIR_RF:
- getVertexValue().getRFList().remove(incomingMsg.getSourceVertexId());
- getVertexValue().getRFList().append(incomingMsg.getCreatedVertexId());
- break;
- case MessageFlag.DIR_RR:
- getVertexValue().getRRList().remove(incomingMsg.getSourceVertexId());
- getVertexValue().getRRList().append(incomingMsg.getCreatedVertexId());
- break;
- }
- /** add new/created vertex **/
- for(CreatedVertex v : createdVertexSet){
- Vertex vertex = (Vertex) BspUtils.createVertex(getContext().getConfiguration());
- vertex.getMsgList().clear();
- vertex.getEdges().clear();
- VertexValueWritable vertexValue = new VertexValueWritable();
- switch(v.incomingDir){
- case "RF":
- vertexValue.getRFList().append(v.incomingEdge);
- break;
- case "RR":
- vertexValue.getRRList().append(v.incomingEdge);
- break;
- }
- switch(v.outgoingDir){
- case "FF":
- vertexValue.getFFList().append(v.outgoingEdge);
- break;
- case "FR":
- vertexValue.getFRList().append(v.outgoingEdge);
- break;
- }
- vertex.setVertexId(v.getCreatedVertexId());
- vertex.setVertexValue(vertexValue);
- }
- createdVertexSet.clear();
+ updateEdgeListPointToNewVertex();
}
+ voteToHalt();
}
}
+
+ public static void main(String[] args) throws Exception {
+ PregelixJob job = new PregelixJob(SplitRepeatVertex.class.getSimpleName());
+ job.setVertexClass(SplitRepeatVertex.class);
+ /**
+ * BinaryInput and BinaryOutput
+ */
+ job.setVertexInputFormatClass(InitialGraphCleanInputFormat.class);
+ job.setVertexOutputFormatClass(GraphCleanOutputFormat.class);
+ job.setDynamicVertexValueSize(true);
+ job.setOutputKeyClass(KmerBytesWritable.class);
+ job.setOutputValueClass(VertexValueWritable.class);
+ Client.run(args, job);
+ }
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/tipremove/TipAddVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/tipremove/TipAddVertex.java
index 519ffb9..7b695dc 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/tipremove/TipAddVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/tipremove/TipAddVertex.java
@@ -4,11 +4,10 @@
import org.apache.hadoop.io.NullWritable;
import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.KmerListWritable;
import edu.uci.ics.pregelix.api.graph.Vertex;
import edu.uci.ics.pregelix.api.job.PregelixJob;
import edu.uci.ics.pregelix.api.util.BspUtils;
-import edu.uci.ics.genomix.oldtype.PositionListWritable;
-import edu.uci.ics.genomix.oldtype.PositionWritable;
import edu.uci.ics.genomix.pregelix.client.Client;
import edu.uci.ics.genomix.pregelix.format.GraphCleanInputFormat;
import edu.uci.ics.genomix.pregelix.format.GraphCleanOutputFormat;
@@ -47,7 +46,7 @@
* Remove tip or single node when l > constant
*/
public class TipAddVertex extends
- Vertex<PositionWritable, VertexValueWritable, NullWritable, MessageWritable> {
+ Vertex<KmerBytesWritable, VertexValueWritable, NullWritable, MessageWritable> {
public static final String KMER_SIZE = "TipAddVertex.kmerSize";
public static int kmerSize = -1;
@@ -58,37 +57,39 @@
if (kmerSize == -1)
kmerSize = getContext().getConfiguration().getInt(KMER_SIZE, 5);
}
-
+
+ /**
+ * create a new vertex point to split node
+ */
+
@SuppressWarnings("unchecked")
@Override
public void compute(Iterator<MessageWritable> msgIterator) {
initVertex();
if(getSuperstep() == 1){
- if(getVertexId().getReadID() == 1 && getVertexId().getPosInRead() == 4){
- getVertexValue().getFFList().append(2, (byte)1);
+ if(getVertexId().toString().equals("CTA")){
+ KmerBytesWritable vertexId = new KmerBytesWritable(kmerSize);
+ vertexId.setByRead("AGC".getBytes(), 0);
+ getVertexValue().getRFList().append(vertexId);
//add tip vertex
@SuppressWarnings("rawtypes")
Vertex vertex = (Vertex) BspUtils.createVertex(getContext().getConfiguration());
vertex.getMsgList().clear();
vertex.getEdges().clear();
- PositionWritable vertexId = new PositionWritable();
- VertexValueWritable vertexValue = new VertexValueWritable();
+
+ VertexValueWritable vertexValue = new VertexValueWritable(kmerSize);
/**
* set the src vertex id
*/
- vertexId.set(2, (byte)1);
vertex.setVertexId(vertexId);
/**
* set the vertex value
*/
- byte[] array = { 'G', 'A', 'A'};
- KmerBytesWritable kmer = new KmerBytesWritable(array.length);
- kmer.setByRead(array, 0);
- vertexValue.setKmer(kmer);
- PositionListWritable plist = new PositionListWritable();
- plist.append(new PositionWritable(1, (byte)4));
- vertexValue.setRRList(plist);
+ KmerListWritable kmerList = new KmerListWritable(kmerSize);
+ kmerList.append(getVertexId());
+ vertexValue.setRFList(kmerList);
+ vertexValue.setKmer(vertexId);
vertex.setVertexValue(vertexValue);
addVertex(vertexId, vertex);
@@ -106,7 +107,7 @@
job.setVertexInputFormatClass(GraphCleanInputFormat.class);
job.setVertexOutputFormatClass(GraphCleanOutputFormat.class);
job.setDynamicVertexValueSize(true);
- job.setOutputKeyClass(PositionWritable.class);
+ job.setOutputKeyClass(KmerBytesWritable.class);
job.setOutputValueClass(VertexValueWritable.class);
Client.run(args, job);
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/tipremove/TipRemoveVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/tipremove/TipRemoveVertex.java
index b4f2407..c8d3e2d 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/tipremove/TipRemoveVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/tipremove/TipRemoveVertex.java
@@ -10,8 +10,8 @@
import edu.uci.ics.genomix.pregelix.io.MessageWritable;
import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
import edu.uci.ics.genomix.pregelix.operator.pathmerge.BasicGraphCleanVertex;
-import edu.uci.ics.genomix.pregelix.type.AdjMessage;
import edu.uci.ics.genomix.pregelix.util.VertexUtil;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
/*
* vertexId: BytesWritable
@@ -57,7 +57,14 @@
kmerSize = getContext().getConfiguration().getInt(KMER_SIZE, 5);
if(length == -1)
length = getContext().getConfiguration().getInt(LENGTH, kmerSize); //kmerSize + 5
- outgoingMsg.reset();
+ if(incomingMsg == null)
+ incomingMsg = new MessageWritable(kmerSize);
+ if(outgoingMsg == null)
+ outgoingMsg = new MessageWritable(kmerSize);
+ else
+ outgoingMsg.reset(kmerSize);
+ if(destVertexId == null)
+ destVertexId = new KmerBytesWritable(kmerSize);
}
@Override
@@ -66,35 +73,27 @@
if(getSuperstep() == 1){
if(VertexUtil.isIncomingTipVertex(getVertexValue())){
if(getVertexValue().getLengthOfKmer() <= length){
- if(getVertexValue().getFFList().getCountOfPosition() > 0)
- outgoingMsg.setFlag(AdjMessage.FROMFF);
- else if(getVertexValue().getFRList().getCountOfPosition() > 0)
- outgoingMsg.setFlag(AdjMessage.FROMFR);
- outgoingMsg.setSourceVertexId(getVertexId());
- destVertexId.setAsCopy(getNextDestVertexId(getVertexValue()));
- sendMsg(destVertexId, outgoingMsg);
+
+ sendSettledMsgToPreviousNode();
deleteVertex(getVertexId());
}
}
else if(VertexUtil.isOutgoingTipVertex(getVertexValue())){
if(getVertexValue().getLengthOfKmer() <= length){
- if(getVertexValue().getRFList().getCountOfPosition() > 0)
- outgoingMsg.setFlag(AdjMessage.FROMRF);
- else if(getVertexValue().getRRList().getCountOfPosition() > 0)
- outgoingMsg.setFlag(AdjMessage.FROMRR);
- outgoingMsg.setSourceVertexId(getVertexId());
- destVertexId.setAsCopy(getPreDestVertexId(getVertexValue()));
- sendMsg(destVertexId, outgoingMsg);
+ sendSettledMsgToNextNode();
deleteVertex(getVertexId());
}
}
else if(VertexUtil.isSingleVertex(getVertexValue())){
- if(getVertexValue().getLengthOfKmer() > length)
+ if(getVertexValue().getLengthOfKmer() <= length)
deleteVertex(getVertexId());
}
}
else if(getSuperstep() == 2){
- responseToDeadVertex(msgIterator);
+ while(msgIterator.hasNext()){
+ incomingMsg = msgIterator.next();
+ responseToDeadVertex();
+ }
}
voteToHalt();
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/ConvertNodeToIdValue.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/ConvertNodeToIdValue.java
deleted file mode 100644
index 13d9d98..0000000
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/ConvertNodeToIdValue.java
+++ /dev/null
@@ -1,58 +0,0 @@
-package edu.uci.ics.genomix.pregelix.sequencefile;
-
-import java.io.File;
-import java.io.IOException;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.SequenceFile.CompressionType;
-
-import edu.uci.ics.genomix.type.NodeWritable;
-import edu.uci.ics.genomix.type.PositionWritable;
-import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
-import edu.uci.ics.genomix.pregelix.io.VertexValueWritable.State;
-
-
-public class ConvertNodeToIdValue {
-
- public static void convert(Path inFile, Path outFile)
- throws IOException {
- Configuration conf = new Configuration();
- FileSystem fileSys = FileSystem.get(conf);
-
- SequenceFile.Reader reader = new SequenceFile.Reader(fileSys, inFile, conf);
- SequenceFile.Writer writer = SequenceFile.createWriter(fileSys, conf, outFile, PositionWritable.class,
- VertexValueWritable.class, CompressionType.NONE);
- NodeWritable node = new NodeWritable();
- NullWritable value = NullWritable.get();
- PositionWritable outputKey = new PositionWritable();
- VertexValueWritable outputValue = new VertexValueWritable();
-
- while(reader.next(node, value)) {
-// System.out.println(node.getNodeID().toString());
-// outputKey.set(node.getNodeID());
- outputValue.setFFList(node.getFFList());
- outputValue.setFRList(node.getFRList());
- outputValue.setRFList(node.getRFList());
- outputValue.setRRList(node.getRRList());
- outputValue.setKmer(node.getKmer());
- outputValue.setState(State.IS_HEAD);
- writer.append(outputKey, outputValue);
- }
- writer.close();
- reader.close();
- }
-
- public static void main(String[] args) throws IOException {
- Path dir = new Path("data/test");
- Path outDir = new Path("data/input");
- FileUtils.cleanDirectory(new File("data/input"));
- Path inFile = new Path(dir, "result.graphbuild.txt.bin");
- Path outFile = new Path(outDir, "out");
- convert(inFile,outFile);
- }
-}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/CheckMessage.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/CheckMessage.java
index 73ab533..4b32a51 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/CheckMessage.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/CheckMessage.java
@@ -3,7 +3,7 @@
public class CheckMessage {
public static final byte SOURCE = 1 << 0;
- public static final byte CHAIN = 1 << 1;
+ public static final byte ACUTUALKMER = 1 << 1;
public static final byte NEIGHBER = 1 << 2;
public static final byte MESSAGE = 1 << 3;
public static final byte NODEIDLIST = 1 << 4;
@@ -18,8 +18,8 @@
case SOURCE:
r = "SOURCE";
break;
- case CHAIN:
- r = "CHAIN";
+ case ACUTUALKMER:
+ r = "ACUTUALKMER";
break;
case NEIGHBER:
r = "NEIGHBER";
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/DirectionFlag.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/DirectionFlag.java
index 2ed36e9..51a73f1 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/DirectionFlag.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/DirectionFlag.java
@@ -11,5 +11,5 @@
public static final byte DIR_MASK = 0b111 << 0;
public static final byte DIR_CLEAR = 0b1111000 << 0;
- public static final byte DIR_FROM_DEADVERTEX = 0b101 << 0;
+// public static final byte DIR_FROM_DEADVERTEX = 0b101 << 0;
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/util/VertexUtil.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/util/VertexUtil.java
index b7f6a4f..5a0591d 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/util/VertexUtil.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/util/VertexUtil.java
@@ -1,6 +1,5 @@
package edu.uci.ics.genomix.pregelix.util;
-import edu.uci.ics.genomix.oldtype.PositionWritable;
import edu.uci.ics.genomix.pregelix.io.AdjacencyListWritable;
import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
import edu.uci.ics.genomix.type.KmerBytesWritable;
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
index 630b5aa..ff5f404 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
@@ -128,90 +128,90 @@
private static void genSplitRepeatGraph() throws IOException {
generateSplitRepeatGraphJob("SplitRepeatGraph", outputBase + "SplitRepeatGraph.xml");
}
-// private static void generateTipAddGraphJob(String jobName, String outputPath) throws IOException {
-// PregelixJob job = new PregelixJob(jobName);
-// job.setVertexClass(TipAddVertex.class);
-// job.setVertexInputFormatClass(GraphCleanOutputFormat.class);
-// job.setVertexOutputFormatClass(GraphCleanOutputFormat.class);
-// job.setDynamicVertexValueSize(true);
-// job.setOutputKeyClass(PositionWritable.class);
-// job.setOutputValueClass(VertexValueWritable.class);
-// job.getConfiguration().setInt(TipAddVertex.KMER_SIZE, 3);
-// job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
-// }
-//
-// private static void genTipAddGraph() throws IOException {
-// generateTipAddGraphJob("TipAddGraph", outputBase
-// + "TipAddGraph.xml");
-// }
-//
-// private static void generateTipRemoveGraphJob(String jobName, String outputPath) throws IOException {
-// PregelixJob job = new PregelixJob(jobName);
-// job.setVertexClass(TipRemoveVertex.class);
-// job.setVertexInputFormatClass(NaiveAlgorithmForPathMergeInputFormat.class);
-// job.setVertexOutputFormatClass(GraphCleanOutputFormat.class);
-// job.setDynamicVertexValueSize(true);
-// job.setOutputKeyClass(PositionWritable.class);
-// job.setOutputValueClass(VertexValueWritable.class);
-// job.getConfiguration().setInt(TipRemoveVertex.KMER_SIZE, 5);
-// job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
-// }
-//
-// private static void genTipRemoveGraph() throws IOException {
-// generateTipRemoveGraphJob("TipRemoveGraph", outputBase
-// + "TipRemoveGraph.xml");
-// }
-//
-// private static void generateBridgeAddGraphJob(String jobName, String outputPath) throws IOException {
-// PregelixJob job = new PregelixJob(jobName);
-// job.setVertexClass(BridgeAddVertex.class);
-// job.setVertexInputFormatClass(NaiveAlgorithmForPathMergeInputFormat.class);
-// job.setVertexOutputFormatClass(GraphCleanOutputFormat.class);
-// job.setDynamicVertexValueSize(true);
-// job.setOutputKeyClass(PositionWritable.class);
-// job.setOutputValueClass(VertexValueWritable.class);
-// job.getConfiguration().setInt(BridgeAddVertex.KMER_SIZE, 3);
-// job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
-// }
-//
-// private static void genBridgeAddGraph() throws IOException {
-// generateBridgeAddGraphJob("BridgeAddGraph", outputBase
-// + "BridgeAddGraph.xml");
-// }
-//
-// private static void generateBridgeRemoveGraphJob(String jobName, String outputPath) throws IOException {
-// PregelixJob job = new PregelixJob(jobName);
-// job.setVertexClass(BridgeRemoveVertex.class);
-// job.setVertexInputFormatClass(GraphCleanInputFormat.class);
-// job.setVertexOutputFormatClass(GraphCleanOutputFormat.class);
-// job.setDynamicVertexValueSize(true);
-// job.setOutputKeyClass(PositionWritable.class);
-// job.setOutputValueClass(VertexValueWritable.class);
-// job.getConfiguration().setInt(TipRemoveVertex.KMER_SIZE, 5);
-// job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
-// }
-//
-// private static void genBridgeRemoveGraph() throws IOException {
-// generateBridgeRemoveGraphJob("BridgeRemoveGraph", outputBase
-// + "BridgeRemoveGraph.xml");
-// }
-//
-// private static void generateBubbleAddGraphJob(String jobName, String outputPath) throws IOException {
-// PregelixJob job = new PregelixJob(jobName);
-// job.setVertexClass(BubbleAddVertex.class);
-// job.setVertexInputFormatClass(NaiveAlgorithmForPathMergeInputFormat.class);
-// job.setVertexOutputFormatClass(GraphCleanOutputFormat.class);
-// job.setDynamicVertexValueSize(true);
-// job.setOutputKeyClass(PositionWritable.class);
-// job.setOutputValueClass(VertexValueWritable.class);
-// job.getConfiguration().setInt(BubbleAddVertex.KMER_SIZE, 3);
-// job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
-// }
-//
-// private static void genBubbleAddGraph() throws IOException {
-// generateBubbleAddGraphJob("BubbleAddGraph", outputBase
-// + "BubbleAddGraph.xml");
-// }
+ private static void generateTipAddGraphJob(String jobName, String outputPath) throws IOException {
+     final PregelixJob tipAddJob = new PregelixJob(jobName); // job whose configuration is dumped to outputPath
+     tipAddJob.setVertexClass(TipAddVertex.class);
+     tipAddJob.setVertexInputFormatClass(InitialGraphCleanInputFormat.class);
+     tipAddJob.setVertexOutputFormatClass(GraphCleanOutputFormat.class);
+     tipAddJob.setDynamicVertexValueSize(true);
+     tipAddJob.setOutputKeyClass(KmerBytesWritable.class);
+     tipAddJob.setOutputValueClass(VertexValueWritable.class);
+     tipAddJob.getConfiguration().setInt(TipAddVertex.KMER_SIZE, 3); // stores 3 under the KMER_SIZE key
+     tipAddJob.getConfiguration().writeXml(new FileOutputStream(outputPath)); // NOTE: stream is never closed
+ }
+
+ private static void genTipAddGraph() throws IOException {
+     final String jobXmlPath = outputBase + "TipAddGraph.xml"; // file the job configuration is written to
+     generateTipAddGraphJob("TipAddGraph", jobXmlPath);
+ }
+
+ private static void generateTipRemoveGraphJob(String jobName, String outputPath) throws IOException {
+     final PregelixJob tipRemoveJob = new PregelixJob(jobName); // job whose configuration is dumped to outputPath
+     tipRemoveJob.setVertexClass(TipRemoveVertex.class);
+     tipRemoveJob.setVertexInputFormatClass(GraphCleanInputFormat.class);
+     tipRemoveJob.setVertexOutputFormatClass(GraphCleanOutputFormat.class);
+     tipRemoveJob.setDynamicVertexValueSize(true);
+     tipRemoveJob.setOutputKeyClass(KmerBytesWritable.class);
+     tipRemoveJob.setOutputValueClass(VertexValueWritable.class);
+     tipRemoveJob.getConfiguration().setInt(TipRemoveVertex.KMER_SIZE, 3); // stores 3 under the KMER_SIZE key
+     tipRemoveJob.getConfiguration().writeXml(new FileOutputStream(outputPath)); // NOTE: stream is never closed
+ }
+
+ private static void genTipRemoveGraph() throws IOException {
+     final String jobXmlPath = outputBase + "TipRemoveGraph.xml"; // file the job configuration is written to
+     generateTipRemoveGraphJob("TipRemoveGraph", jobXmlPath);
+ }
+
+ private static void generateBridgeAddGraphJob(String jobName, String outputPath) throws IOException {
+     final PregelixJob bridgeAddJob = new PregelixJob(jobName); // job whose configuration is dumped to outputPath
+     bridgeAddJob.setVertexClass(BridgeAddVertex.class);
+     bridgeAddJob.setVertexInputFormatClass(InitialGraphCleanInputFormat.class);
+     bridgeAddJob.setVertexOutputFormatClass(GraphCleanOutputFormat.class);
+     bridgeAddJob.setDynamicVertexValueSize(true);
+     bridgeAddJob.setOutputKeyClass(KmerBytesWritable.class);
+     bridgeAddJob.setOutputValueClass(VertexValueWritable.class);
+     bridgeAddJob.getConfiguration().setInt(BridgeAddVertex.KMER_SIZE, 3); // stores 3 under the KMER_SIZE key
+     bridgeAddJob.getConfiguration().writeXml(new FileOutputStream(outputPath)); // NOTE: stream is never closed
+ }
+
+ private static void genBridgeAddGraph() throws IOException {
+     final String jobXmlPath = outputBase + "BridgeAddGraph.xml"; // file the job configuration is written to
+     generateBridgeAddGraphJob("BridgeAddGraph", jobXmlPath);
+ }
+
+ private static void generateBridgeRemoveGraphJob(String jobName, String outputPath) throws IOException {
+     final PregelixJob bridgeRemoveJob = new PregelixJob(jobName); // job whose configuration is dumped to outputPath
+     bridgeRemoveJob.setVertexClass(BridgeRemoveVertex.class);
+     bridgeRemoveJob.setVertexInputFormatClass(GraphCleanInputFormat.class);
+     bridgeRemoveJob.setVertexOutputFormatClass(GraphCleanOutputFormat.class);
+     bridgeRemoveJob.setDynamicVertexValueSize(true);
+     bridgeRemoveJob.setOutputKeyClass(KmerBytesWritable.class);
+     bridgeRemoveJob.setOutputValueClass(VertexValueWritable.class);
+     bridgeRemoveJob.getConfiguration().setInt(TipRemoveVertex.KMER_SIZE, 3); // NOTE(review): TipRemoveVertex key in a BridgeRemove job - confirm intended
+     bridgeRemoveJob.getConfiguration().writeXml(new FileOutputStream(outputPath)); // NOTE: stream is never closed
+ }
+
+ private static void genBridgeRemoveGraph() throws IOException {
+     final String jobXmlPath = outputBase + "BridgeRemoveGraph.xml"; // file the job configuration is written to
+     generateBridgeRemoveGraphJob("BridgeRemoveGraph", jobXmlPath);
+ }
+
+ private static void generateBubbleAddGraphJob(String jobName, String outputPath) throws IOException {
+     final PregelixJob bubbleAddJob = new PregelixJob(jobName); // job whose configuration is dumped to outputPath
+     bubbleAddJob.setVertexClass(BubbleAddVertex.class);
+     bubbleAddJob.setVertexInputFormatClass(InitialGraphCleanInputFormat.class);
+     bubbleAddJob.setVertexOutputFormatClass(GraphCleanOutputFormat.class);
+     bubbleAddJob.setDynamicVertexValueSize(true);
+     bubbleAddJob.setOutputKeyClass(KmerBytesWritable.class);
+     bubbleAddJob.setOutputValueClass(VertexValueWritable.class);
+     bubbleAddJob.getConfiguration().setInt(BubbleAddVertex.KMER_SIZE, 3); // stores 3 under the KMER_SIZE key
+     bubbleAddJob.getConfiguration().writeXml(new FileOutputStream(outputPath)); // NOTE: stream is never closed
+ }
+
+ private static void genBubbleAddGraph() throws IOException {
+     final String jobXmlPath = outputBase + "BubbleAddGraph.xml"; // file the job configuration is written to
+     generateBubbleAddGraphJob("BubbleAddGraph", jobXmlPath);
+ }
//
// private static void generateBubbleMergeGraphJob(String jobName, String outputPath) throws IOException {
// PregelixJob job = new PregelixJob(jobName);
@@ -243,6 +243,11 @@
// genP4ForMergeGraph();
// genMapReduceGraph();
genSplitRepeatGraph();
+ genTipAddGraph();
+ genBridgeAddGraph();
+ genTipRemoveGraph();
+ genBridgeRemoveGraph();
+ genBubbleAddGraph();
}
}
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/BridgeAddSmallTestSuite.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/BridgeAddSmallTestSuite.java
index 221a76a..c4f0963 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/BridgeAddSmallTestSuite.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/BridgeAddSmallTestSuite.java
@@ -43,20 +43,9 @@
public class BridgeAddSmallTestSuite extends TestSuite {
private static final Logger LOGGER = Logger.getLogger(BridgeAddSmallTestSuite.class.getName());
- public static final String PreFix = "data/input"; //"graphbuildresult";
+ public static final String PreFix = "data/AddBridge";
public static final String[] TestDir = { PreFix + File.separator
- + "tworeads"};/*, PreFix + File.separator
- /*+ "CyclePath"};, PreFix + File.separator
- + "SimplePath", PreFix + File.separator
- + "SinglePath", PreFix + File.separator
- + "TreePath"};*/
- /* + "2", PreFix + File.separator + "3", PreFix + File.separator + "4", PreFix + File.separator + "5",
- PreFix + File.separator + "6", PreFix + File.separator + "7", PreFix + File.separator + "8",
- PreFix + File.separator + "9", PreFix + File.separator + "TwoKmer", PreFix + File.separator + "ThreeKmer",
- PreFix + File.separator + "SinglePath", PreFix + File.separator + "SimplePath",
- PreFix + File.separator + "Path", PreFix + File.separator + "BridgePath",
- PreFix + File.separator + "CyclePath", PreFix + File.separator + "RingPath",
- PreFix + File.separator + "LongPath", PreFix + File.separator + "TreePath" };*/
+ + "SimpleTest"};
private static final String ACTUAL_RESULT_DIR = "data/actual/bridgeadd";
private static final String PATH_TO_HADOOP_CONF = "src/test/resources/hadoop/conf";
private static final String PATH_TO_CLUSTER_STORE = "src/test/resources/cluster/stores.properties";
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/BridgeRemoveSmallTestSuite.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/BridgeRemoveSmallTestSuite.java
index 397c514..171a6ca 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/BridgeRemoveSmallTestSuite.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/BridgeRemoveSmallTestSuite.java
@@ -43,9 +43,9 @@
public class BridgeRemoveSmallTestSuite extends TestSuite {
private static final Logger LOGGER = Logger.getLogger(BridgeRemoveSmallTestSuite.class.getName());
- public static final String PreFix = "data/actual"; //"graphbuildresult";
+ public static final String PreFix = "data/actual/bridgeadd/BridgeAddGraph/bin";
public static final String[] TestDir = { PreFix + File.separator
- + "pathmerge/P4ForMergeGraph/bin/tworeads"};
+ + "SimpleTest"};
private static final String ACTUAL_RESULT_DIR = "data/actual/bridgeremove";
private static final String PATH_TO_HADOOP_CONF = "src/test/resources/hadoop/conf";
private static final String PATH_TO_CLUSTER_STORE = "src/test/resources/cluster/stores.properties";
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/BubbleAddSmallTestSuite.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/BubbleAddSmallTestSuite.java
new file mode 100644
index 0000000..a9c2774
--- /dev/null
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/BubbleAddSmallTestSuite.java
@@ -0,0 +1,246 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.genomix.pregelix.JobRun;
+
+import java.io.BufferedReader;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.logging.Logger;
+
+import junit.framework.Test;
+import junit.framework.TestResult;
+import junit.framework.TestSuite;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.mapred.JobConf;
+
+import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
+import edu.uci.ics.pregelix.core.util.PregelixHyracksIntegrationUtil;
+
+/**
+ * JUnit 3 test suite that pairs every job file found in {@code src/test/resources/jobs/} with every
+ * input directory listed in {@link #TestDir} and registers one {@code BasicSmallTestCase} per pair.
+ * {@link #suite()} starts the Hyracks and HDFS mini-clusters; {@link #run(TestResult)} stops them.
+ */
+@SuppressWarnings("deprecation")
+public class BubbleAddSmallTestSuite extends TestSuite {
+    private static final Logger LOGGER = Logger.getLogger(BubbleAddSmallTestSuite.class.getName());
+
+    public static final String PreFix = "data/PathMergeTestSet";
+    public static final String[] TestDir = { PreFix + File.separator + "5" };
+    private static final String ACTUAL_RESULT_DIR = "data/actual/bubbleadd";
+    private static final String PATH_TO_HADOOP_CONF = "src/test/resources/hadoop/conf";
+    private static final String PATH_TO_CLUSTER_STORE = "src/test/resources/cluster/stores.properties";
+    private static final String PATH_TO_CLUSTER_PROPERTIES = "src/test/resources/cluster/cluster.properties";
+    private static final String PATH_TO_JOBS = "src/test/resources/jobs/";
+    private static final String PATH_TO_ONLY = "src/test/resources/only_bubbleadd.txt";
+
+    public static final String HDFS_INPUTPATH = "/PathTestSet";
+
+    private static final String HADOOP_CONF_PATH = ACTUAL_RESULT_DIR + File.separator + "conf.xml";
+    private MiniDFSCluster dfsCluster;
+
+    private JobConf conf = new JobConf();
+    private int numberOfNC = 2;
+
+    /**
+     * Configures the cluster paths, wipes the local stores, starts the Hyracks mini-cluster,
+     * recreates the (empty) result directory and finally starts HDFS.
+     */
+    public void setUp() throws Exception {
+        ClusterConfig.setStorePath(PATH_TO_CLUSTER_STORE);
+        ClusterConfig.setClusterPropertiesPath(PATH_TO_CLUSTER_PROPERTIES);
+        cleanupStores();
+        PregelixHyracksIntegrationUtil.init("src/test/resources/topology.xml");
+        LOGGER.info("Hyracks mini-cluster started");
+        FileUtils.forceMkdir(new File(ACTUAL_RESULT_DIR));
+        FileUtils.cleanDirectory(new File(ACTUAL_RESULT_DIR));
+        startHDFS();
+    }
+
+    /**
+     * Starts the HDFS mini-cluster, copies the files of every {@link #TestDir} entry below
+     * {@link #HDFS_INPUTPATH} and writes the effective Hadoop configuration to {@code HADOOP_CONF_PATH}.
+     *
+     * @throws IOException if an input directory cannot be listed or a file system operation fails
+     */
+    private void startHDFS() throws IOException {
+        conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/core-site.xml"));
+        conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/mapred-site.xml"));
+        conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/hdfs-site.xml"));
+        FileSystem lfs = FileSystem.getLocal(new Configuration());
+        lfs.delete(new Path("build"), true);
+        System.setProperty("hadoop.log.dir", "logs");
+        dfsCluster = new MiniDFSCluster(conf, numberOfNC, true, null);
+        FileSystem dfs = FileSystem.get(conf);
+
+        for (String testDir : TestDir) {
+            File src = new File(testDir);
+            Path dest = new Path(HDFS_INPUTPATH + File.separator + src.getName());
+            dfs.mkdirs(dest);
+            // File.listFiles() returns null when the path is missing or is not a directory.
+            File[] inputFiles = src.listFiles();
+            if (inputFiles == null) {
+                throw new FileNotFoundException("Cannot list test input directory " + src.getAbsolutePath());
+            }
+            for (File f : inputFiles) {
+                dfs.copyFromLocalFile(new Path(f.getAbsolutePath()), dest);
+            }
+        }
+
+        // try-with-resources closes the stream even when writeXml fails.
+        try (DataOutputStream confOutput = new DataOutputStream(new FileOutputStream(new File(HADOOP_CONF_PATH)))) {
+            conf.writeXml(confOutput);
+            confOutput.flush();
+        }
+    }
+
+    /** Creates the local {@code teststore} and {@code build} directories if needed and empties them. */
+    private void cleanupStores() throws IOException {
+        FileUtils.forceMkdir(new File("teststore"));
+        FileUtils.forceMkdir(new File("build"));
+        FileUtils.cleanDirectory(new File("teststore"));
+        FileUtils.cleanDirectory(new File("build"));
+    }
+
+    /**
+     * Shuts down the HDFS mini-cluster; does nothing when it was never started.
+     */
+    private void cleanupHDFS() throws Exception {
+        if (dfsCluster != null) {
+            dfsCluster.shutdown();
+        }
+    }
+
+    /** Stops the Hyracks mini-cluster and then HDFS; HDFS is shut down even if the first step fails. */
+    public void tearDown() throws Exception {
+        try {
+            PregelixHyracksIntegrationUtil.deinit();
+            LOGGER.info("Hyracks mini-cluster shut down");
+        } finally {
+            cleanupHDFS();
+        }
+    }
+
+    /**
+     * Builds the suite: starts the clusters and adds one test per (job file, input directory) pair.
+     * When {@code only_bubbleadd.txt} has entries, only job files whose name contains one of them run.
+     *
+     * @return the populated suite
+     * @throws Exception if the job directory cannot be listed or the environment fails to start
+     */
+    public static Test suite() throws Exception {
+        List<String> onlys = getFileList(PATH_TO_ONLY);
+        File testData = new File(PATH_TO_JOBS);
+        File[] queries = testData.listFiles();
+        if (queries == null) {
+            // Fail with a clear message before any cluster is started.
+            throw new FileNotFoundException("Cannot list job directory " + testData.getAbsolutePath());
+        }
+        BubbleAddSmallTestSuite testSuite = new BubbleAddSmallTestSuite();
+        testSuite.setUp();
+        boolean onlyEnabled = !onlys.isEmpty();
+        FileSystem dfs = FileSystem.get(testSuite.conf);
+
+        for (File qFile : queries) {
+            if (!qFile.isFile() || (onlyEnabled && !isInList(onlys, qFile.getName()))) {
+                continue;
+            }
+            String resultBase = ACTUAL_RESULT_DIR + File.separator + jobExtToResExt(qFile.getName());
+            for (String testPathStr : TestDir) {
+                String inputName = new File(testPathStr).getName();
+                String resultFileName = resultBase + File.separator + "bin" + File.separator + inputName;
+                String textFileName = resultBase + File.separator + "txt" + File.separator + inputName;
+                testSuite.addTest(new BasicSmallTestCase(HADOOP_CONF_PATH, qFile.getName(), qFile.getAbsolutePath(),
+                        dfs, HDFS_INPUTPATH + File.separator + inputName, resultFileName, textFileName));
+            }
+        }
+        return testSuite;
+    }
+
+    /**
+     * Runs the tests and collects their result in a TestResult. The clusters are torn down
+     * afterwards, including when running a test throws.
+     */
+    @Override
+    public void run(TestResult result) {
+        try {
+            try {
+                int testCount = countTestCases();
+                for (int i = 0; i < testCount; i++) {
+                    Test each = this.testAt(i);
+                    if (result.shouldStop()) {
+                        break;
+                    }
+                    runTest(each, result);
+                }
+            } finally {
+                tearDown();
+            }
+        } catch (Exception e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
+    /**
+     * Reads a text file into a list with one element per line.
+     *
+     * @param ignorePath path of the file to read
+     * @return the lines of the file in order; empty for an empty file
+     * @throws FileNotFoundException if the file cannot be opened
+     * @throws IOException if reading fails
+     */
+    protected static List<String> getFileList(String ignorePath) throws FileNotFoundException, IOException {
+        List<String> ignores = new ArrayList<String>();
+        // try-with-resources closes the reader even when readLine fails.
+        try (BufferedReader reader = new BufferedReader(new FileReader(ignorePath))) {
+            String s;
+            while ((s = reader.readLine()) != null) {
+                ignores.add(s);
+            }
+        }
+        return ignores;
+    }
+
+    /**
+     * Strips the extension (the last {@code '.'} and everything after it) from a job file name.
+     * A name without a dot is returned unchanged instead of failing in {@code substring}.
+     */
+    private static String jobExtToResExt(String fname) {
+        int dot = fname.lastIndexOf('.');
+        return dot < 0 ? fname : fname.substring(0, dot);
+    }
+
+    /** Returns whether {@code name} contains at least one of the entries of {@code onlys}. */
+    private static boolean isInList(List<String> onlys, String name) {
+        for (String only : onlys) {
+            if (name.contains(only)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+}
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/SplitRepeatSmallTestSuite.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/SplitRepeatSmallTestSuite.java
index 21ffe34..1948acd 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/SplitRepeatSmallTestSuite.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/SplitRepeatSmallTestSuite.java
@@ -45,7 +45,7 @@
//P4ForMergeGraph/bin/read
public static final String PreFix = "data/SplitRepeat"; //"graphbuildresult";
public static final String[] TestDir = { PreFix + File.separator
- + "SimpleTest"};
+ + "AdjSplitRepeat"};
private static final String ACTUAL_RESULT_DIR = "data/actual/splitrepeat";
private static final String PATH_TO_HADOOP_CONF = "src/test/resources/hadoop/conf";
private static final String PATH_TO_CLUSTER_STORE = "src/test/resources/cluster/stores.properties";
@@ -59,7 +59,7 @@
private MiniDFSCluster dfsCluster;
private JobConf conf = new JobConf();
- private int numberOfNC = 2;
+ private int numberOfNC = 1;
public void setUp() throws Exception {
ClusterConfig.setStorePath(PATH_TO_CLUSTER_STORE);
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/TipAddSmallTestSuite.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/TipAddSmallTestSuite.java
index f4456ce..57f4ea5 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/TipAddSmallTestSuite.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/TipAddSmallTestSuite.java
@@ -43,20 +43,9 @@
public class TipAddSmallTestSuite extends TestSuite {
private static final Logger LOGGER = Logger.getLogger(TipAddSmallTestSuite.class.getName());
- public static final String PreFix = "data/input"; //"graphbuildresult";
+ public static final String PreFix = "data/PathMergeTestSet"; //"graphbuildresult";
public static final String[] TestDir = { PreFix + File.separator
- + "read"};/*, PreFix + File.separator
- /*+ "CyclePath"};, PreFix + File.separator
- + "SimplePath", PreFix + File.separator
- + "SinglePath", PreFix + File.separator
- + "TreePath"};*/
- /* + "2", PreFix + File.separator + "3", PreFix + File.separator + "4", PreFix + File.separator + "5",
- PreFix + File.separator + "6", PreFix + File.separator + "7", PreFix + File.separator + "8",
- PreFix + File.separator + "9", PreFix + File.separator + "TwoKmer", PreFix + File.separator + "ThreeKmer",
- PreFix + File.separator + "SinglePath", PreFix + File.separator + "SimplePath",
- PreFix + File.separator + "Path", PreFix + File.separator + "BridgePath",
- PreFix + File.separator + "CyclePath", PreFix + File.separator + "RingPath",
- PreFix + File.separator + "LongPath", PreFix + File.separator + "TreePath" };*/
+ + "5"};
private static final String ACTUAL_RESULT_DIR = "data/actual/tipadd";
private static final String PATH_TO_HADOOP_CONF = "src/test/resources/hadoop/conf";
private static final String PATH_TO_CLUSTER_STORE = "src/test/resources/cluster/stores.properties";
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/TipRemoveSmallTestSuite.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/TipRemoveSmallTestSuite.java
index def24e4..e8ca43f 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/TipRemoveSmallTestSuite.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/TipRemoveSmallTestSuite.java
@@ -43,10 +43,9 @@
public class TipRemoveSmallTestSuite extends TestSuite {
private static final Logger LOGGER = Logger.getLogger(TipRemoveSmallTestSuite.class.getName());
//P4ForMergeGraph/bin/read
- public static final String PreFix = "data/input"; //"graphbuildresult";
+ public static final String PreFix = "data/actual/tipadd/TipAddGraph/bin";
public static final String[] TestDir = { PreFix + File.separator
- + "graphs/tipremove/fr_with_tip"};
- //+ "bridgeadd/BridgeAddGraph/bin/tworeads"};
+ + "5"};
private static final String ACTUAL_RESULT_DIR = "data/actual/tipremove";
private static final String PATH_TO_HADOOP_CONF = "src/test/resources/hadoop/conf";
private static final String PATH_TO_CLUSTER_STORE = "src/test/resources/cluster/stores.properties";