Merge commit '94e075b5c3db9aa613ef61c2581430a143b17bc8' into nanzhang/hyracks_genomix
Conflicts:
genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerBytesWritable.java
genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/KmerBytesWritableTest.java
genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/KmerListWritableTest.java
genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MessageWritable.java
genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/VertexValueWritable.java
genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicGraphCleanVertex.java
genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P1ForPathMergeVertex.java
genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P3ForPathMergeVertex.java
diff --git a/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/KmerBytesWritableFactoryTest.java b/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/KmerBytesWritableFactoryTest.java
index 663d8dd..7808719 100644
--- a/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/KmerBytesWritableFactoryTest.java
+++ b/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/KmerBytesWritableFactoryTest.java
@@ -197,5 +197,12 @@
Assert.assertEquals(kmer.toString(), "AGCTGAC");
VKmerBytesWritable reversed = kmerFactory.reverse(kmer);
Assert.assertEquals(reversed.toString(), "CAGTCGA");
+
+ kmer.reset(8);
+ kmer.setByRead(("AATAGAAC").getBytes(), 0);
+ Assert.assertEquals(kmer.toString(), "AATAGAAC");
+ reversed.reset(8);
+ reversed = kmerFactory.reverse(kmer);
+ Assert.assertEquals(reversed.toString(), "GTTCTATT");
}
}
diff --git a/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/KmerBytesWritableTest.java b/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/KmerBytesWritableTest.java
index 54d29eb..81565a3 100644
--- a/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/KmerBytesWritableTest.java
+++ b/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/KmerBytesWritableTest.java
@@ -15,6 +15,11 @@
package edu.uci.ics.genomix.data.test;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
import junit.framework.Assert;
import org.junit.Test;
diff --git a/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/KmerBytesWritableTest.java.orig b/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/KmerBytesWritableTest.java.orig
new file mode 100644
index 0000000..8a0cb6d
--- /dev/null
+++ b/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/KmerBytesWritableTest.java.orig
@@ -0,0 +1,427 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.genomix.data.test;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import junit.framework.Assert;
+
+import org.junit.Test;
+
+import edu.uci.ics.genomix.type.GeneCode;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
+
+public class KmerBytesWritableTest {
+ static byte[] array = { 'A', 'A', 'T', 'A', 'G', 'A', 'A', 'G' };
+ static int k = 7;
+
+ @Test
+ public void TestCompressKmer() {
+ KmerBytesWritable.setGlobalKmerLength(k);
+ KmerBytesWritable kmer = new KmerBytesWritable();
+ kmer.setByRead(array, 0);
+ Assert.assertEquals(kmer.toString(), "AATAGAA");
+
+ kmer.setByRead(array, 1);
+ Assert.assertEquals(kmer.toString(), "ATAGAAG");
+ }
+
+ @Test
+ public void TestMoveKmer() {
+ KmerBytesWritable.setGlobalKmerLength(k);
+ KmerBytesWritable kmer = new KmerBytesWritable();
+ kmer.setByRead(array, 0);
+ Assert.assertEquals(kmer.toString(), "AATAGAA");
+
+ for (int i = k; i < array.length - 1; i++) {
+ kmer.shiftKmerWithNextCode(array[i]);
+ Assert.assertTrue(false);
+ }
+
+ byte out = kmer.shiftKmerWithNextChar(array[array.length - 1]);
+ Assert.assertEquals(out, GeneCode.getCodeFromSymbol((byte) 'A'));
+ Assert.assertEquals(kmer.toString(), "ATAGAAG");
+ }
+
+ @Test
+ public void TestCompressKmerReverse() {
+ KmerBytesWritable.setGlobalKmerLength(k);
+ KmerBytesWritable kmer = new KmerBytesWritable();
+ kmer.setByRead(array, 0);
+ Assert.assertEquals(kmer.toString(), "AATAGAA");
+
+ kmer.setByReadReverse(array, 1);
+ Assert.assertEquals(kmer.toString(), "CTTCTAT");
+ }
+
+ @Test
+ public void TestMoveKmerReverse() {
+ KmerBytesWritable.setGlobalKmerLength(k);
+ KmerBytesWritable kmer = new KmerBytesWritable();
+ kmer.setByRead(array, 0);
+ Assert.assertEquals(kmer.toString(), "AATAGAA");
+
+ for (int i = k; i < array.length - 1; i++) {
+ kmer.shiftKmerWithPreChar(array[i]);
+ Assert.assertTrue(false);
+ }
+
+ byte out = kmer.shiftKmerWithPreChar(array[array.length - 1]);
+ Assert.assertEquals(out, GeneCode.getCodeFromSymbol((byte) 'A'));
+ Assert.assertEquals(kmer.toString(), "GAATAGA");
+ }
+
+ @Test
+ public void TestGetGene() {
+ KmerBytesWritable.setGlobalKmerLength(9);
+ KmerBytesWritable kmer = new KmerBytesWritable();
+ String text = "AGCTGACCG";
+ byte[] array = { 'A', 'G', 'C', 'T', 'G', 'A', 'C', 'C', 'G' };
+ kmer.setByRead(array, 0);
+
+ for (int i = 0; i < 9; i++) {
+ Assert.assertEquals(text.charAt(i), (char) (GeneCode.getSymbolFromCode(kmer.getGeneCodeAtPosition(i))));
+ }
+ }
+
+ @Test
+ public void TestGetOneByteFromKmer() {
+ byte[] array = { 'A', 'G', 'C', 'T', 'G', 'A', 'C', 'C', 'G', 'T' };
+ String string = "AGCTGACCGT";
+ for (int k = 3; k <= 10; k++) {
+ KmerBytesWritable.setGlobalKmerLength(k);
+ KmerBytesWritable kmer = new KmerBytesWritable();
+ KmerBytesWritable kmerAppend = new KmerBytesWritable();
+ kmer.setByRead(array, 0);
+ Assert.assertEquals(string.substring(0, k), kmer.toString());
+ for (int b = 0; b < k; b++) {
+ byte byteActual = KmerBytesWritable.getOneByteFromKmerAtPosition(b, kmer.getBytes(), kmer.getOffset(),
+ kmer.getLength());
+ byte byteExpect = GeneCode.getCodeFromSymbol(array[b]);
+ for (int i = 1; i < 4 && b + i < k; i++) {
+ byteExpect += GeneCode.getCodeFromSymbol(array[b + i]) << (i * 2);
+ }
+ Assert.assertEquals(byteActual, byteExpect);
+ KmerBytesWritable.appendOneByteAtPosition(b, byteActual, kmerAppend.getBytes(), kmerAppend.getOffset(),
+ kmerAppend.getLength());
+ }
+ Assert.assertEquals(kmer.toString(), kmerAppend.toString());
+ }
+ }
+<<<<<<< HEAD
+=======
+
+ @Test
+ public void TestMergeFFKmer() {
+ byte[] array = { 'A', 'G', 'C', 'T', 'G', 'A', 'C', 'C', 'G', 'T' };
+ String text = "AGCTGACCGT";
+ KmerBytesWritable kmer1 = new KmerBytesWritable(8);
+ kmer1.setByRead(array, 0);
+ String text1 = "AGCTGACC";
+ KmerBytesWritable kmer2 = new KmerBytesWritable(8);
+ kmer2.setByRead(array, 1);
+ String text2 = "GCTGACCG";
+ Assert.assertEquals(text2, kmer2.toString());
+ KmerBytesWritable merge = new KmerBytesWritable(kmer1);
+ int kmerSize = 8;
+ merge.mergeWithFFKmer(kmerSize, kmer2);
+ Assert.assertEquals(text1 + text2.substring(kmerSize - 1), merge.toString());
+
+ for (int i = 1; i < 8; i++) {
+ merge.set(kmer1);
+ merge.mergeWithFFKmer(i, kmer2);
+ Assert.assertEquals(text1 + text2.substring(i - 1), merge.toString());
+ }
+
+ for (int ik = 1; ik <= 10; ik++) {
+ for (int jk = 1; jk <= 10; jk++) {
+ kmer1 = new KmerBytesWritable(ik);
+ kmer2 = new KmerBytesWritable(jk);
+ kmer1.setByRead(array, 0);
+ kmer2.setByRead(array, 0);
+ text1 = text.substring(0, ik);
+ text2 = text.substring(0, jk);
+ Assert.assertEquals(text1, kmer1.toString());
+ Assert.assertEquals(text2, kmer2.toString());
+ for (int x = 1; x < jk; x++) {
+ merge.set(kmer1);
+ merge.mergeWithFFKmer(x, kmer2);
+ Assert.assertEquals(text1 + text2.substring(x - 1), merge.toString());
+ }
+ }
+ }
+ }
+
+ @Test
+ public void TestMergeFRKmer() {
+ int kmerSize = 3;
+ String result = "AAGCTAACAACC";
+ byte[] resultArray = result.getBytes();
+
+ String text1 = "AAGCTAA";
+ KmerBytesWritable kmer1 = new KmerBytesWritable(text1.length());
+ kmer1.setByRead(resultArray, 0);
+ Assert.assertEquals(text1, kmer1.toString());
+
+ // kmer2 is the rc of the end of the read
+ String text2 = "GGTTGTT";
+ KmerBytesWritable kmer2 = new KmerBytesWritable(text2.length());
+ kmer2.setByReadReverse(resultArray, result.length() - text2.length());
+ Assert.assertEquals(text2, kmer2.toString());
+
+ KmerBytesWritable merge = new KmerBytesWritable(kmer1);
+ merge.mergeWithFRKmer(kmerSize, kmer2);
+ Assert.assertEquals(result, merge.toString());
+
+ int i = 1;
+ merge.set(kmer1);
+ merge.mergeWithFRKmer(i, kmer2);
+ Assert.assertEquals("AAGCTAAAACAACC", merge.toString());
+
+ i = 2;
+ merge.set(kmer1);
+ merge.mergeWithFRKmer(i, kmer2);
+ Assert.assertEquals("AAGCTAAACAACC", merge.toString());
+
+ i = 3;
+ merge.set(kmer1);
+ merge.mergeWithFRKmer(i, kmer2);
+ Assert.assertEquals("AAGCTAACAACC", merge.toString());
+ }
+
+
+ @Test
+ public void TestMergeRFKmer() {
+ int kmerSize = 3;
+ String result = "GGCACAACAACCC";
+ byte[] resultArray = result.getBytes();
+
+ String text1 = "AACAACCC";
+ KmerBytesWritable kmer1 = new KmerBytesWritable(text1.length());
+ kmer1.setByRead(resultArray, 5);
+ Assert.assertEquals(text1, kmer1.toString());
+
+ // kmer2 is the rc of the end of the read
+ String text2 = "TTGTGCC";
+ KmerBytesWritable kmer2 = new KmerBytesWritable(text2.length());
+ kmer2.setByReadReverse(resultArray, 0);
+ Assert.assertEquals(text2, kmer2.toString());
+
+ KmerBytesWritable merge = new KmerBytesWritable(kmer1);
+ merge.mergeWithRFKmer(kmerSize, kmer2);
+ Assert.assertEquals(result, merge.toString());
+
+ int i = 1;
+ merge.set(kmer1);
+ merge.mergeWithRFKmer(i, kmer2);
+ Assert.assertEquals("GGCACAAAACAACCC", merge.toString());
+
+ i = 2;
+ merge.set(kmer1);
+ merge.mergeWithRFKmer(i, kmer2);
+ Assert.assertEquals("GGCACAAACAACCC", merge.toString());
+
+ i = 3;
+ merge.set(kmer1);
+ merge.mergeWithRFKmer(i, kmer2);
+ Assert.assertEquals("GGCACAACAACCC", merge.toString());
+
+ String test1;
+ String test2;
+ test1 = "CTA";
+ test2 = "AGA";
+ KmerBytesWritable k1 = new KmerBytesWritable(3);
+ KmerBytesWritable k2 = new KmerBytesWritable(3);
+ k1.setByRead(test1.getBytes(), 0);
+ k2.setByRead(test2.getBytes(), 0);
+ k1.mergeWithRFKmer(3, k2);
+ Assert.assertEquals("TCTA", k1.toString());
+
+ test1 = "CTA";
+ test2 = "ATA"; //TAT
+ k1 = new KmerBytesWritable(3);
+ k2 = new KmerBytesWritable(3);
+ k1.setByRead(test1.getBytes(), 0);
+ k2.setByRead(test2.getBytes(), 0);
+ k1.mergeWithFRKmer(3, k2);
+ Assert.assertEquals("CTAT", k1.toString());
+
+ test1 = "ATA";
+ test2 = "CTA"; //TAT
+ k1 = new KmerBytesWritable(3);
+ k2 = new KmerBytesWritable(3);
+ k1.setByRead(test1.getBytes(), 0);
+ k2.setByRead(test2.getBytes(), 0);
+ k1.mergeWithFRKmer(3, k2);
+ Assert.assertEquals("ATAG", k1.toString());
+
+ test1 = "TCTAT";
+ test2 = "GAAC";
+ k1 = new KmerBytesWritable(5);
+ k2 = new KmerBytesWritable(4);
+ k1.setByRead(test1.getBytes(), 0);
+ k2.setByRead(test2.getBytes(), 0);
+ k1.mergeWithRFKmer(3, k2);
+ Assert.assertEquals("GTTCTAT", k1.toString());
+ }
+
+
+
+ @Test
+ public void TestMergeRRKmer() {
+ byte[] array = { 'A', 'G', 'C', 'T', 'G', 'A', 'C', 'C', 'G', 'T' };
+ String text = "AGCTGACCGT";
+ KmerBytesWritable kmer1 = new KmerBytesWritable(8);
+ kmer1.setByRead(array, 0);
+ String text1 = "AGCTGACC";
+ KmerBytesWritable kmer2 = new KmerBytesWritable(8);
+ kmer2.setByRead(array, 1);
+ String text2 = "GCTGACCG";
+ Assert.assertEquals(text2, kmer2.toString());
+ KmerBytesWritable merge = new KmerBytesWritable(kmer2);
+ int kmerSize = 8;
+ merge.mergeWithRRKmer(kmerSize, kmer1);
+ Assert.assertEquals(text1 + text2.substring(kmerSize - 1), merge.toString());
+
+ for (int i = 1; i < 8; i++) {
+ merge.set(kmer2);
+ merge.mergeWithRRKmer(i, kmer1);
+ Assert.assertEquals(text1.substring(0, text1.length() - i + 1) + text2, merge.toString());
+ }
+
+ for (int ik = 1; ik <= 10; ik++) {
+ for (int jk = 1; jk <= 10; jk++) {
+ kmer1 = new KmerBytesWritable(ik);
+ kmer2 = new KmerBytesWritable(jk);
+ kmer1.setByRead(array, 0);
+ kmer2.setByRead(array, 0);
+ text1 = text.substring(0, ik);
+ text2 = text.substring(0, jk);
+ Assert.assertEquals(text1, kmer1.toString());
+ Assert.assertEquals(text2, kmer2.toString());
+ for (int x = 1; x < ik; x++) {
+ merge.set(kmer2);
+ merge.mergeWithRRKmer(x, kmer1);
+ Assert.assertEquals(text1.substring(0, text1.length() - x + 1) + text2, merge.toString());
+ }
+ }
+ }
+ }
+
+ @Test
+ public void TestFinalMerge() {
+ String selfString;
+ String match;
+ String msgString;
+ int index;
+ KmerBytesWritable kmer = new KmerBytesWritable();
+ int kmerSize = 3;
+
+ String F1 = "AATAG";
+ String F2 = "TAGAA";
+ String R1 = "CTATT";
+ String R2 = "TTCTA";
+
+ //FF test
+ selfString = F1;
+ match = selfString.substring(selfString.length() - kmerSize + 1,selfString.length());
+ msgString = F2;
+ index = msgString.indexOf(match);
+ kmer.reset(msgString.length() - index);
+ kmer.setByRead(msgString.substring(index).getBytes(), 0);
+ System.out.println(kmer.toString());
+
+ //FR test
+ selfString = F1;
+ match = selfString.substring(selfString.length() - kmerSize + 1,selfString.length());
+ msgString = GeneCode.reverseComplement(R2);
+ index = msgString.indexOf(match);
+ kmer.reset(msgString.length() - index);
+ kmer.setByRead(msgString.substring(index).getBytes(), 0);
+ System.out.println(kmer.toString());
+
+ //RF test
+ selfString = R1;
+ match = selfString.substring(0,kmerSize - 1);
+ msgString = GeneCode.reverseComplement(F2);
+ index = msgString.lastIndexOf(match) + kmerSize - 2;
+ kmer.reset(index + 1);
+ kmer.setByReadReverse(msgString.substring(0, index + 1).getBytes(), 0);
+ System.out.println(kmer.toString());
+
+ //RR test
+ selfString = R1;
+ match = selfString.substring(0,kmerSize - 1);
+ msgString = R2;
+ index = msgString.lastIndexOf(match) + kmerSize - 2;
+ kmer.reset(index + 1);
+ kmer.setByRead(msgString.substring(0, index + 1).getBytes(), 0);
+ System.out.println(kmer.toString());
+
+ String[][] connectedTable = new String[][]{
+ {"FF", "RF"},
+ {"FF", "RR"},
+ {"FR", "RF"},
+ {"FR", "RR"}
+ };
+ System.out.println(connectedTable[0][1]);
+
+ Set<Long> s1 = new HashSet<Long>();
+ Set<Long> s2 = new HashSet<Long>();
+ s1.add((long) 1);
+ s1.add((long) 2);
+ s2.add((long) 2);
+ s2.add((long) 3);
+ Set<Long> intersection = new HashSet<Long>();
+ intersection.addAll(s1);
+ intersection.retainAll(s2);
+ System.out.println(intersection.toString());
+ Set<Long> difference = new HashSet<Long>();
+ difference.addAll(s1);
+ difference.removeAll(s2);
+ System.out.println(difference.toString());
+
+ Map<KmerBytesWritable, Set<Long>> map = new HashMap<KmerBytesWritable, Set<Long>>();
+ KmerBytesWritable k1 = new KmerBytesWritable(3);
+ Set<Long> set1 = new HashSet<Long>();
+ k1.setByRead(("CTA").getBytes(), 0);
+ set1.add((long)1);
+ map.put(k1, set1);
+ KmerBytesWritable k2 = new KmerBytesWritable(3);
+ k2.setByRead(("GTA").getBytes(), 0);
+ Set<Long> set2 = new HashSet<Long>();
+ set2.add((long) 2);
+ map.put(k2, set2);
+ KmerBytesWritable k3 = new KmerBytesWritable(3);
+ k3.setByRead(("ATG").getBytes(), 0);
+ Set<Long> set3 = new HashSet<Long>();
+ set3.add((long) 3);
+ map.put(k3, set3);
+ KmerBytesWritable k4 = new KmerBytesWritable(3);
+ k4.setByRead(("AAT").getBytes(), 0);
+ Set<Long> set4 = new HashSet<Long>();
+ set4.add((long) 4);
+ map.put(k4, set4);
+ System.out.println("CTA = " + map.get(k1).toString());
+ System.out.println("GTA = " + map.get(k2).toString());
+ System.out.println("ATG = " + map.get(k3).toString());
+ System.out.println("AAT = " + map.get(k4).toString());
+ }
+>>>>>>> 94e075b5c3db9aa613ef61c2581430a143b17bc8
+}
diff --git a/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/KmerListWritableTest.java b/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/KmerListWritableTest.java
index 5a69a3c..2f7bba8 100644
--- a/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/KmerListWritableTest.java
+++ b/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/KmerListWritableTest.java
@@ -22,7 +22,7 @@
for (int i = 1; i < 200; i++) {
KmerBytesWritable.setGlobalKmerLength(i);
kmer = new KmerBytesWritable();
- String randomString = generateString(i);
+ String randomString = generaterRandomString(i);
byte[] array = randomString.getBytes();
kmer.setByRead(array, 0);
kmerList.reset();
@@ -36,7 +36,7 @@
//add one more kmer each time and fix kmerSize
for (int i = 0; i < 200; i++) {
kmer = new KmerBytesWritable();
- String randomString = generateString(5);
+ String randomString = generaterRandomString(5);
byte[] array = randomString.getBytes();
kmer.setByRead(array, 0);
kmerList.append(kmer);
@@ -63,7 +63,7 @@
for (i = 0; i < 200; i++) {
KmerBytesWritable.setGlobalKmerLength(5);
kmer = new KmerBytesWritable();
- String randomString = generateString(5);
+ String randomString = generaterRandomString(5);
byte[] array = randomString.getBytes();
kmer.setByRead(array, 0);
kmerList.append(kmer);
@@ -106,9 +106,20 @@
}
Assert.assertEquals(0, kmerList.getCountOfPosition());
+
+ KmerBytesWritable.setGlobalKmerLength(3);
+ KmerListWritable edgeList = new KmerListWritable();
+ KmerBytesWritable k = new KmerBytesWritable();
+ k.setByRead(("AAA").getBytes(), 0);
+ edgeList.append(k);
+ k.setByRead(("CCC").getBytes(), 0);
+ edgeList.append(k);
+ for(KmerBytesWritable edge : edgeList){
+ System.out.println(edge.toString());
+ }
}
- public String generateString(int n){
+ public String generaterRandomString(int n){
char[] chars = "ACGT".toCharArray();
StringBuilder sb = new StringBuilder();
Random random = new Random();
diff --git a/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/PositionWritableTest.java b/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/PositionWritableTest.java
index fc67245..003406d 100644
--- a/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/PositionWritableTest.java
+++ b/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/PositionWritableTest.java
@@ -42,7 +42,6 @@
Assert.assertEquals(pos.getPosId(), posId);
Assert.assertEquals(pos1.toString(), pos.toString());
- String out = pos.toString();
}
}
}
diff --git a/genomix/genomix-hadoop/data/webmap/SplitRepeat.txt b/genomix/genomix-hadoop/data/webmap/SplitRepeat.txt
new file mode 100644
index 0000000..bb03d70
--- /dev/null
+++ b/genomix/genomix-hadoop/data/webmap/SplitRepeat.txt
@@ -0,0 +1,2 @@
+1 AATAG
+2 CATAC
diff --git a/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/contrailgraphbuilding/GraphBuildingTest.java b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/contrailgraphbuilding/GraphBuildingTest.java
index 217e882..127ab3e 100644
--- a/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/contrailgraphbuilding/GraphBuildingTest.java
+++ b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/contrailgraphbuilding/GraphBuildingTest.java
@@ -22,13 +22,13 @@
private JobConf conf = new JobConf();
private static final String ACTUAL_RESULT_DIR = "actual";
private static final String HADOOP_CONF_PATH = ACTUAL_RESULT_DIR + File.separator + "conf.xml";
- private static final String DATA_PATH = "data/webmap/pathmerge_TestSet/9";
+ private static final String DATA_PATH = "data/webmap/SplitRepeat.txt";
private static final String HDFS_PATH = "/webmap";
private static final String RESULT_PATH = "/result";
// private static final int COUNT_REDUCER = 2;
private static final int SIZE_KMER = 3;
- private static final int READ_LENGTH = 11;
+ private static final int READ_LENGTH = 5;
private MiniDFSCluster dfsCluster;
private MiniMRCluster mrCluster;
diff --git a/genomix/genomix-pregelix/data/LogAlgorithmForMergeGraph/txt/.2.crc b/genomix/genomix-pregelix/data/LogAlgorithmForMergeGraph/txt/.2.crc
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/genomix/genomix-pregelix/data/LogAlgorithmForMergeGraph/txt/.2.crc
diff --git a/genomix/genomix-pregelix/data/LogAlgorithmForMergeGraph/txt/.3.crc b/genomix/genomix-pregelix/data/LogAlgorithmForMergeGraph/txt/.3.crc
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/genomix/genomix-pregelix/data/LogAlgorithmForMergeGraph/txt/.3.crc
diff --git a/genomix/genomix-pregelix/data/LogAlgorithmForMergeGraph/txt/.4.crc b/genomix/genomix-pregelix/data/LogAlgorithmForMergeGraph/txt/.4.crc
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/genomix/genomix-pregelix/data/LogAlgorithmForMergeGraph/txt/.4.crc
diff --git a/genomix/genomix-pregelix/data/LogAlgorithmForMergeGraph/txt/.5.crc b/genomix/genomix-pregelix/data/LogAlgorithmForMergeGraph/txt/.5.crc
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/genomix/genomix-pregelix/data/LogAlgorithmForMergeGraph/txt/.5.crc
diff --git a/genomix/genomix-pregelix/data/LogAlgorithmForMergeGraph/txt/.6.crc b/genomix/genomix-pregelix/data/LogAlgorithmForMergeGraph/txt/.6.crc
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/genomix/genomix-pregelix/data/LogAlgorithmForMergeGraph/txt/.6.crc
diff --git a/genomix/genomix-pregelix/data/LogAlgorithmForMergeGraph/txt/.7.crc b/genomix/genomix-pregelix/data/LogAlgorithmForMergeGraph/txt/.7.crc
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/genomix/genomix-pregelix/data/LogAlgorithmForMergeGraph/txt/.7.crc
diff --git a/genomix/genomix-pregelix/data/LogAlgorithmForMergeGraph/txt/.8.crc b/genomix/genomix-pregelix/data/LogAlgorithmForMergeGraph/txt/.8.crc
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/genomix/genomix-pregelix/data/LogAlgorithmForMergeGraph/txt/.8.crc
diff --git a/genomix/genomix-pregelix/data/LogAlgorithmForMergeGraph/txt/.9.crc b/genomix/genomix-pregelix/data/LogAlgorithmForMergeGraph/txt/.9.crc
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/genomix/genomix-pregelix/data/LogAlgorithmForMergeGraph/txt/.9.crc
diff --git a/genomix/genomix-pregelix/data/LogAlgorithmForMergeGraph/txt/2 b/genomix/genomix-pregelix/data/LogAlgorithmForMergeGraph/txt/2
new file mode 100755
index 0000000..f8bec75
--- /dev/null
+++ b/genomix/genomix-pregelix/data/LogAlgorithmForMergeGraph/txt/2
@@ -0,0 +1,2 @@
+ATA {[(1-2_1)] [] [] [] [] AATA}
+AAT {[(1-1_1)] [] [] [] [] AATA}
diff --git a/genomix/genomix-pregelix/data/LogAlgorithmForMergeGraph/txt/3 b/genomix/genomix-pregelix/data/LogAlgorithmForMergeGraph/txt/3
new file mode 100755
index 0000000..0f3d880
--- /dev/null
+++ b/genomix/genomix-pregelix/data/LogAlgorithmForMergeGraph/txt/3
@@ -0,0 +1 @@
+ATA {[(1-2_1)] [] [] [] [] AATAG}
diff --git a/genomix/genomix-pregelix/data/LogAlgorithmForMergeGraph/txt/4 b/genomix/genomix-pregelix/data/LogAlgorithmForMergeGraph/txt/4
new file mode 100755
index 0000000..91211fc
--- /dev/null
+++ b/genomix/genomix-pregelix/data/LogAlgorithmForMergeGraph/txt/4
@@ -0,0 +1,2 @@
+CTA {[(1-3_1)] [] [] [] [] TCTATT}
+ATA {[(1-2_1)] [] [] [] [] AATAGA}
diff --git a/genomix/genomix-pregelix/data/LogAlgorithmForMergeGraph/txt/5 b/genomix/genomix-pregelix/data/LogAlgorithmForMergeGraph/txt/5
new file mode 100755
index 0000000..a2ae06f
--- /dev/null
+++ b/genomix/genomix-pregelix/data/LogAlgorithmForMergeGraph/txt/5
@@ -0,0 +1,2 @@
+ATA {[(1-2_1)] [] [] [] [] AATAGAA}
+AGA {[(1-4_1)] [] [] [] [] AATAGAA}
diff --git a/genomix/genomix-pregelix/data/LogAlgorithmForMergeGraph/txt/6 b/genomix/genomix-pregelix/data/LogAlgorithmForMergeGraph/txt/6
new file mode 100755
index 0000000..c084bee
--- /dev/null
+++ b/genomix/genomix-pregelix/data/LogAlgorithmForMergeGraph/txt/6
@@ -0,0 +1,2 @@
+CTA {[(1-3_1)] [] [] [] [] GTTCTATT}
+AGA {[(1-4_1)] [] [] [] [] AATAGAAC}
diff --git a/genomix/genomix-pregelix/data/LogAlgorithmForMergeGraph/txt/7 b/genomix/genomix-pregelix/data/LogAlgorithmForMergeGraph/txt/7
new file mode 100755
index 0000000..30b31af
--- /dev/null
+++ b/genomix/genomix-pregelix/data/LogAlgorithmForMergeGraph/txt/7
@@ -0,0 +1 @@
+AGA {[(1-4_1)] [] [] [] [] AATAGAACT}
diff --git a/genomix/genomix-pregelix/data/LogAlgorithmForMergeGraph/txt/8 b/genomix/genomix-pregelix/data/LogAlgorithmForMergeGraph/txt/8
new file mode 100755
index 0000000..900707b
--- /dev/null
+++ b/genomix/genomix-pregelix/data/LogAlgorithmForMergeGraph/txt/8
@@ -0,0 +1,2 @@
+GAA {[(1-5_1)] [] [] [] [] AATAGAACTT}
+AGA {[(1-4_1)] [] [] [] [] AATAGAACTT}
diff --git a/genomix/genomix-pregelix/data/LogAlgorithmForMergeGraph/txt/9 b/genomix/genomix-pregelix/data/LogAlgorithmForMergeGraph/txt/9
new file mode 100755
index 0000000..862bc7c4
--- /dev/null
+++ b/genomix/genomix-pregelix/data/LogAlgorithmForMergeGraph/txt/9
@@ -0,0 +1,2 @@
+AGA {[(1-4_1)] [] [] [] [] AATAGAACTTA}
+AAC {[(1-6_1)] [] [] [] [] AATAGAACTTA}
diff --git a/genomix/genomix-pregelix/data/SplitRepeat/SimpleTest/part-00000 b/genomix/genomix-pregelix/data/SplitRepeat/SimpleTest/part-00000
new file mode 100755
index 0000000..4977247
--- /dev/null
+++ b/genomix/genomix-pregelix/data/SplitRepeat/SimpleTest/part-00000
Binary files differ
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/client/Client.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/client/Client.java
index 918fa1e..0da4a77 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/client/Client.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/client/Client.java
@@ -9,8 +9,8 @@
import org.kohsuke.args4j.CmdLineParser;
import org.kohsuke.args4j.Option;
-import edu.uci.ics.genomix.pregelix.operator.pathmerge.LogAlgorithmForPathMergeVertex;
-import edu.uci.ics.genomix.pregelix.operator.pathmerge.NaiveAlgorithmForPathMergeVertex;
+import edu.uci.ics.genomix.pregelix.operator.pathmerge.P2ForPathMergeVertex;
+import edu.uci.ics.genomix.pregelix.operator.pathmerge.P1ForPathMergeVertex;
import edu.uci.ics.genomix.pregelix.operator.pathmerge.P3ForPathMergeVertex;
import edu.uci.ics.pregelix.api.job.PregelixJob;
import edu.uci.ics.pregelix.core.base.IDriver.Plan;
@@ -34,7 +34,7 @@
@Option(name = "-plan", usage = "query plan choice", required = false)
public Plan planChoice = Plan.OUTER_JOIN;
- @Option(name = "-kmer-kmerByteSize", usage = "the kmerByteSize of kmer", required = false)
+ @Option(name = "-tmpKmer-kmerByteSize", usage = "the kmerByteSize of tmpKmer", required = false)
public int sizeKmer;
@Option(name = "-num-iteration", usage = "max number of iterations, for pagerank job only", required = false)
@@ -66,12 +66,12 @@
for (int i = 1; i < inputs.length; i++)
FileInputFormat.addInputPaths(job, inputs[0]);
FileOutputFormat.setOutputPath(job, new Path(options.outputPath));
- job.getConfiguration().setInt(NaiveAlgorithmForPathMergeVertex.KMER_SIZE, options.sizeKmer);
- job.getConfiguration().setInt(LogAlgorithmForPathMergeVertex.KMER_SIZE, options.sizeKmer);
+ job.getConfiguration().setInt(P1ForPathMergeVertex.KMER_SIZE, options.sizeKmer);
+ job.getConfiguration().setInt(P2ForPathMergeVertex.KMER_SIZE, options.sizeKmer);
job.getConfiguration().setInt(P3ForPathMergeVertex.KMER_SIZE, options.sizeKmer);
if (options.numIteration > 0) {
- job.getConfiguration().setInt(NaiveAlgorithmForPathMergeVertex.ITERATIONS, options.numIteration);
- job.getConfiguration().setInt(LogAlgorithmForPathMergeVertex.ITERATIONS, options.numIteration);
+ job.getConfiguration().setInt(P1ForPathMergeVertex.ITERATIONS, options.numIteration);
+ job.getConfiguration().setInt(P2ForPathMergeVertex.ITERATIONS, options.numIteration);
job.getConfiguration().setInt(P3ForPathMergeVertex.ITERATIONS, options.numIteration);
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/P2PathMergeOutputFormat.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/P2PathMergeOutputFormat.java
index 0308913..3f8216c 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/P2PathMergeOutputFormat.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/P2PathMergeOutputFormat.java
@@ -9,6 +9,7 @@
import edu.uci.ics.genomix.pregelix.api.io.binary.BinaryDataCleanVertexOutputFormat;
import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
import edu.uci.ics.genomix.pregelix.io.VertexValueWritable.State;
+import edu.uci.ics.genomix.pregelix.operator.pathmerge.P2ForPathMergeVertex;
import edu.uci.ics.genomix.type.KmerBytesWritable;
import edu.uci.ics.pregelix.api.graph.Vertex;
import edu.uci.ics.pregelix.api.io.VertexWriter;
@@ -39,6 +40,7 @@
byte selfFlag = (byte)(vertex.getVertexValue().getState() & State.VERTEX_MASK);
if(selfFlag == State.IS_FINAL)
getRecordWriter().write(vertex.getVertexId(), vertex.getVertexValue());
+ P2ForPathMergeVertex.fakeVertexExist = false;
}
}
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MessageWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MessageWritable.java
index d9177e2..e3cd345 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MessageWritable.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MessageWritable.java
@@ -9,6 +9,8 @@
import edu.uci.ics.genomix.pregelix.type.CheckMessage;
import edu.uci.ics.genomix.pregelix.type.Message;
import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.PositionListWritable;
+import edu.uci.ics.genomix.type.VKmerBytesWritable;
public class MessageWritable implements WritableComparable<MessageWritable> {
/**
@@ -17,8 +19,9 @@
* file stores the point to the file that stores the chains of connected DNA
*/
private KmerBytesWritable sourceVertexId;
- private KmerBytesWritable kmer;
+ private VKmerBytesWritable kmer;
private AdjacencyListWritable neighberNode; //incoming or outgoing
+ private PositionListWritable nodeIdList = new PositionListWritable();
private byte flag;
private boolean isFlip;
private int kmerlength = 0;
@@ -28,7 +31,7 @@
public MessageWritable() {
sourceVertexId = new KmerBytesWritable();
- kmer = new KmerBytesWritable();
+ kmer = new VKmerBytesWritable();
neighberNode = new AdjacencyListWritable();
flag = Message.NON;
isFlip = false;
@@ -38,7 +41,7 @@
public MessageWritable(int kmerSize) {
kmerlength = kmerSize;
sourceVertexId = new KmerBytesWritable();
- kmer = new KmerBytesWritable();
+ kmer = new VKmerBytesWritable();
neighberNode = new AdjacencyListWritable(kmerSize);
flag = Message.NON;
isFlip = false;
@@ -54,7 +57,7 @@
}
if (kmer != null) {
checkMessage |= CheckMessage.CHAIN;
- this.kmer.setAsCopy(msg.getKmer());
+ this.kmer.setAsCopy(msg.getActualKmer());
}
if (neighberNode != null) {
checkMessage |= CheckMessage.NEIGHBER;
@@ -74,7 +77,7 @@
}
if (chainVertexId != null) {
checkMessage |= CheckMessage.CHAIN;
- this.kmer.setAsCopy(chainVertexId);
+ this.kmer.setAsCopy(new VKmerBytesWritable(chainVertexId.toString())); // TODO Vkmer
}
if (neighberNode != null) {
checkMessage |= CheckMessage.NEIGHBER;
@@ -107,14 +110,25 @@
}
}
- public KmerBytesWritable getKmer() {
+ public VKmerBytesWritable getActualKmer() {
return kmer;
}
- public void setChainVertexId(KmerBytesWritable chainVertexId) {
- if (chainVertexId != null) {
+ public void setActualKmer(VKmerBytesWritable actualKmer) {
+ if (actualKmer != null) {
checkMessage |= CheckMessage.CHAIN;
- this.kmer.setAsCopy(chainVertexId);
+ this.kmer.setAsCopy(new VKmerBytesWritable(actualKmer.toString()));
+ }
+ }
+
+ public VKmerBytesWritable getCreatedVertexId() {
+ return kmer;
+ }
+
+ public void setCreatedVertexId(KmerBytesWritable actualKmer) {
+ if (actualKmer != null) {
+ checkMessage |= CheckMessage.CHAIN;
+ this.kmer.setAsCopy(new VKmerBytesWritable(actualKmer.toString()));
}
}
@@ -130,7 +144,7 @@
}
public int getLengthOfChain() {
- return kmer.getKmerLength();
+ return kmer.getKmerLetterLength(); // length of the carried VKmer (letter count, going by the method name — confirm)
}
public byte getFlag() {
@@ -158,6 +172,17 @@
this.updateMsg = updateMsg;
}
+ public PositionListWritable getNodeIdList() {
+ return this.nodeIdList;
+ }
+
+ public void setNodeIdList(PositionListWritable nodeIdList) { // a null argument is ignored
+ if (nodeIdList == null) {
+ return;
+ }
+ checkMessage |= CheckMessage.NODEIDLIST; // write()/readFields() serialize nodeIdList only when this bit is set
+ this.nodeIdList.set(nodeIdList);
+ }
@Override
public void write(DataOutput out) throws IOException {
out.writeInt(kmerlength);
@@ -168,6 +193,8 @@
kmer.write(out);
if ((checkMessage & CheckMessage.NEIGHBER) != 0)
neighberNode.write(out);
+ if ((checkMessage & CheckMessage.NODEIDLIST) != 0)
+ nodeIdList.write(out);
out.writeBoolean(isFlip);
out.writeByte(flag);
out.writeBoolean(updateMsg);
@@ -184,6 +211,8 @@
kmer.readFields(in);
if ((checkMessage & CheckMessage.NEIGHBER) != 0)
neighberNode.readFields(in);
+ if ((checkMessage & CheckMessage.NODEIDLIST) != 0)
+ nodeIdList.readFields(in);
isFlip = in.readBoolean();
flag = in.readByte();
updateMsg = in.readBoolean();
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/VertexValueWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/VertexValueWritable.java
index 60ad003..9cdac8f 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/VertexValueWritable.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/VertexValueWritable.java
@@ -12,10 +12,23 @@
public class VertexValueWritable implements WritableComparable<VertexValueWritable> {
- public static class VertexStateFlag {
+ public static class State extends VertexStateFlag{ // merge-state constants; also inherits IS_* (bits 5-6) and IS_FAKE/FAKEFLAG_MASK (bit 0)
+ public static final byte HEAD_SHOULD_MERGEWITHPREV = 0b101 << 0; // NOTE(review): sets bit 0, the same bit as the inherited IS_FAKE
+ public static final byte HEAD_SHOULD_MERGEWITHNEXT = 0b111 << 0; // NOTE(review): sets bit 0, the same bit as the inherited IS_FAKE
+
+ public static final byte NO_MERGE = 0b00 << 3; // bits 3-4 hold the SHOULD_MERGE field
+ public static final byte SHOULD_MERGEWITHNEXT = 0b01 << 3;
+ public static final byte SHOULD_MERGEWITHPREV = 0b10 << 3;
+ public static final byte SHOULD_MERGE_MASK = 0b11 << 3;
+ public static final byte SHOULD_MERGE_CLEAR = 0b1100111; // AND-mask: clears bits 3-4 (and bit 7)
+
+ public static final byte KILL = 0b11 << 3; // NOTE(review): same value as SHOULD_MERGE_MASK, so KILL cannot be told apart from "both merge bits set"
+ public static final byte KILL_MASK = 0b11 << 3; // NOTE(review): same bits as SHOULD_MERGE_MASK
+ }
+
+ public static class VertexStateFlag extends FakeFlag {
public static final byte IS_NON = 0b00 << 5;
public static final byte IS_RANDOMTAIL = 0b00 << 5;
- public static final byte IS_STOP = 0b00 << 5;
public static final byte IS_HEAD = 0b01 << 5;
public static final byte IS_FINAL = 0b10 << 5;
public static final byte IS_RANDOMHEAD = 0b11 << 5;
@@ -25,15 +38,11 @@
public static final byte VERTEX_CLEAR = (byte) 11001111;
}
- public static class State extends VertexStateFlag{
- public static final byte HEAD_SHOULD_MERGEWITHPREV = 0b101 << 0;
- public static final byte HEAD_SHOULD_MERGEWITHNEXT = 0b111 << 0;
-
- public static final byte NO_MERGE = 0b00 << 3;
- public static final byte SHOULD_MERGEWITHNEXT = 0b01 << 3;
- public static final byte SHOULD_MERGEWITHPREV = 0b10 << 3;
- public static final byte SHOULD_MERGE_MASK = 0b11 << 3;
- public static final byte SHOULD_MERGE_CLEAR = 0b1100111;
+ public static class FakeFlag{ // bit 0 of the state byte: whether this is the fake vertex
+ public static final byte IS_NONFAKE = 0 << 0;
+ public static final byte IS_FAKE = 1 << 0;
+
+ public static final byte FAKEFLAG_MASK = (byte) 0b00000001; // binary literal; the former "00000001" was octal (same value 1 only by coincidence)
}
private PositionListWritable nodeIdList;
@@ -41,8 +50,8 @@
private AdjacencyListWritable outgoingList;
private byte state;
private VKmerBytesWritable kmer;
- private VKmerBytesWritable mergeDest;
private int kmerlength = 0;
+ private boolean isFakeVertex = false;
public VertexValueWritable() {
this(0);
@@ -55,7 +64,6 @@
outgoingList = new AdjacencyListWritable();
state = State.IS_NON;
kmer = new VKmerBytesWritable();
- mergeDest = new VKmerBytesWritable();
}
public VertexValueWritable(PositionListWritable nodeIdList, KmerListWritable forwardForwardList, KmerListWritable forwardReverseList,
@@ -144,6 +152,15 @@
public byte getState() {
return state;
}
+
+ /** @return whether this value belongs to the fake vertex; the flag is persisted by write/readFields */
+ public boolean isFakeVertex() {
+ return this.isFakeVertex;
+ }
+ /** @param isFakeVertex true to mark this value as belonging to the fake vertex */
+ public void setFakeVertex(boolean isFakeVertex) {
+ this.isFakeVertex = isFakeVertex;
+ }
public void setState(byte state) {
this.state = state;
@@ -160,16 +177,7 @@
public void setKmer(VKmerBytesWritable kmer) {
this.kmer.setAsCopy(kmer);
}
-
- public VKmerBytesWritable getMergeDest() {
- return mergeDest;
- }
- public void setMergeDest(VKmerBytesWritable mergeDest) {
- this.mergeDest = mergeDest;
- }
-
-
public int getKmerlength() {
return kmerlength;
}
@@ -197,7 +205,7 @@
this.outgoingList.readFields(in);
this.state = in.readByte();
this.kmer.readFields(in);
- this.mergeDest.readFields(in);
+ this.isFakeVertex = in.readBoolean();
}
@Override
@@ -208,7 +216,7 @@
this.outgoingList.write(out);
out.writeByte(this.state);
this.kmer.write(out);
- this.mergeDest.write(out);
+ out.writeBoolean(this.isFakeVertex);
}
@Override
@@ -237,6 +245,10 @@
return outgoingList.getForwardList().getCountOfPosition() + outgoingList.getReverseList().getCountOfPosition();
}
+ /** @return total degree of this vertex: inDegree() + outDegree() */
+ public int getDegree(){
+ return this.inDegree() + this.outDegree();
+ }
/*
* Delete the corresponding edge
*/
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/log/LogAlgorithmLogFormatter.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/log/LogAlgorithmLogFormatter.java
index 7d6a1b9..dd78cde 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/log/LogAlgorithmLogFormatter.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/log/LogAlgorithmLogFormatter.java
@@ -78,7 +78,7 @@
builder.append("Message is: " + Message.MESSAGE_CONTENT.getContentFromCode(msg.getFlag()) + "\r\n");
if (msg.getLengthOfChain() != -1) {
- chain = msg.getKmer().toString();
+ chain = msg.getActualKmer().toString();
builder.append("Chain Message: " + chain + "\r\n");
builder.append("Chain Length: " + msg.getLengthOfChain() + "\r\n");
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bridgeremove/BridgeAddVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bridgeremove/BridgeAddVertex.java
index fa353d0..89b66e6 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bridgeremove/BridgeAddVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bridgeremove/BridgeAddVertex.java
@@ -3,7 +3,7 @@
import java.util.Iterator;
import org.apache.hadoop.io.NullWritable;
-import edu.uci.ics.genomix.type.KmerBytesWritable;
+//import edu.uci.ics.genomix.type.KmerBytesWritable;
import edu.uci.ics.pregelix.api.graph.Vertex;
import edu.uci.ics.pregelix.api.job.PregelixJob;
import edu.uci.ics.pregelix.api.util.BspUtils;
@@ -14,6 +14,7 @@
import edu.uci.ics.genomix.pregelix.format.GraphCleanOutputFormat;
import edu.uci.ics.genomix.pregelix.io.MessageWritable;
import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
+import edu.uci.ics.genomix.type.VKmerBytesWritable;
/*
* vertexId: BytesWritable
@@ -87,7 +88,7 @@
* set the vertex value
*/
byte[] array = { 'T', 'A', 'G', 'C', 'C'};
- KmerBytesWritable kmer = new KmerBytesWritable(array.length);
+ VKmerBytesWritable kmer = new VKmerBytesWritable(array.length);
kmer.setByRead(array, 0);
vertexValue.setKmer(kmer);
PositionListWritable plist = new PositionListWritable();
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bridgeremove/BridgeRemoveVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bridgeremove/BridgeRemoveVertex.java
index 9db9418..7f85fc7 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bridgeremove/BridgeRemoveVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bridgeremove/BridgeRemoveVertex.java
@@ -10,7 +10,7 @@
import edu.uci.ics.genomix.pregelix.format.GraphCleanOutputFormat;
import edu.uci.ics.genomix.pregelix.io.MessageWritable;
import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
-import edu.uci.ics.genomix.pregelix.operator.pathmerge.BasicPathMergeVertex;
+import edu.uci.ics.genomix.pregelix.operator.pathmerge.BasicGraphCleanVertex;
import edu.uci.ics.genomix.pregelix.type.AdjMessage;
import edu.uci.ics.genomix.pregelix.util.VertexUtil;
@@ -46,7 +46,7 @@
* Naive Algorithm for path merge graph
*/
public class BridgeRemoveVertex extends
- BasicPathMergeVertex {
+ BasicGraphCleanVertex {
public static final String LENGTH = "BridgeRemoveVertex.length";
private int length = -1;
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleAddVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleAddVertex.java
index ebb4f74..c0ba1a9 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleAddVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleAddVertex.java
@@ -4,6 +4,7 @@
import org.apache.hadoop.io.NullWritable;
import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.VKmerBytesWritable;
import edu.uci.ics.pregelix.api.graph.Vertex;
import edu.uci.ics.pregelix.api.job.PregelixJob;
import edu.uci.ics.pregelix.api.util.BspUtils;
@@ -83,7 +84,7 @@
* set the vertex value
*/
byte[] array = { 'T', 'A', 'G', 'C', 'C', 'A', 'G'}; //TAGCCAG
- KmerBytesWritable kmer = new KmerBytesWritable(array.length);
+ VKmerBytesWritable kmer = new VKmerBytesWritable(array.length);
kmer.setByRead(array, 0);
vertexValue.setKmer(kmer);
PositionListWritable plist = new PositionListWritable();
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleMergeVertex.java
index 40e3191..b782294 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleMergeVertex.java
@@ -150,17 +150,24 @@
public void broadcaseKillself(){
outgoingMsg.setSourceVertexId(getVertexId());
- if(getVertexValue().getFFList().getCountOfPosition() > 0) // #FFList() > 0
+ if(getVertexValue().getFFList().getCountOfPosition() > 0){//#FFList() > 0
outgoingMsg.setMessage(AdjMessage.FROMFF);
- else // #FRList() > 0
+ sendMsg(incomingMsg.getSourceVertexId(), outgoingMsg); // FF/FR side: notify the incoming message's source vertex
+ }
+ else if(getVertexValue().getFRList().getCountOfPosition() > 0){//#FRList() > 0
outgoingMsg.setMessage(AdjMessage.FROMFR);
- sendMsg(incomingMsg.getSourceVertexId(), outgoingMsg);
+ sendMsg(incomingMsg.getSourceVertexId(), outgoingMsg);
+ }
- if(getVertexValue().getRFList().getCountOfPosition() > 0) // #RFList() > 0
+
+ if(getVertexValue().getRFList().getCountOfPosition() > 0){//#RFList() > 0
outgoingMsg.setMessage(AdjMessage.FROMRF);
- else // #RRList() > 0
+ sendMsg(incomingMsg.getStartVertexId(), outgoingMsg); // RF/RR side: notify the incoming message's start vertex
+ }
+ else if(getVertexValue().getRRList().getCountOfPosition() > 0){//#RRList() > 0
outgoingMsg.setMessage(AdjMessage.FROMRR);
- sendMsg(incomingMsg.getStartVertexId(), outgoingMsg);
+ sendMsg(incomingMsg.getStartVertexId(), outgoingMsg);
+ }
deleteVertex(getVertexId());
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicGraphCleanVertex.java
similarity index 84%
rename from genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicPathMergeVertex.java
rename to genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicGraphCleanVertex.java
index 1beee03..64965e3 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicGraphCleanVertex.java
@@ -14,23 +14,24 @@
import edu.uci.ics.genomix.pregelix.type.MessageFlag;
import edu.uci.ics.genomix.pregelix.util.VertexUtil;
import edu.uci.ics.genomix.type.GeneCode;
-import edu.uci.ics.genomix.type.KmerBytesWritable;
+//import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.VKmerBytesWritable;
/**
* Naive Algorithm for path merge graph
*/
-public class BasicPathMergeVertex extends
- Vertex<KmerBytesWritable, VertexValueWritable, NullWritable, MessageWritable> {
- public static final String KMER_SIZE = "BasicPathMergeVertex.kmerSize";
- public static final String ITERATIONS = "BasicPathMergeVertex.iteration";
+public class BasicGraphCleanVertex extends
+ Vertex<VKmerBytesWritable, VertexValueWritable, NullWritable, MessageWritable> {
+ public static final String KMER_SIZE = "BasicGraphCleanVertex.kmerSize";
+ public static final String ITERATIONS = "BasicGraphCleanVertex.iteration";
public static int kmerSize = -1;
protected int maxIteration = -1;
protected MessageWritable incomingMsg = null;
protected MessageWritable outgoingMsg = null;
- protected KmerBytesWritable destVertexId = new KmerBytesWritable();
- protected Iterator<KmerBytesWritable> posIterator;
- private KmerBytesWritable kmer = new KmerBytesWritable();
+ protected VKmerBytesWritable destVertexId = new VKmerBytesWritable();
+ protected Iterator<VKmerBytesWritable> posIterator;
+ protected VKmerBytesWritable tmpKmer = new VKmerBytesWritable();
byte headFlag;
protected byte outFlag;
protected byte inFlag;
@@ -67,7 +68,7 @@
/**
* get destination vertex
*/
- public KmerBytesWritable getNextDestVertexId(VertexValueWritable value) {
+ public VKmerBytesWritable getNextDestVertexId(VertexValueWritable value) {
if (value.getFFList().getCountOfPosition() > 0){ // #FFList() > 0
posIterator = value.getFFList().iterator();
return posIterator.next();
@@ -79,7 +80,7 @@
}
}
- public KmerBytesWritable getPreDestVertexId(VertexValueWritable value) {
+ public VKmerBytesWritable getPreDestVertexId(VertexValueWritable value) {
if (value.getRFList().getCountOfPosition() > 0){ // #RFList() > 0
posIterator = value.getRFList().iterator();
return posIterator.next();
@@ -163,23 +164,25 @@
* one vertex send message to previous and next vertices (neighbor)
*/
public void sendMsgToAllNeighborNodes(VertexValueWritable value){
- posIterator = value.getFFList().iterator(); // FFList
+ sendMsgToAllNextNodes(value); // presumably sends the caller-prepared outgoingMsg to every next node — see that method
+ sendMsgToAllPreviousNodes(value); // likewise for every previous node
+ }
+
+ /**
+ * head send message to all previous nodes
+ */
+ public void sendSettledMsgToAllPreviousNodes(VertexValueWritable value) {
+ posIterator = value.getRFList().iterator(); // RFList
while(posIterator.hasNext()){
+ outgoingMsg.setFlag(AdjMessage.FROMRF);
+ outgoingMsg.setSourceVertexId(getVertexId());
destVertexId.setAsCopy(posIterator.next());
sendMsg(destVertexId, outgoingMsg);
}
- posIterator = value.getFRList().iterator(); // FRList
+ posIterator = value.getRRList().iterator(); // RRList
while(posIterator.hasNext()){
- destVertexId.setAsCopy(posIterator.next());
- sendMsg(destVertexId, outgoingMsg);
- }
- posIterator = value.getFFList().iterator(); // FFList
- while(posIterator.hasNext()){
- destVertexId.setAsCopy(posIterator.next());
- sendMsg(destVertexId, outgoingMsg);
- }
- posIterator = value.getFRList().iterator(); // FRList
- while(posIterator.hasNext()){
+ outgoingMsg.setFlag(AdjMessage.FROMRR);
+ outgoingMsg.setSourceVertexId(getVertexId());
destVertexId.setAsCopy(posIterator.next());
sendMsg(destVertexId, outgoingMsg);
}
@@ -204,26 +207,6 @@
sendMsg(destVertexId, outgoingMsg);
}
}
-
- /**
- * head send message to all previous nodes
- */
- public void sendSettledMsgToAllPreviousNodes(VertexValueWritable value) {
- posIterator = value.getRFList().iterator(); // RFList
- while(posIterator.hasNext()){
- outgoingMsg.setFlag(AdjMessage.FROMRF);
- outgoingMsg.setSourceVertexId(getVertexId());
- destVertexId.setAsCopy(posIterator.next());
- sendMsg(destVertexId, outgoingMsg);
- }
- posIterator = value.getRRList().iterator(); // RRList
- while(posIterator.hasNext()){
- outgoingMsg.setFlag(AdjMessage.FROMRR);
- outgoingMsg.setSourceVertexId(getVertexId());
- destVertexId.setAsCopy(posIterator.next());
- sendMsg(destVertexId, outgoingMsg);
- }
- }
/**
* start sending message
@@ -355,7 +338,8 @@
public void sendUpdateMsg(){
outgoingMsg.setUpdateMsg(true);
byte meToNeighborDir = (byte) (incomingMsg.getFlag() & MessageFlag.DIR_MASK);
- switch(meToNeighborDir){
+ byte neighborToMeDir = mirrorDirection(meToNeighborDir);
+ switch(neighborToMeDir){
case MessageFlag.DIR_FF:
case MessageFlag.DIR_FR:
sendUpdateMsgToPredecessor();
@@ -397,7 +381,7 @@
outgoingMsg.setFlag(outFlag);
outgoingMsg.setNeighberNode(getVertexValue().getIncomingList());
outgoingMsg.setSourceVertexId(getVertexId());
- outgoingMsg.setChainVertexId(getVertexValue().getKmer().asFixedLengthKmer());
+ outgoingMsg.setActualKmer(getVertexValue().getKmer());
sendMsg(incomingMsg.getSourceVertexId(), outgoingMsg); //getNextDestVertexId(getVertexValue())
break;
case MessageFlag.DIR_RF:
@@ -410,12 +394,10 @@
outgoingMsg.setFlag(outFlag);
outgoingMsg.setNeighberNode(getVertexValue().getOutgoingList());
outgoingMsg.setSourceVertexId(getVertexId());
- outgoingMsg.setChainVertexId(getVertexValue().getKmer().asFixedLengthKmer());
+ outgoingMsg.setActualKmer(getVertexValue().getKmer());
sendMsg(incomingMsg.getSourceVertexId(), outgoingMsg); //getPreDestVertexId(getVertexValue())
break;
}
-// if(headBecomeOldHead)
-// getVertexValue().processDelete(neighborToMeDir, incomingMsg.getSourceVertexId());
}
/**
@@ -437,7 +419,7 @@
outgoingMsg.setFlag(outFlag);
outgoingMsg.setNeighberNode(getVertexValue().getIncomingList());
outgoingMsg.setSourceVertexId(getVertexId());
- outgoingMsg.setChainVertexId(getVertexValue().getKmer().asFixedLengthKmer());
+ outgoingMsg.setActualKmer(getVertexValue().getKmer());
sendMsg(incomingMsg.getSourceVertexId(), outgoingMsg); //getNextDestVertexId(getVertexValue())
break;
case MessageFlag.DIR_RF:
@@ -450,7 +432,7 @@
outgoingMsg.setFlag(outFlag);
outgoingMsg.setNeighberNode(getVertexValue().getOutgoingList());
outgoingMsg.setSourceVertexId(getVertexId());
- outgoingMsg.setChainVertexId(getVertexValue().getKmer().asFixedLengthKmer());
+ outgoingMsg.setActualKmer(getVertexValue().getKmer());
sendMsg(incomingMsg.getSourceVertexId(), outgoingMsg); //getPreDestVertexId(getVertexValue())
break;
}
@@ -473,7 +455,7 @@
outgoingMsg.setFlag(outFlag);
outgoingMsg.setNeighberNode(getVertexValue().getIncomingList());
outgoingMsg.setSourceVertexId(getVertexId());
- outgoingMsg.setChainVertexId(getVertexValue().getKmer().asFixedLengthKmer());
+ outgoingMsg.setActualKmer(getVertexValue().getKmer());
sendMsg(getNextDestVertexId(getVertexValue()), outgoingMsg);
deleteVertex(getVertexId());
break;
@@ -486,7 +468,7 @@
outgoingMsg.setFlag(outFlag);
outgoingMsg.setNeighberNode(getVertexValue().getOutgoingList());
outgoingMsg.setSourceVertexId(getVertexId());
- outgoingMsg.setChainVertexId(getVertexValue().getKmer().asFixedLengthKmer());
+ outgoingMsg.setActualKmer(getVertexValue().getKmer());
sendMsg(getPreDestVertexId(getVertexValue()), outgoingMsg);
deleteVertex(getVertexId());
break;
@@ -610,7 +592,7 @@
getVertexValue().processMerges(neighborToMeDir, incomingMsg.getSourceVertexId(),
neighborToMergeDir, VertexUtil.getNodeIdFromAdjacencyList(incomingMsg.getNeighberNode()),
- kmerSize, incomingMsg.getKmer());
+ kmerSize, incomingMsg.getActualKmer());
}
/**
@@ -624,7 +606,7 @@
getVertexValue().processMerges(neighborToMeDir, msg.getSourceVertexId(),
neighborToMergeDir, VertexUtil.getNodeIdFromAdjacencyList(msg.getNeighberNode()),
- kmerSize, msg.getKmer());
+ kmerSize, msg.getActualKmer());
}
/**
@@ -644,41 +626,40 @@
case MessageFlag.DIR_FF:
selfString = getVertexValue().getKmer().toString();
match = selfString.substring(selfString.length() - kmerSize + 1,selfString.length());
- msgString = msg.getKmer().toString();
+ msgString = msg.getActualKmer().toString();
index = msgString.indexOf(match);
-// kmer.reset(msgString.length() - index);
- kmer.setByRead(msgString.substring(index).getBytes(), 0);
+// tmpKmer.reset(msgString.length() - index);
+ tmpKmer.setByRead(msgString.substring(index).getBytes(), 0);
break;
case MessageFlag.DIR_FR:
selfString = getVertexValue().getKmer().toString();
match = selfString.substring(selfString.length() - kmerSize + 1,selfString.length());
- msgString = GeneCode.reverseComplement(msg.getKmer().toString());
+ msgString = GeneCode.reverseComplement(msg.getActualKmer().toString());
index = msgString.indexOf(match);
-// kmer.reset(msgString.length() - index);
- kmer.setByReadReverse(msgString.substring(index).getBytes(), 0);
+// tmpKmer.reset(msgString.length() - index);
+ tmpKmer.setByReadReverse(msgString.substring(index).getBytes(), 0);
break;
case MessageFlag.DIR_RF:
selfString = getVertexValue().getKmer().toString();
match = selfString.substring(0,kmerSize - 1);
- msgString = GeneCode.reverseComplement(msg.getKmer().toString());
+ msgString = GeneCode.reverseComplement(msg.getActualKmer().toString());
index = msgString.lastIndexOf(match) + kmerSize - 2;
-// kmer.reset(index + 1);
- kmer.setByReadReverse(msgString.substring(0, index + 1).getBytes(), 0);
+// tmpKmer.reset(index + 1);
+ tmpKmer.setByReadReverse(msgString.substring(0, index + 1).getBytes(), 0);
break;
case MessageFlag.DIR_RR:
selfString = getVertexValue().getKmer().toString();
match = selfString.substring(0,kmerSize - 1);
- msgString = msg.getKmer().toString();
+ msgString = msg.getActualKmer().toString();
index = msgString.lastIndexOf(match) + kmerSize - 2;
-// kmer.reset(index + 1);
- kmer.setByRead(msgString.substring(0, index + 1).getBytes(), 0);
- System.out.println(kmer.toString());
+// tmpKmer.reset(index + 1); // TODO: fix ALL of these resets (only if you need to)
+ tmpKmer.setByRead(msgString.substring(0, index + 1).getBytes(), 0);
break;
}
getVertexValue().processMerges(neighborToMeDir, msg.getSourceVertexId(),
neighborToMergeDir, VertexUtil.getNodeIdFromAdjacencyList(msg.getNeighberNode()),
- kmerSize, kmer);
+ kmerSize, tmpKmer);
}
/**
@@ -726,55 +707,97 @@
}
/**
+ * Broadcast a KILL message to this vertex's neighbors, then delete the vertex.
+ * Pre-condition: vertex is a path vertex. Only the first neighbor on each side is notified:
+ * FF (else FR) on one side, RF (else RR) on the other.
+ * The direction bits tell the receiver which adjacency list to update (see responseToDeadVertex).
+ */
+ public void broadcaseKillself(){
+ // KILL | DIR_FROM_DEADVERTEX is shared by every message; each send ORs in its own direction
+ // without touching outFlag, so the FF/FR direction never leaks into the RF/RR message.
+ outFlag = 0;
+ outFlag |= MessageFlag.KILL;
+ outFlag |= MessageFlag.DIR_FROM_DEADVERTEX;
+ outgoingMsg.setSourceVertexId(getVertexId());
+
+ if(getVertexValue().getFFList().getCountOfPosition() > 0){//#FFList() > 0
+ outgoingMsg.setFlag((byte) (outFlag | MessageFlag.DIR_FF));
+ sendMsg(getVertexValue().getFFList().getPosition(0), outgoingMsg);
+ }
+ else if(getVertexValue().getFRList().getCountOfPosition() > 0){//#FRList() > 0
+ outgoingMsg.setFlag((byte) (outFlag | MessageFlag.DIR_FR));
+ sendMsg(getVertexValue().getFRList().getPosition(0), outgoingMsg);
+ }
+
+ if(getVertexValue().getRFList().getCountOfPosition() > 0){//#RFList() > 0
+ outgoingMsg.setFlag((byte) (outFlag | MessageFlag.DIR_RF));
+ sendMsg(getVertexValue().getRFList().getPosition(0), outgoingMsg);
+ }
+ else if(getVertexValue().getRRList().getCountOfPosition() > 0){//#RRList() > 0
+ outgoingMsg.setFlag((byte) (outFlag | MessageFlag.DIR_RR));
+ sendMsg(getVertexValue().getRRList().getPosition(0), outgoingMsg);
+ }
+
+ deleteVertex(getVertexId());
+ }
+
+ /**
* do some remove operations on adjMap after receiving the info about dead Vertex
*/
- public void responseToDeadVertex(Iterator<MessageWritable> msgIterator){
- while (msgIterator.hasNext()) {
- incomingMsg = msgIterator.next();
- if(incomingMsg.getFlag() == AdjMessage.FROMFF){
+ public void responseToDeadVertex(){ // handles the KILL message currently held in incomingMsg
+ switch(incomingMsg.getFlag() & MessageFlag.DIR_MASK){
+ case MessageFlag.DIR_FF:
//remove incomingMsg.getSourceId from RR positionList
posIterator = getVertexValue().getRRList().iterator();
while(posIterator.hasNext()){
- kmer = posIterator.next();
- if(kmer.equals(incomingMsg.getSourceVertexId())){
+ VKmerBytesWritable neighbor = posIterator.next(); // local, never the shared tmpKmer field: aliasing a list element there lets later tmpKmer writes corrupt the list
+ if(neighbor.equals(incomingMsg.getSourceVertexId())){
posIterator.remove();
break;
}
}
- } else if(incomingMsg.getFlag() == AdjMessage.FROMFR){
+ break;
+ case MessageFlag.DIR_FR:
//remove incomingMsg.getSourceId from FR positionList
posIterator = getVertexValue().getFRList().iterator();
while(posIterator.hasNext()){
- kmer = posIterator.next();
- if(kmer.equals(incomingMsg.getSourceVertexId())){
+ VKmerBytesWritable neighbor = posIterator.next();
+ if(neighbor.equals(incomingMsg.getSourceVertexId())){
posIterator.remove();
break;
}
}
- } else if(incomingMsg.getFlag() == AdjMessage.FROMRF){
+ break;
+ case MessageFlag.DIR_RF:
//remove incomingMsg.getSourceId from RF positionList
posIterator = getVertexValue().getRFList().iterator();
while(posIterator.hasNext()){
- kmer = posIterator.next();
- if(kmer.equals(incomingMsg.getSourceVertexId())){
+ VKmerBytesWritable neighbor = posIterator.next();
+ if(neighbor.equals(incomingMsg.getSourceVertexId())){
posIterator.remove();
break;
}
}
- } else{ //incomingMsg.getFlag() == AdjMessage.FROMRR
+ break;
+ case MessageFlag.DIR_RR:
//remove incomingMsg.getSourceId from FF positionList
posIterator = getVertexValue().getFFList().iterator();
while(posIterator.hasNext()){
- kmer = posIterator.next();
- if(kmer.equals(incomingMsg.getSourceVertexId())){
+ VKmerBytesWritable neighbor = posIterator.next();
+ if(neighbor.equals(incomingMsg.getSourceVertexId())){
posIterator.remove();
break;
}
}
- }
+ break;
}
}
+ /** @return true when incomingMsg's flag, restricted to KILL_MASK, equals KILL */
+ public boolean isKillMsg(){
+ return (byte) (incomingMsg.getFlag() & MessageFlag.KILL_MASK) == MessageFlag.KILL;
+ }
+
@Override
public void compute(Iterator<MessageWritable> msgIterator) {
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/MapReduceVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/MapReduceVertex.java
new file mode 100644
index 0000000..ecd3c4f
--- /dev/null
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/MapReduceVertex.java
@@ -0,0 +1,227 @@
+package edu.uci.ics.genomix.pregelix.operator.pathmerge;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Random;
+
+import edu.uci.ics.genomix.pregelix.client.Client;
+import edu.uci.ics.genomix.pregelix.format.GraphCleanInputFormat;
+import edu.uci.ics.genomix.pregelix.format.P2PathMergeOutputFormat;
+import edu.uci.ics.genomix.pregelix.io.MessageWritable;
+import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
+import edu.uci.ics.genomix.pregelix.io.VertexValueWritable.State;
+import edu.uci.ics.genomix.pregelix.type.MessageFlag;
+import edu.uci.ics.genomix.type.KmerListWritable;
+import edu.uci.ics.genomix.type.VKmerBytesWritable;
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.pregelix.api.util.BspUtils;
+
+/**
+ * Finds vertices that carry the same actual kmer and keeps only one of them.
+ *
+ * Supersteps:
+ * 1. one fake vertex is added (guarded by a static, i.e. per-JVM, flag);
+ * 2. every non-fake vertex sends its id and actual kmer to the fake vertex and votes to halt;
+ * 3. the fake vertex groups the senders by canonical kmer (the smaller of the kmer and its
+ *    setByReadReverse() encoding) and sends KILL to every sender but the first of each group;
+ * 4. a fake vertex with no incoming messages deletes itself; killed vertices broadcast KILL
+ *    to their neighbors and delete themselves;
+ * 5. neighbors remove the dead vertices from their adjacency lists.
+ */
+public class MapReduceVertex extends
+        BasicGraphCleanVertex {
+
+    /** set once the fake vertex has been added; static, so it is shared by the whole JVM */
+    public static boolean fakeVertexExist = false;
+    /** id of the fake vertex that collects every actual kmer */
+    protected static VKmerBytesWritable fakeVertex = null;
+
+    /** scratch buffer for the setByReadReverse() encoding of an incoming kmer */
+    protected VKmerBytesWritable reverseKmer;
+    protected KmerListWritable kmerList = null;
+    /** canonical kmer -> ids of the vertices that reported it, in arrival order */
+    protected Map<VKmerBytesWritable, KmerListWritable> kmerMapper = new HashMap<VKmerBytesWritable, KmerListWritable>();
+
+    /**
+     * initiate kmerSize, maxIteration, the reusable message buffers and (once) the fake vertex id
+     */
+    public void initVertex() {
+        if (kmerSize == -1) {
+            kmerSize = getContext().getConfiguration().getInt(KMER_SIZE, 5);
+        }
+        if (maxIteration < 0) {
+            maxIteration = getContext().getConfiguration().getInt(ITERATIONS, 1000000);
+        }
+        if (incomingMsg == null) {
+            incomingMsg = new MessageWritable(kmerSize);
+        }
+        if (outgoingMsg == null) {
+            outgoingMsg = new MessageWritable(kmerSize);
+        } else {
+            outgoingMsg.reset(kmerSize);
+        }
+        if (reverseKmer == null) {
+            reverseKmer = new VKmerBytesWritable();
+        }
+        if (kmerList == null) {
+            kmerList = new KmerListWritable();
+        } else {
+            kmerList.reset();
+        }
+        if (fakeVertex == null) {
+            // kmerSize + 1 random letters (presumably so the id cannot equal a real kmerSize-letter id)
+            String random = generaterRandomString(kmerSize + 1);
+            // size the VKmer before setByRead, like the other VKmerBytesWritable call sites
+            fakeVertex = new VKmerBytesWritable(random.length());
+            fakeVertex.setByRead(random.getBytes(), 0);
+        }
+    }
+
+    /**
+     * Generate a random string of n letters drawn from [ACGT]
+     */
+    public String generaterRandomString(int n) {
+        char[] chars = "ACGT".toCharArray();
+        StringBuilder sb = new StringBuilder();
+        Random random = new Random();
+        for (int i = 0; i < n; i++) {
+            sb.append(chars[random.nextInt(chars.length)]);
+        }
+        return sb.toString();
+    }
+
+    /**
+     * add the fake vertex, at most once per JVM (see fakeVertexExist)
+     */
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    public void addFakeVertex() {
+        // NOTE(review): the guard is static, so every worker JVM may add the vertex once — confirm intended
+        if (!fakeVertexExist) {
+            Vertex vertex = (Vertex) BspUtils.createVertex(getContext().getConfiguration());
+            vertex.getMsgList().clear();
+            vertex.getEdges().clear();
+            VertexValueWritable vertexValue = new VertexValueWritable(kmerSize + 1);
+            vertexValue.setState(State.IS_FAKE);
+            vertexValue.setFakeVertex(true);
+
+            vertex.setVertexId(fakeVertex);
+            vertex.setVertexValue(vertexValue);
+
+            addVertex(fakeVertex, vertex);
+            fakeVertexExist = true;
+        }
+    }
+
+    /**
+     * every non-fake vertex reports its id and actual kmer to the fake vertex, then halts
+     */
+    public void sendMsgToFakeVertex() {
+        if (!getVertexValue().isFakeVertex()) {
+            outgoingMsg.setSourceVertexId(getVertexId());
+            outgoingMsg.setActualKmer(getVertexValue().getKmer());
+            sendMsg(fakeVertex, outgoingMsg);
+            voteToHalt();
+        }
+    }
+
+    /**
+     * Group the senders of the incoming messages by canonical actual kmer (the smaller of the
+     * kmer and its setByReadReverse() encoding) into kmerMapper.
+     */
+    public void mapKeyByActualKmer(Iterator<MessageWritable> msgIterator) {
+        while (msgIterator.hasNext()) {
+            incomingMsg = msgIterator.next();
+            String kmerString = incomingMsg.getActualKmer().toString();
+            // a fresh key object per message: a HashMap key must not be mutated after insertion,
+            // so the shared tmpKmer buffer cannot serve as the key
+            VKmerBytesWritable canonical = new VKmerBytesWritable(kmerString.length());
+            canonical.setByRead(kmerString.getBytes(), 0);
+            reverseKmer.reset(kmerString.length());
+            reverseKmer.setByReadReverse(kmerString.getBytes(), 0);
+            if (reverseKmer.compareTo(canonical) < 0) {
+                canonical.setAsCopy(reverseKmer);
+            }
+
+            // each key owns its own list; the kmerList field is reset by initVertex and must not be shared
+            KmerListWritable sources = kmerMapper.get(canonical);
+            if (sources == null) {
+                sources = new KmerListWritable();
+                kmerMapper.put(canonical, sources);
+            }
+            sources.append(incomingMsg.getSourceVertexId());
+        }
+    }
+
+    /**
+     * For every canonical kmer keep the first reporting vertex and send KILL to all the others.
+     */
+    public void reduceKeyByActualKmer() {
+        for (KmerListWritable sources : kmerMapper.values()) {
+            for (int i = 1; i < sources.getCountOfPosition(); i++) {
+                outgoingMsg.setFlag(MessageFlag.KILL);
+                destVertexId.setAsCopy(sources.getPosition(i));
+                sendMsg(destVertexId, outgoingMsg);
+            }
+        }
+    }
+
+    /**
+     * a vertex that received KILL from the fake vertex broadcasts its death and deletes itself
+     */
+    public void finalVertexResponseToFakeVertex(Iterator<MessageWritable> msgIterator) {
+        while (msgIterator.hasNext()) {
+            incomingMsg = msgIterator.next();
+            inFlag = incomingMsg.getFlag();
+            if (inFlag == MessageFlag.KILL) {
+                broadcaseKillself();
+            }
+        }
+    }
+
+    @Override
+    public void compute(Iterator<MessageWritable> msgIterator) {
+        initVertex();
+        if (getSuperstep() == 1) {
+            addFakeVertex();
+        } else if (getSuperstep() == 2) {
+            // NON-FAKE and final vertices send msg to the FAKE vertex
+            sendMsgToFakeVertex();
+        } else if (getSuperstep() == 3) {
+            kmerMapper.clear();
+            mapKeyByActualKmer(msgIterator); // Mapper
+            reduceKeyByActualKmer(); // Reducer
+        } else if (getSuperstep() == 4) {
+            // only for test single MapReduce job
+            if (!msgIterator.hasNext() && getVertexValue().getState() == State.IS_FAKE) {
+                fakeVertexExist = false;
+                deleteVertex(fakeVertex);
+            }
+            finalVertexResponseToFakeVertex(msgIterator);
+        } else if (getSuperstep() == 5) {
+            // neighbors of dead vertices drop them from their adjacency lists
+            while (msgIterator.hasNext()) {
+                incomingMsg = msgIterator.next();
+                if (isKillMsg()) {
+                    responseToDeadVertex();
+                }
+            }
+        }
+    }
+
+    public static void main(String[] args) throws Exception {
+        PregelixJob job = new PregelixJob(MapReduceVertex.class.getSimpleName());
+        job.setVertexClass(MapReduceVertex.class);
+        /**
+         * BinaryInput and BinaryOutput
+         */
+        job.setVertexInputFormatClass(GraphCleanInputFormat.class);
+        job.setVertexOutputFormatClass(P2PathMergeOutputFormat.class);
+        job.setDynamicVertexValueSize(true);
+        // vertex ids are VKmerBytesWritable (the id type parameter of BasicGraphCleanVertex)
+        job.setOutputKeyClass(VKmerBytesWritable.class);
+        job.setOutputValueClass(VertexValueWritable.class);
+        Client.run(args, job);
+    }
+}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/NaiveAlgorithmForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P1ForPathMergeVertex.java
similarity index 93%
rename from genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/NaiveAlgorithmForPathMergeVertex.java
rename to genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P1ForPathMergeVertex.java
index 53e46af..3447f25 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/NaiveAlgorithmForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P1ForPathMergeVertex.java
@@ -3,7 +3,7 @@
import java.util.Iterator;
import org.apache.hadoop.io.NullWritable;
-import edu.uci.ics.genomix.type.KmerBytesWritable;
+//import edu.uci.ics.genomix.type.KmerBytesWritable;
import edu.uci.ics.genomix.type.KmerBytesWritableFactory;
import edu.uci.ics.pregelix.api.graph.Vertex;
@@ -49,10 +49,10 @@
/**
* Naive Algorithm for path merge graph
*/
-public class NaiveAlgorithmForPathMergeVertex extends
+public class P1ForPathMergeVertex extends
Vertex<PositionWritable, VertexValueWritable, NullWritable, MessageWritable> {
- public static final String KMER_SIZE = "NaiveAlgorithmForPathMergeVertex.kmerSize";
- public static final String ITERATIONS = "NaiveAlgorithmForPathMergeVertex.iteration";
+ public static final String KMER_SIZE = "P1ForPathMergeVertex.kmerSize";
+ public static final String ITERATIONS = "P1ForPathMergeVertex.iteration";
public static int kmerSize = -1;
private int maxIteration = -1;
@@ -60,7 +60,7 @@
private MessageWritable outgoingMsg = new MessageWritable();
private KmerBytesWritableFactory kmerFactory = new KmerBytesWritableFactory(1);
- private KmerBytesWritable lastKmer = new KmerBytesWritable();
+ private VKmerBytesWritable lastKmer = new VKmerBytesWritable();
private PositionWritable destVertexId = new PositionWritable();
private Iterator<PositionWritable> posIterator;
@@ -185,7 +185,7 @@
public void mergeChainVertex() {
//merge chain
lastKmer.setAsCopy(kmerFactory.getLastKmerFromChain(incomingMsg.getLengthOfChain() - kmerSize + 1,
- incomingMsg.getKmer()));
+ incomingMsg.getActualKmer()));
getVertexValue().setKmer(kmerFactory.mergeTwoKmer(getVertexValue().getKmer(), lastKmer));
getVertexValue().setOutgoingList(incomingMsg.getNeighberNode());
}
@@ -222,7 +222,7 @@
public void responseMsgToHeadVertex() {
deleteVertex(getVertexId());
outgoingMsg.setNeighberNode(getVertexValue().getOutgoingList());
- outgoingMsg.setChainVertexId(getVertexValue().getKmer());
+ outgoingMsg.setAcutalKmer(getVertexValue().getKmer());
if (getVertexValue().getState() == State.IS_HEAD)//is_tail
outgoingMsg.setFlag(Message.STOP);
destVertexId.setAsCopy(incomingMsg.getSourceVertexId());
@@ -251,8 +251,8 @@
}
public static void main(String[] args) throws Exception {
- PregelixJob job = new PregelixJob(NaiveAlgorithmForPathMergeVertex.class.getSimpleName());
- job.setVertexClass(NaiveAlgorithmForPathMergeVertex.class);
+ PregelixJob job = new PregelixJob(P1ForPathMergeVertex.class.getSimpleName());
+ job.setVertexClass(P1ForPathMergeVertex.class);
/**
* BinaryInput and BinaryOutput
*/
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/LogAlgorithmForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P2ForPathMergeVertex.java
similarity index 64%
rename from genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/LogAlgorithmForPathMergeVertex.java
rename to genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P2ForPathMergeVertex.java
index b6f2164..ef39a23 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/LogAlgorithmForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P2ForPathMergeVertex.java
@@ -13,6 +13,8 @@
import edu.uci.ics.genomix.pregelix.type.MessageFlag;
import edu.uci.ics.genomix.pregelix.type.MessageFromHead;
import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.KmerListWritable;
+import edu.uci.ics.genomix.type.VKmerBytesWritable;
/*
* vertexId: BytesWritable
* vertexValue: VertexValueWritable
@@ -41,12 +43,13 @@
* The succeed node and precursor node will be stored in vertexValue and we don't use edgeValue.
* The details about message are in edu.uci.ics.pregelix.example.io.MessageWritable.
*/
-public class LogAlgorithmForPathMergeVertex extends
- BasicPathMergeVertex {
+public class P2ForPathMergeVertex extends
+ MapReduceVertex {
private ArrayList<MessageWritable> receivedMsgList = new ArrayList<MessageWritable>();
KmerBytesWritable tmpKmer = new KmerBytesWritable();
-
+
+ private boolean isFakeVertex = false;
/**
* initiate kmerSize, maxIteration
*/
@@ -64,6 +67,19 @@
else
outgoingMsg.reset(kmerSize);
receivedMsgList.clear();
+ if(reverseKmer == null)
+ reverseKmer = new VKmerBytesWritable();
+ if(kmerList == null)
+ kmerList = new KmerListWritable();
+ else
+ kmerList.reset();
+ if(fakeVertex == null){
+// fakeVertex = new KmerBytesWritable(kmerSize + 1);
+ fakeVertex = new VKmerBytesWritable();
+ String random = generaterRandomString(kmerSize + 1);
+ fakeVertex.setByRead(random.getBytes(), 0);
+ }
+ isFakeVertex = ((byte)getVertexValue().getState() & State.FAKEFLAG_MASK) > 0 ? true : false;
}
/**
@@ -138,11 +154,19 @@
}
while (msgIterator.hasNext()) {
incomingMsg = msgIterator.next();
- if(getMsgFlag() == MessageFlag.IS_FINAL){
+ /** final Vertex Responses To FakeVertex **/
+ if((byte)(incomingMsg.getFlag() & MessageFlag.KILL_MASK) == MessageFlag.KILL){
+ if((byte)(incomingMsg.getFlag() & MessageFlag.DIR_MASK) == MessageFlag.DIR_FROM_DEADVERTEX){
+ responseToDeadVertex();
+ } else{
+ broadcaseKillself();
+ }
+ }else if(getMsgFlag() == MessageFlag.IS_FINAL){
processMerge(incomingMsg);
getVertexValue().setState(State.IS_FINAL);
}else{
sendUpdateMsg();
+ outFlag = 0;
sendMergeMsg();
}
}
@@ -155,14 +179,24 @@
//process merge when receiving msg
while (msgIterator.hasNext()) {
incomingMsg = msgIterator.next();
- if(getMsgFlag() == MessageFlag.IS_FINAL){
- sendFinalMergeMsg();
- break;
+ /** final Vertex Responses To FakeVertex **/
+ if((byte)(incomingMsg.getFlag() & MessageFlag.KILL_MASK) == MessageFlag.KILL){
+ if((byte)(incomingMsg.getFlag() & MessageFlag.DIR_MASK) == MessageFlag.DIR_FROM_DEADVERTEX){
+ responseToDeadVertex();
+ } else{
+ broadcaseKillself();
+ }
+ } else {
+ /** for final processing **/
+ if(getMsgFlag() == MessageFlag.IS_FINAL){
+ sendFinalMergeMsg();
+ break;
+ }
+ if(incomingMsg.isUpdateMsg() && selfFlag == State.IS_OLDHEAD)
+ processUpdate();
+ else if(!incomingMsg.isUpdateMsg())
+ receivedMsgList.add(incomingMsg);
}
- if(incomingMsg.isUpdateMsg() && selfFlag == State.IS_OLDHEAD)
- processUpdate();
- else if(!incomingMsg.isUpdateMsg())
- receivedMsgList.add(incomingMsg);
}
if(receivedMsgList.size() != 0){
byte numOfMsgsFromHead = checkNumOfMsgsFromHead();
@@ -172,12 +206,14 @@
for(int i = 0; i < 2; i++)
processFinalMerge(receivedMsgList.get(i)); //processMerge()
getVertexValue().setState(State.IS_FINAL);
+ /** NON-FAKE and Final vertice send msg to FAKE vertex **/
+ sendMsgToFakeVertex();
voteToHalt();
break;
case MessageFromHead.OneMsgFromHeadAndOneFromNonHead:
for(int i = 0; i < 2; i++)
processFinalMerge(receivedMsgList.get(i));
- getVertexValue().setState(State .IS_HEAD);
+ setHeadState();
break;
case MessageFromHead.BothMsgsFromNonHead:
for(int i = 0; i < 2; i++)
@@ -193,36 +229,70 @@
@Override
public void compute(Iterator<MessageWritable> msgIterator) {
initVertex();
- if (getSuperstep() == 1)
+ if (getSuperstep() == 1){
+ addFakeVertex();
startSendMsg();
- else if (getSuperstep() == 2)
+ }
+ else if (getSuperstep() == 2){
+ if(!msgIterator.hasNext() && isFakeVertex)
+ voteToHalt();
initState(msgIterator);
+ }
else if (getSuperstep() % 3 == 0 && getSuperstep() <= maxIteration) {
- if(msgIterator.hasNext()){ //for processing final merge
- incomingMsg = msgIterator.next();
- if(getMsgFlag() == MessageFlag.IS_FINAL){
- setFinalState();
- processFinalMerge(incomingMsg);
+ if(!isFakeVertex){
+ /** for processing final merge **/
+ if(msgIterator.hasNext()){
+ incomingMsg = msgIterator.next();
+ if(getMsgFlag() == MessageFlag.IS_FINAL){
+ setFinalState();
+ processFinalMerge(incomingMsg);
+ /** NON-FAKE and Final vertice send msg to FAKE vertex **/
+ sendMsgToFakeVertex();
+ } else if(isKillMsg()){
+ responseToDeadVertex();
+ }
+ }
+ /** processing general case **/
+ else{
+ sendMsgToPathVertex(msgIterator);
+ if(selfFlag != State.IS_HEAD)
+ voteToHalt();
}
}
+ /** Fake vertex agregates message and group them by actual kmer **/
else{
- sendMsgToPathVertex(msgIterator);
- if(selfFlag != State.IS_HEAD)
- voteToHalt();
+ kmerMapper.clear();
+ /** Mapper **/
+ mapKeyByActualKmer(msgIterator);
+ /** Reducer **/
+ reduceKeyByActualKmer();
+ voteToHalt();
}
} else if (getSuperstep() % 3 == 1 && getSuperstep() <= maxIteration) {
- responseMsgToHeadVertex(msgIterator);
- if(selfFlag != State.IS_HEAD)
+ if(!isFakeVertex){
+ responseMsgToHeadVertex(msgIterator);
+ if(selfFlag != State.IS_HEAD)
+ voteToHalt();
+ }
+ /** Fake vertex agregates message and group them by actual kmer **/
+ else{
+ kmerMapper.clear();
+ /** Mapper **/
+ mapKeyByActualKmer(msgIterator);
+ /** Reducer **/
+ reduceKeyByActualKmer();
voteToHalt();
+ }
} else if (getSuperstep() % 3 == 2 && getSuperstep() <= maxIteration){
- processMergeInHeadVertex(msgIterator);
+ if(!isFakeVertex)
+ processMergeInHeadVertex(msgIterator);
}else
voteToHalt();
}
public static void main(String[] args) throws Exception {
- PregelixJob job = new PregelixJob(LogAlgorithmForPathMergeVertex.class.getSimpleName());
- job.setVertexClass(LogAlgorithmForPathMergeVertex.class);
+ PregelixJob job = new PregelixJob(P2ForPathMergeVertex.class.getSimpleName());
+ job.setVertexClass(P2ForPathMergeVertex.class);
/**
* BinaryInput and BinaryOutput~/
*/
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P3ForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P3ForPathMergeVertex.java
index fd3a1db..cf35c7a 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P3ForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P3ForPathMergeVertex.java
@@ -3,7 +3,7 @@
import java.util.Iterator;
import org.apache.hadoop.io.NullWritable;
-import edu.uci.ics.genomix.type.KmerBytesWritable;
+//import edu.uci.ics.genomix.type.KmerBytesWritable;
import edu.uci.ics.genomix.type.KmerBytesWritableFactory;
import edu.uci.ics.pregelix.api.graph.Vertex;
@@ -64,7 +64,7 @@
private MessageWritable outgoingMsg = new MessageWritable();
private KmerBytesWritableFactory kmerFactory = new KmerBytesWritableFactory(1);
- private KmerBytesWritable lastKmer = new KmerBytesWritable(1);
+ private VKmerBytesWritable lastKmer = new VKmerBytesWritable(1);
private PositionWritable destVertexId = new PositionWritable();
private Iterator<PositionWritable> posIterator;
@@ -231,7 +231,7 @@
*/
public void mergeChainVertex(){
lastKmer.setAsCopy(kmerFactory.getLastKmerFromChain(incomingMsg.getLengthOfChain() - kmerSize + 1,
- incomingMsg.getKmer()));
+ incomingMsg.getActualKmer()));
getVertexValue().setKmer(
kmerFactory.mergeTwoKmer(getVertexValue().getKmer(),
lastKmer));
@@ -271,7 +271,7 @@
public void responseMsgToHeadVertexMergePhase() {
deleteVertex(getVertexId());
outgoingMsg.setNeighberNode(getVertexValue().getOutgoingList());
- outgoingMsg.setChainVertexId(getVertexValue().getKmer());
+ outgoingMsg.setAcutalKmer(getVertexValue().getKmer());
if (getVertexValue().getState() == State2.END_VERTEX)
outgoingMsg.setFlag(Message.STOP);
sendMsg(incomingMsg.getSourceVertexId(), outgoingMsg);
@@ -325,7 +325,7 @@
else {
deleteVertex(getVertexId());
outgoingMsg.setNeighberNode(getVertexValue().getOutgoingList()); //incomingMsg.getNeighberNode()
- outgoingMsg.setChainVertexId(getVertexValue().getKmer());
+ outgoingMsg.setAcutalKmer(getVertexValue().getKmer());
if (getVertexValue().getState() == State2.PSEUDOREAR)
outgoingMsg.setFlag(Message.FROMPSEUDOREAR);
else if (getVertexValue().getState() == State2.END_VERTEX)
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java
index 7a22d25..ecfafa7 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java
@@ -48,7 +48,7 @@
* Naive Algorithm for path merge graph
*/
public class P4ForPathMergeVertex extends
- BasicPathMergeVertex {
+ BasicGraphCleanVertex {
public static final String RANDSEED = "P4ForPathMergeVertex.randSeed";
public static final String PROBBEINGRANDOMHEAD = "P4ForPathMergeVertex.probBeingRandomHead";
@@ -93,7 +93,6 @@
inFlag = (byte)0;
// Node may be marked as head b/c it's a real head or a real tail
headFlag = (byte) (State.IS_HEAD & getVertexValue().getState());
- outgoingMsg.reset();
}
protected boolean isNodeRandomHead(KmerBytesWritable nodeKmer) {
@@ -211,7 +210,7 @@
//send message to the merge object and kill self
broadcastMergeMsg();
} else if (getSuperstep() % 4 == 2){
- //merge kmer
+ //merge tmpKmer
while (msgIterator.hasNext()) {
incomingMsg = msgIterator.next();
selfFlag = (byte) (State.VERTEX_MASK & getVertexValue().getState());
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P5ForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P5ForPathMergeVertex.java
index 7f8a832..3f91ac1 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P5ForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P5ForPathMergeVertex.java
@@ -113,7 +113,7 @@
*/
protected boolean setNextInfo(VertexValueWritable value) {
if (value.getFFList().getCountOfPosition() > 0) {
- nextID.setAsCopy(value.getFFList().getPosition(0));
+ nextID.set(value.getFFList().getPosition(0));
nextHead = isNodeRandomHead(nextID);
return true;
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/splitrepeat/SplitRepeatVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/splitrepeat/SplitRepeatVertex.java
new file mode 100644
index 0000000..8546bb6
--- /dev/null
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/splitrepeat/SplitRepeatVertex.java
@@ -0,0 +1,396 @@
+package edu.uci.ics.genomix.pregelix.operator.splitrepeat;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+
+import edu.uci.ics.genomix.pregelix.io.MessageWritable;
+import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
+import edu.uci.ics.genomix.pregelix.operator.pathmerge.BasicGraphCleanVertex;
+import edu.uci.ics.genomix.pregelix.type.MessageFlag;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.KmerListWritable;
+import edu.uci.ics.genomix.type.PositionWritable;
+import edu.uci.ics.genomix.type.VKmerBytesWritable;
+import edu.uci.ics.genomix.type.VKmerListWritable;
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.util.BspUtils;
+
+/**
+ * Graph-cleaning vertex that splits a repeat (a vertex whose degree is greater than 2) into new vertices, grouping
+ * its incoming and outgoing edges by the read ids the repeat shares with each neighbor.
+ * <p>
+ * Superstep 1: every vertex with degree greater than 2 sends its id to all of its neighbors.<br>
+ * Superstep 2: every vertex that received such a message replies with its node-id list.<br>
+ * Superstep 3: for each (outgoing, incoming) edge pair allowed by {@code connectedTable}, the repeat intersects its
+ * own read ids with those of the two neighbors; each non-empty intersection yields a {@link CreatedVertex}, and the
+ * affected neighbor(s) are sent the new vertex id.<br>
+ * Superstep 4: each notified neighbor replaces the repeat by the new vertex id in the matching edge list, and the
+ * recorded vertices are inserted into the graph.
+ * <p>
+ * NOTE(review): {@code createdVertexSet} and {@code existKmerString} are static, so the hand-off from superstep 3 to
+ * superstep 4 only works when all vertices run in one JVM -- confirm before a distributed run.
+ */
+public class SplitRepeatVertex extends BasicGraphCleanVertex {
+
+    /**
+     * Description of one vertex to be created when a repeat is split: its id plus at most one incoming and one
+     * outgoing edge. An empty direction string means that side has no edge.
+     */
+    public class CreatedVertex {
+        VKmerBytesWritable createdVertexId;
+        String incomingDir;
+        String outgoingDir;
+        VKmerBytesWritable incomingEdge;
+        VKmerBytesWritable outgoingEdge;
+
+        public CreatedVertex() {
+            createdVertexId = new VKmerBytesWritable(kmerSize);
+            incomingDir = "";
+            outgoingDir = "";
+            incomingEdge = new VKmerBytesWritable(kmerSize);
+            outgoingEdge = new VKmerBytesWritable(kmerSize);
+        }
+
+        /** Resets every field to the state produced by the constructor. */
+        public void clear() {
+            createdVertexId.reset(kmerSize);
+            incomingDir = "";
+            outgoingDir = "";
+            incomingEdge.reset(kmerSize);
+            outgoingEdge.reset(kmerSize);
+        }
+
+        public VKmerBytesWritable getCreatedVertexId() {
+            return createdVertexId;
+        }
+
+        /** Copies {@code createdVertexId} into this object's own buffer, so the caller may reuse its argument. */
+        public void setCreatedVertexId(KmerBytesWritable createdVertexId) {
+            this.createdVertexId.set(createdVertexId);
+        }
+
+        public String getIncomingDir() {
+            return incomingDir;
+        }
+
+        public void setIncomingDir(String incomingDir) {
+            this.incomingDir = incomingDir;
+        }
+
+        public String getOutgoingDir() {
+            return outgoingDir;
+        }
+
+        public void setOutgoingDir(String outgoingDir) {
+            this.outgoingDir = outgoingDir;
+        }
+
+        public VKmerBytesWritable getIncomingEdge() {
+            return incomingEdge;
+        }
+
+        /** Copies {@code incomingEdge} into this object's own buffer. */
+        public void setIncomingEdge(KmerBytesWritable incomingEdge) {
+            this.incomingEdge.set(incomingEdge);
+        }
+
+        public VKmerBytesWritable getOutgoingEdge() {
+            return outgoingEdge;
+        }
+
+        /** Copies {@code outgoingEdge} into this object's own buffer. */
+        public void setOutgoingEdge(KmerBytesWritable outgoingEdge) {
+            this.outgoingEdge.set(outgoingEdge);
+        }
+    }
+
+    /** Edge-direction pairs examined in superstep 3, each as {outgoing direction, incoming direction}. */
+    private String[][] connectedTable = new String[][] {
+            { "FF", "RF" },
+            { "FF", "RR" },
+            { "FR", "RF" },
+            { "FR", "RR" }
+    };
+    /** Random vertex ids handed out so far, so that no id is generated twice. */
+    public static Set<String> existKmerString = new HashSet<String>();
+    private Set<Long> incomingReadIdSet = new HashSet<Long>();
+    private Set<Long> outgoingReadIdSet = new HashSet<Long>();
+    private Set<Long> selfReadIdSet = new HashSet<Long>();
+    private Set<Long> incomingEdgeIntersection = new HashSet<Long>();
+    private Set<Long> outgoingEdgeIntersection = new HashSet<Long>();
+    private Set<Long> neighborEdgeIntersection = new HashSet<Long>();
+    /** Neighbor vertex id -> read ids that neighbor reported in superstep 2. */
+    private Map<KmerBytesWritable, Set<Long>> kmerMap = new HashMap<KmerBytesWritable, Set<Long>>();
+    private VKmerListWritable incomingEdgeList = null;
+    private VKmerListWritable outgoingEdgeList = null;
+    private byte incomingEdgeDir = 0;
+    private byte outgoingEdgeDir = 0;
+
+    /** Scratch buffer for freshly generated vertex ids; copied into each {@link CreatedVertex}. */
+    protected KmerBytesWritable createdVertexId = null;
+    /** Vertices recorded in superstep 3 and inserted in superstep 4 (shared by all instances in this JVM). */
+    public static Set<CreatedVertex> createdVertexSet = new HashSet<CreatedVertex>();
+
+    /**
+     * Lazily initializes kmerSize, maxIteration and the reusable message, edge-list and id buffers.
+     */
+    public void initVertex() {
+        if (kmerSize == -1)
+            kmerSize = getContext().getConfiguration().getInt(KMER_SIZE, 5);
+        if (maxIteration < 0)
+            maxIteration = getContext().getConfiguration().getInt(ITERATIONS, 1000000);
+        if (incomingMsg == null)
+            incomingMsg = new MessageWritable(kmerSize);
+        if (outgoingMsg == null)
+            outgoingMsg = new MessageWritable(kmerSize);
+        else
+            outgoingMsg.reset(kmerSize);
+        if (incomingEdgeList == null)
+            incomingEdgeList = new VKmerListWritable(kmerSize);
+        if (outgoingEdgeList == null)
+            outgoingEdgeList = new VKmerListWritable(kmerSize);
+        if (createdVertexId == null)
+            createdVertexId = new VKmerBytesWritable(kmerSize + 1);
+    }
+
+    /**
+     * Generates a random string of {@code n} characters over [ACGT] that this JVM has not returned before (tracked
+     * in {@code existKmerString}).
+     */
+    public String generaterRandomString(int n) {
+        char[] chars = "ACGT".toCharArray();
+        StringBuilder sb = new StringBuilder();
+        Random random = new Random();
+        String candidate;
+        do {
+            // start every attempt from an empty buffer: appending to a rejected attempt would yield a string
+            // longer than n
+            sb.setLength(0);
+            for (int i = 0; i < n; i++) {
+                sb.append(chars[random.nextInt(chars.length)]);
+            }
+            candidate = sb.toString();
+        } while (!existKmerString.add(candidate)); // add() returns false if the string was already handed out
+        return candidate;
+    }
+
+    @Override
+    public void compute(Iterator<MessageWritable> msgIterator) {
+        initVertex();
+        if (getSuperstep() == 1) {
+            // the shared set may still hold entries from an earlier run; clear it before any vertex reaches
+            // superstep 3 rather than in superstep 3, where one repeat would discard another repeat's entries
+            createdVertexSet.clear();
+            if (getVertexValue().getDegree() > 2) {
+                outgoingMsg.setSourceVertexId(getVertexId());
+                sendMsgToAllNeighborNodes(getVertexValue());
+            }
+            voteToHalt();
+        } else if (getSuperstep() == 2) {
+            while (msgIterator.hasNext()) {
+                incomingMsg = msgIterator.next();
+                outgoingMsg.setNodeIdList(getVertexValue().getNodeIdList());
+                outgoingMsg.setSourceVertexId(getVertexId());
+                sendMsg(incomingMsg.getSourceVertexId(), outgoingMsg);
+            }
+            voteToHalt();
+        } else if (getSuperstep() == 3) {
+            splitRepeatByReadIds(msgIterator);
+            voteToHalt();
+        } else if (getSuperstep() == 4) {
+            redirectEdgesToCreatedVertices(msgIterator);
+            voteToHalt();
+        } else {
+            // nothing left to do; halting lets the job terminate once no messages are pending
+            voteToHalt();
+        }
+    }
+
+    /**
+     * Superstep 3: collects the read ids reported by the neighbors and, for every edge pair allowed by
+     * {@code connectedTable}, records the vertices the repeat is split into and notifies the affected neighbors.
+     */
+    private void splitRepeatByReadIds(Iterator<MessageWritable> msgIterator) {
+        kmerMap.clear();
+        while (msgIterator.hasNext()) {
+            incomingMsg = msgIterator.next();
+            Set<Long> neighborReadIds = new HashSet<Long>();
+            for (PositionWritable nodeId : incomingMsg.getNodeIdList()) {
+                neighborReadIds.add(nodeId.getReadId());
+            }
+            // NOTE(review): the key is the message's own id object; assumes the iterator does not reuse messages
+            kmerMap.put(incomingMsg.getSourceVertexId(), neighborReadIds);
+        }
+        // the repeat's own read ids do not depend on the edge pair, so collect them once
+        selfReadIdSet.clear();
+        for (PositionWritable nodeId : getVertexValue().getNodeIdList()) {
+            selfReadIdSet.add(nodeId.getReadId());
+        }
+        for (String[] dirPair : connectedTable) {
+            String outgoingDir = dirPair[0];
+            String incomingDir = dirPair[1];
+            switch (outgoingDir) {
+                case "FF":
+                    outgoingEdgeList.set(getVertexValue().getFFList());
+                    outgoingEdgeDir = MessageFlag.DIR_FF;
+                    break;
+                case "FR":
+                    outgoingEdgeList.set(getVertexValue().getFRList());
+                    outgoingEdgeDir = MessageFlag.DIR_FR;
+                    break;
+            }
+            switch (incomingDir) {
+                case "RF":
+                    incomingEdgeList.set(getVertexValue().getRFList());
+                    incomingEdgeDir = MessageFlag.DIR_RF;
+                    break;
+                case "RR":
+                    incomingEdgeList.set(getVertexValue().getRRList());
+                    incomingEdgeDir = MessageFlag.DIR_RR;
+                    break;
+            }
+            for (KmerBytesWritable outgoingEdge : outgoingEdgeList) {
+                for (KmerBytesWritable incomingEdge : incomingEdgeList) {
+                    loadReadIds(outgoingEdge, outgoingReadIdSet);
+                    loadReadIds(incomingEdge, incomingReadIdSet);
+
+                    // read ids the repeat shares with both neighbors
+                    neighborEdgeIntersection.clear();
+                    neighborEdgeIntersection.addAll(selfReadIdSet);
+                    neighborEdgeIntersection.retainAll(outgoingReadIdSet);
+                    neighborEdgeIntersection.retainAll(incomingReadIdSet);
+                    // read ids the repeat shares only with the outgoing neighbor
+                    outgoingEdgeIntersection.clear();
+                    outgoingEdgeIntersection.addAll(selfReadIdSet);
+                    outgoingEdgeIntersection.retainAll(outgoingReadIdSet);
+                    outgoingEdgeIntersection.removeAll(neighborEdgeIntersection);
+                    // read ids the repeat shares only with the incoming neighbor
+                    incomingEdgeIntersection.clear();
+                    incomingEdgeIntersection.addAll(selfReadIdSet);
+                    incomingEdgeIntersection.retainAll(incomingReadIdSet);
+                    incomingEdgeIntersection.removeAll(neighborEdgeIntersection);
+
+                    if (!neighborEdgeIntersection.isEmpty()) {
+                        CreatedVertex created = registerCreatedVertex(incomingDir, incomingEdge, outgoingDir,
+                                outgoingEdge);
+                        outgoingMsg.setCreatedVertexId(created.getCreatedVertexId());
+                        outgoingMsg.setSourceVertexId(getVertexId());
+                        outgoingMsg.setFlag(incomingEdgeDir);
+                        sendMsg(incomingEdge, outgoingMsg);
+                        outgoingMsg.setFlag(outgoingEdgeDir);
+                        sendMsg(outgoingEdge, outgoingMsg);
+                    }
+                    if (!incomingEdgeIntersection.isEmpty()) {
+                        CreatedVertex created = registerCreatedVertex(incomingDir, incomingEdge, null, null);
+                        outgoingMsg.setCreatedVertexId(created.getCreatedVertexId());
+                        outgoingMsg.setSourceVertexId(getVertexId());
+                        outgoingMsg.setFlag(incomingEdgeDir);
+                        sendMsg(incomingEdge, outgoingMsg);
+                    }
+                    if (!outgoingEdgeIntersection.isEmpty()) {
+                        CreatedVertex created = registerCreatedVertex(null, null, outgoingDir, outgoingEdge);
+                        outgoingMsg.setCreatedVertexId(created.getCreatedVertexId());
+                        outgoingMsg.setSourceVertexId(getVertexId());
+                        outgoingMsg.setFlag(outgoingEdgeDir);
+                        sendMsg(outgoingEdge, outgoingMsg);
+                    }
+                }
+            }
+        }
+    }
+
+    /** Fills {@code target} with the read ids {@code neighbor} reported; leaves it empty if there were none. */
+    private void loadReadIds(KmerBytesWritable neighbor, Set<Long> target) {
+        target.clear();
+        Set<Long> reported = kmerMap.get(neighbor);
+        if (reported != null)
+            target.addAll(reported);
+    }
+
+    /**
+     * Records a new {@link CreatedVertex} with a fresh random id in {@code createdVertexSet}; a {@code null} edge
+     * leaves that side without an edge. A new object is created on every call because the set holds references,
+     * so reusing one instance would overwrite earlier entries.
+     */
+    private CreatedVertex registerCreatedVertex(String incomingDir, KmerBytesWritable incomingEdge,
+            String outgoingDir, KmerBytesWritable outgoingEdge) {
+        CreatedVertex created = new CreatedVertex();
+        createdVertexId.setByRead(generaterRandomString(kmerSize + 1).getBytes(), 0);
+        created.setCreatedVertexId(createdVertexId);
+        if (incomingEdge != null) {
+            created.setIncomingDir(incomingDir);
+            created.setIncomingEdge(incomingEdge);
+        }
+        if (outgoingEdge != null) {
+            created.setOutgoingDir(outgoingDir);
+            created.setOutgoingEdge(outgoingEdge);
+        }
+        createdVertexSet.add(created);
+        return created;
+    }
+
+    /**
+     * Superstep 4: for each notification, replaces the repeat (the message source) by the created vertex in the
+     * matching edge list, then inserts the recorded vertices into the graph.
+     */
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    private void redirectEdgesToCreatedVertices(Iterator<MessageWritable> msgIterator) {
+        while (msgIterator.hasNext()) {
+            incomingMsg = msgIterator.next();
+            /** update edgelist to new/created vertex **/
+            byte meToNeighborDir = incomingMsg.getFlag();
+            byte neighborToMeDir = mirrorDirection(meToNeighborDir);
+            switch (neighborToMeDir) {
+                case MessageFlag.DIR_FF:
+                    getVertexValue().getFFList().remove(incomingMsg.getSourceVertexId());
+                    getVertexValue().getFFList().append(incomingMsg.getCreatedVertexId());
+                    break;
+                case MessageFlag.DIR_FR:
+                    getVertexValue().getFRList().remove(incomingMsg.getSourceVertexId());
+                    getVertexValue().getFRList().append(incomingMsg.getCreatedVertexId());
+                    break;
+                case MessageFlag.DIR_RF:
+                    getVertexValue().getRFList().remove(incomingMsg.getSourceVertexId());
+                    getVertexValue().getRFList().append(incomingMsg.getCreatedVertexId());
+                    break;
+                case MessageFlag.DIR_RR:
+                    getVertexValue().getRRList().remove(incomingMsg.getSourceVertexId());
+                    getVertexValue().getRRList().append(incomingMsg.getCreatedVertexId());
+                    break;
+            }
+            /** add new/created vertex **/
+            // the set is shared, so the first message processed in this JVM inserts every recorded vertex
+            for (CreatedVertex v : createdVertexSet) {
+                Vertex vertex = (Vertex) BspUtils.createVertex(getContext().getConfiguration());
+                vertex.getMsgList().clear();
+                vertex.getEdges().clear();
+                VertexValueWritable vertexValue = new VertexValueWritable();
+                switch (v.getIncomingDir()) {
+                    case "RF":
+                        vertexValue.getRFList().append(v.getIncomingEdge());
+                        break;
+                    case "RR":
+                        vertexValue.getRRList().append(v.getIncomingEdge());
+                        break;
+                }
+                switch (v.getOutgoingDir()) {
+                    case "FF":
+                        vertexValue.getFFList().append(v.getOutgoingEdge());
+                        break;
+                    case "FR":
+                        vertexValue.getFRList().append(v.getOutgoingEdge());
+                        break;
+                }
+                vertex.setVertexId(v.getCreatedVertexId());
+                vertex.setVertexValue(vertexValue);
+                // register the vertex with the framework; building it alone does not add it to the graph
+                addVertex(v.getCreatedVertexId(), vertex);
+            }
+            createdVertexSet.clear();
+        }
+    }
+}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/tipremove/TipRemoveVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/tipremove/TipRemoveVertex.java
index 9b83d19..b4f2407 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/tipremove/TipRemoveVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/tipremove/TipRemoveVertex.java
@@ -9,7 +9,7 @@
import edu.uci.ics.genomix.pregelix.format.GraphCleanOutputFormat;
import edu.uci.ics.genomix.pregelix.io.MessageWritable;
import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
-import edu.uci.ics.genomix.pregelix.operator.pathmerge.BasicPathMergeVertex;
+import edu.uci.ics.genomix.pregelix.operator.pathmerge.BasicGraphCleanVertex;
import edu.uci.ics.genomix.pregelix.type.AdjMessage;
import edu.uci.ics.genomix.pregelix.util.VertexUtil;
@@ -45,7 +45,7 @@
* Remove tip or single node when l > constant
*/
public class TipRemoveVertex extends
- BasicPathMergeVertex {
+ BasicGraphCleanVertex {
public static final String LENGTH = "TipRemoveVertex.length";
private int length = -1;
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/ConvertNodeToIdValue.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/ConvertNodeToIdValue.java
index fa5ae19..13d9d98 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/ConvertNodeToIdValue.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/ConvertNodeToIdValue.java
@@ -11,8 +11,8 @@
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.CompressionType;
-import edu.uci.ics.genomix.oldtype.NodeWritable;
-import edu.uci.ics.genomix.oldtype.PositionWritable;
+import edu.uci.ics.genomix.type.NodeWritable;
+import edu.uci.ics.genomix.type.PositionWritable;
import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
import edu.uci.ics.genomix.pregelix.io.VertexValueWritable.State;
@@ -33,8 +33,8 @@
VertexValueWritable outputValue = new VertexValueWritable();
while(reader.next(node, value)) {
- System.out.println(node.getNodeID().toString());
- outputKey.set(node.getNodeID());
+// System.out.println(node.getNodeID().toString());
+// outputKey.set(node.getNodeID());
outputValue.setFFList(node.getFFList());
outputValue.setFRList(node.getFRList());
outputValue.setRFList(node.getRFList());
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/CheckMessage.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/CheckMessage.java
index 4aee8a0..73ab533 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/CheckMessage.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/CheckMessage.java
@@ -6,7 +6,7 @@
public static final byte CHAIN = 1 << 1;
public static final byte NEIGHBER = 1 << 2;
public static final byte MESSAGE = 1 << 3;
- public static final byte STATE = 1 << 4;
+ public static final byte NODEIDLIST = 1 << 4;
public static final byte ADJMSG = 1 << 5;
public static final byte START = 1 << 6;
@@ -27,8 +27,8 @@
case MESSAGE:
r = "MESSAGE";
break;
- case STATE:
- r = "STATE";
+ case NODEIDLIST:
+ r = "READID";
break;
case ADJMSG:
r = "ADJMSG";
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/DirectionFlag.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/DirectionFlag.java
index bee1dd8..2ed36e9 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/DirectionFlag.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/DirectionFlag.java
@@ -10,4 +10,6 @@
public static final byte DIR_RR = 0b100 << 0;
public static final byte DIR_MASK = 0b111 << 0;
public static final byte DIR_CLEAR = 0b1111000 << 0;
+
+ public static final byte DIR_FROM_DEADVERTEX = 0b101 << 0;
}
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
index e18c31b..630b5aa 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
@@ -13,10 +13,12 @@
import edu.uci.ics.genomix.pregelix.operator.bridgeremove.BridgeRemoveVertex;
import edu.uci.ics.genomix.pregelix.operator.bubblemerge.BubbleAddVertex;
import edu.uci.ics.genomix.pregelix.operator.bubblemerge.BubbleMergeVertex;
-import edu.uci.ics.genomix.pregelix.operator.pathmerge.LogAlgorithmForPathMergeVertex;
-import edu.uci.ics.genomix.pregelix.operator.pathmerge.NaiveAlgorithmForPathMergeVertex;
+import edu.uci.ics.genomix.pregelix.operator.pathmerge.P2ForPathMergeVertex;
+import edu.uci.ics.genomix.pregelix.operator.pathmerge.MapReduceVertex;
+import edu.uci.ics.genomix.pregelix.operator.pathmerge.P1ForPathMergeVertex;
import edu.uci.ics.genomix.pregelix.operator.pathmerge.P3ForPathMergeVertex;
import edu.uci.ics.genomix.pregelix.operator.pathmerge.P4ForPathMergeVertex;
+import edu.uci.ics.genomix.pregelix.operator.splitrepeat.SplitRepeatVertex;
import edu.uci.ics.genomix.pregelix.operator.tipremove.TipAddVertex;
import edu.uci.ics.genomix.pregelix.operator.tipremove.TipRemoveVertex;
import edu.uci.ics.genomix.type.KmerBytesWritable;
@@ -28,13 +30,13 @@
// private static void generateNaiveAlgorithmForMergeGraphJob(String jobName, String outputPath) throws IOException {
// PregelixJob job = new PregelixJob(jobName);
-// job.setVertexClass(NaiveAlgorithmForPathMergeVertex.class);
+// job.setVertexClass(P1ForPathMergeVertex.class);
// job.setVertexInputFormatClass(NaiveAlgorithmForPathMergeInputFormat.class); //GraphCleanInputFormat
// job.setVertexOutputFormatClass(GraphCleanOutputFormat.class);
// job.setDynamicVertexValueSize(true);
// job.setOutputKeyClass(PositionWritable.class);
// job.setOutputValueClass(VertexValueWritable.class);
-// job.getConfiguration().setInt(NaiveAlgorithmForPathMergeVertex.KMER_SIZE, 3);
+// job.getConfiguration().setInt(P1ForPathMergeVertex.KMER_SIZE, 3);
// job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
// }
//
@@ -45,13 +47,13 @@
private static void generateLogAlgorithmForMergeGraphJob(String jobName, String outputPath) throws IOException {
PregelixJob job = new PregelixJob(jobName);
- job.setVertexClass(LogAlgorithmForPathMergeVertex.class);
+ job.setVertexClass(P2ForPathMergeVertex.class);
job.setVertexInputFormatClass(InitialGraphCleanInputFormat.class);
job.setVertexOutputFormatClass(P2PathMergeOutputFormat.class);
job.setDynamicVertexValueSize(true);
job.setOutputKeyClass(KmerBytesWritable.class);
job.setOutputValueClass(VertexValueWritable.class);
- job.getConfiguration().setInt(LogAlgorithmForPathMergeVertex.KMER_SIZE, 3);
+ job.getConfiguration().setInt(P2ForPathMergeVertex.KMER_SIZE, 3);
job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
}
@@ -95,6 +97,37 @@
// + "P4ForMergeGraph.xml");
// }
+ private static void generateMapReduceGraphJob(String jobName, String outputPath) throws IOException {
+ PregelixJob job = new PregelixJob(jobName);
+ job.setVertexClass(MapReduceVertex.class);
+ job.setVertexInputFormatClass(GraphCleanInputFormat.class);
+ job.setVertexOutputFormatClass(P2PathMergeOutputFormat.class);
+ job.setDynamicVertexValueSize(true);
+ job.setOutputKeyClass(KmerBytesWritable.class);
+ job.setOutputValueClass(VertexValueWritable.class);
+ job.getConfiguration().setInt(MapReduceVertex.KMER_SIZE, 3);
+ job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
+ }
+
+ private static void genMapReduceGraph() throws IOException {
+ generateMapReduceGraphJob("MapReduceGraph", outputBase + "MapReduceGraph.xml");
+ }
+
+ private static void generateSplitRepeatGraphJob(String jobName, String outputPath) throws IOException {
+ PregelixJob job = new PregelixJob(jobName);
+ job.setVertexClass(SplitRepeatVertex.class);
+ job.setVertexInputFormatClass(InitialGraphCleanInputFormat.class);
+ job.setVertexOutputFormatClass(GraphCleanOutputFormat.class);
+ job.setDynamicVertexValueSize(true);
+ job.setOutputKeyClass(KmerBytesWritable.class);
+ job.setOutputValueClass(VertexValueWritable.class);
+ job.getConfiguration().setInt(SplitRepeatVertex.KMER_SIZE, 3);
+ job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
+ }
+
+ private static void genSplitRepeatGraph() throws IOException {
+ generateSplitRepeatGraphJob("SplitRepeatGraph", outputBase + "SplitRepeatGraph.xml");
+ }
// private static void generateTipAddGraphJob(String jobName, String outputPath) throws IOException {
// PregelixJob job = new PregelixJob(jobName);
// job.setVertexClass(TipAddVertex.class);
@@ -199,7 +232,7 @@
public static void main(String[] args) throws IOException {
//genNaiveAlgorithmForMergeGraph();
- genLogAlgorithmForMergeGraph();
+// genLogAlgorithmForMergeGraph();
//genP3ForMergeGraph();
//genTipAddGraph();
// genTipRemoveGraph();
@@ -208,6 +241,8 @@
// genBubbleAddGraph();
// genBubbleMergeGraph();
// genP4ForMergeGraph();
+// genMapReduceGraph();
+ genSplitRepeatGraph();
}
}
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/PathMergeSmallTestSuite.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/PathMergeSmallTestSuite.java
index 1578dfc..115f090 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/PathMergeSmallTestSuite.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/PathMergeSmallTestSuite.java
@@ -45,13 +45,13 @@
public static final String PreFix = "data/PathMergeTestSet"; //"graphbuildresult";
public static final String[] TestDir = { PreFix + File.separator
-// + "2", PreFix + File.separator
-// + "3", PreFix + File.separator
-// + "4", PreFix + File.separator
-// + "5", PreFix + File.separator
-// + "6", PreFix + File.separator
-// + "7", PreFix + File.separator
-// + "8", PreFix + File.separator
+ + "2", PreFix + File.separator
+ + "3", PreFix + File.separator
+ + "4", PreFix + File.separator
+ + "5", PreFix + File.separator
+ + "6", PreFix + File.separator
+ + "7", PreFix + File.separator
+ + "8", PreFix + File.separator
+ "9"};
private static final String ACTUAL_RESULT_DIR = "data/actual/pathmerge";
private static final String PATH_TO_HADOOP_CONF = "src/test/resources/hadoop/conf";
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/SplitRepeatSmallTestSuite.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/SplitRepeatSmallTestSuite.java
new file mode 100644
index 0000000..21ffe34
--- /dev/null
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/SplitRepeatSmallTestSuite.java
@@ -0,0 +1,199 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.genomix.pregelix.JobRun;
+
+import java.io.BufferedReader;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.logging.Logger;
+
+import junit.framework.Test;
+import junit.framework.TestResult;
+import junit.framework.TestSuite;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.mapred.JobConf;
+
+import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
+import edu.uci.ics.pregelix.core.util.PregelixHyracksIntegrationUtil;
+
+@SuppressWarnings("deprecation")
+public class SplitRepeatSmallTestSuite extends TestSuite {
+ private static final Logger LOGGER = Logger.getLogger(SplitRepeatSmallTestSuite.class.getName());
+ //P4ForMergeGraph/bin/read
+ public static final String PreFix = "data/SplitRepeat"; //"graphbuildresult";
+ public static final String[] TestDir = { PreFix + File.separator
+ + "SimpleTest"};
+ private static final String ACTUAL_RESULT_DIR = "data/actual/splitrepeat";
+ private static final String PATH_TO_HADOOP_CONF = "src/test/resources/hadoop/conf";
+ private static final String PATH_TO_CLUSTER_STORE = "src/test/resources/cluster/stores.properties";
+ private static final String PATH_TO_CLUSTER_PROPERTIES = "src/test/resources/cluster/cluster.properties";
+ private static final String PATH_TO_JOBS = "src/test/resources/jobs/";
+ private static final String PATH_TO_ONLY = "src/test/resources/only_splitrepeat.txt";
+
+ public static final String HDFS_INPUTPATH = "/PathTestSet";
+
+ private static final String HADOOP_CONF_PATH = ACTUAL_RESULT_DIR + File.separator + "conf.xml";
+ private MiniDFSCluster dfsCluster;
+
+ private JobConf conf = new JobConf();
+ private int numberOfNC = 2;
+
+ public void setUp() throws Exception {
+ ClusterConfig.setStorePath(PATH_TO_CLUSTER_STORE);
+ ClusterConfig.setClusterPropertiesPath(PATH_TO_CLUSTER_PROPERTIES);
+ cleanupStores();
+ PregelixHyracksIntegrationUtil.init("src/test/resources/topology.xml");
+ LOGGER.info("Hyracks mini-cluster started");
+ FileUtils.forceMkdir(new File(ACTUAL_RESULT_DIR));
+ FileUtils.cleanDirectory(new File(ACTUAL_RESULT_DIR));
+ startHDFS();
+ }
+
+ private void startHDFS() throws IOException {
+ conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/core-site.xml"));
+ conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/mapred-site.xml"));
+ conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/hdfs-site.xml"));
+ FileSystem lfs = FileSystem.getLocal(new Configuration());
+ lfs.delete(new Path("build"), true);
+ System.setProperty("hadoop.log.dir", "logs");
+ dfsCluster = new MiniDFSCluster(conf, numberOfNC, true, null);
+ FileSystem dfs = FileSystem.get(conf);
+
+ for (String testDir : TestDir) {
+ File src = new File(testDir);
+ Path dest = new Path(HDFS_INPUTPATH + File.separator + src.getName());
+ dfs.mkdirs(dest);
+ //src.listFiles()
+ //src.listFiles((FilenameFilter)(new WildcardFileFilter("part*")))
+ for (File f : src.listFiles()) {
+ dfs.copyFromLocalFile(new Path(f.getAbsolutePath()), dest);
+ }
+ }
+
+ DataOutputStream confOutput = new DataOutputStream(new FileOutputStream(new File(HADOOP_CONF_PATH)));
+ conf.writeXml(confOutput);
+ confOutput.flush();
+ confOutput.close();
+ }
+
+ private void cleanupStores() throws IOException {
+ FileUtils.forceMkdir(new File("teststore"));
+ FileUtils.forceMkdir(new File("build"));
+ FileUtils.cleanDirectory(new File("teststore"));
+ FileUtils.cleanDirectory(new File("build"));
+ }
+
+ /**
+ * cleanup hdfs cluster
+ */
+ private void cleanupHDFS() throws Exception {
+ dfsCluster.shutdown();
+ }
+
+ public void tearDown() throws Exception {
+ PregelixHyracksIntegrationUtil.deinit();
+ LOGGER.info("Hyracks mini-cluster shut down");
+ cleanupHDFS();
+ }
+
+ public static Test suite() throws Exception {
+ List<String> onlys = getFileList(PATH_TO_ONLY);
+ File testData = new File(PATH_TO_JOBS);
+ File[] queries = testData.listFiles();
+ SplitRepeatSmallTestSuite testSuite = new SplitRepeatSmallTestSuite();
+ testSuite.setUp();
+ boolean onlyEnabled = false;
+ FileSystem dfs = FileSystem.get(testSuite.conf);
+
+ if (onlys.size() > 0) {
+ onlyEnabled = true;
+ }
+
+ for (File qFile : queries) {
+ if (qFile.isFile()) {
+ if (onlyEnabled && !isInList(onlys, qFile.getName())) {
+ continue;
+ } else {
+ for (String testPathStr : TestDir) {
+ File testDir = new File(testPathStr);
+ String resultFileName = ACTUAL_RESULT_DIR + File.separator + jobExtToResExt(qFile.getName())
+ + File.separator + "bin" + File.separator + testDir.getName();
+ String textFileName = ACTUAL_RESULT_DIR + File.separator + jobExtToResExt(qFile.getName())
+ + File.separator + "txt" + File.separator + testDir.getName();
+ testSuite.addTest(new BasicSmallTestCase(HADOOP_CONF_PATH, qFile.getName(), qFile
+ .getAbsolutePath().toString(), dfs,
+ HDFS_INPUTPATH + File.separator + testDir.getName(), resultFileName, textFileName));
+ }
+ }
+ }
+ }
+ return testSuite;
+ }
+
+ /**
+ * Runs the tests and collects their result in a TestResult.
+ */
+ @Override
+ public void run(TestResult result) {
+ try {
+ int testCount = countTestCases();
+ for (int i = 0; i < testCount; i++) {
+ // cleanupStores();
+ Test each = this.testAt(i);
+ if (result.shouldStop())
+ break;
+ runTest(each, result);
+ }
+ tearDown();
+ } catch (Exception e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ protected static List<String> getFileList(String ignorePath) throws FileNotFoundException, IOException {
+ BufferedReader reader = new BufferedReader(new FileReader(ignorePath));
+ String s = null;
+ List<String> ignores = new ArrayList<String>();
+ while ((s = reader.readLine()) != null) {
+ ignores.add(s);
+ }
+ reader.close();
+ return ignores;
+ }
+
+ private static String jobExtToResExt(String fname) {
+ int dot = fname.lastIndexOf('.');
+ return fname.substring(0, dot);
+ }
+
+ private static boolean isInList(List<String> onlys, String name) {
+ for (String only : onlys)
+ if (name.indexOf(only) >= 0)
+ return true;
+ return false;
+ }
+
+}
diff --git a/genomix/genomix-pregelix/src/test/resources/only_splitrepeat.txt b/genomix/genomix-pregelix/src/test/resources/only_splitrepeat.txt
new file mode 100644
index 0000000..41cba34
--- /dev/null
+++ b/genomix/genomix-pregelix/src/test/resources/only_splitrepeat.txt
@@ -0,0 +1 @@
+SplitRepeatGraph.xml
\ No newline at end of file
diff --git a/patch.diff b/patch.diff
new file mode 100644
index 0000000..a333970
--- /dev/null
+++ b/patch.diff
@@ -0,0 +1,245 @@
+From 9e006501f9e33467a8428199bd94b71dbff063ef Mon Sep 17 00:00:00 2001
+From: Anbang Xu <anbangx@gmail.com>
+Date: Fri, 26 Jul 2013 14:10:33 -0700
+Subject: [PATCH] p2 pass all the tests except 9
+
+---
+ .../genomix/data/test/KmerBytesWritableTest.java | 76 +++++++++++++++++++++-
+ .../genomix/pregelix/io/VertexValueWritable.java | 2 +-
+ .../operator/pathmerge/BasicPathMergeVertex.java | 35 +++++-----
+ .../pathmerge/LogAlgorithmForPathMergeVertex.java | 8 +--
+ .../pregelix/JobRun/PathMergeSmallTestSuite.java | 2 +-
+ 5 files changed, 98 insertions(+), 25 deletions(-)
+
+diff --git a/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/KmerBytesWritableTest.java b/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/KmerBytesWritableTest.java
+index bda73e5..fbfbeeb 100644
+--- a/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/KmerBytesWritableTest.java
++++ b/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/KmerBytesWritableTest.java
+@@ -229,14 +229,34 @@ public class KmerBytesWritableTest {
+ merge.mergeWithRFKmer(i, kmer2);
+ Assert.assertEquals("GGCACAACAACCC", merge.toString());
+
+- String test1 = "CTA";
+- String test2 = "AGA";
++ String test1;
++ String test2;
++ test1 = "CTA";
++ test2 = "AGA";
+ KmerBytesWritable k1 = new KmerBytesWritable(3);
+ KmerBytesWritable k2 = new KmerBytesWritable(3);
+ k1.setByRead(test1.getBytes(), 0);
+ k2.setByRead(test2.getBytes(), 0);
+ k1.mergeWithRFKmer(3, k2);
+ Assert.assertEquals("TCTA", k1.toString());
++
++ test1 = "CTA";
++ test2 = "ATA"; //TAT
++ k1 = new KmerBytesWritable(3);
++ k2 = new KmerBytesWritable(3);
++ k1.setByRead(test1.getBytes(), 0);
++ k2.setByRead(test2.getBytes(), 0);
++ k1.mergeWithFRKmer(3, k2);
++ Assert.assertEquals("CTAT", k1.toString());
++
++ test1 = "ATA";
++ test2 = "CTA"; //TAT
++ k1 = new KmerBytesWritable(3);
++ k2 = new KmerBytesWritable(3);
++ k1.setByRead(test1.getBytes(), 0);
++ k2.setByRead(test2.getBytes(), 0);
++ k1.mergeWithFRKmer(3, k2);
++ Assert.assertEquals("ATAG", k1.toString());
+ }
+
+
+@@ -281,5 +301,55 @@ public class KmerBytesWritableTest {
+ }
+ }
+ }
+-
++
++ @Test
++ public void TestFinalMerge() {
++ String selfString;
++ String match;
++ String msgString;
++ int index;
++ KmerBytesWritable kmer = new KmerBytesWritable();
++ int kmerSize = 3;
++
++ String F1 = "AATAG";
++ String F2 = "TAGAA";
++ String R1 = "CTATT";
++ String R2 = "TTCTA";
++
++ //FF test
++ selfString = F1;
++ match = selfString.substring(selfString.length() - kmerSize + 1,selfString.length());
++ msgString = F2;
++ index = msgString.indexOf(match);
++ kmer.reset(msgString.length() - index);
++ kmer.setByRead(msgString.substring(index).getBytes(), 0);
++ System.out.println(kmer.toString());
++
++ //FR test
++ selfString = F1;
++ match = selfString.substring(selfString.length() - kmerSize + 1,selfString.length());
++ msgString = GeneCode.reverseComplement(R2);
++ index = msgString.indexOf(match);
++ kmer.reset(msgString.length() - index);
++ kmer.setByRead(msgString.substring(index).getBytes(), 0);
++ System.out.println(kmer.toString());
++
++ //RF test
++ selfString = R1;
++ match = selfString.substring(0,kmerSize - 1);
++ msgString = GeneCode.reverseComplement(F2);
++ index = msgString.lastIndexOf(match) + kmerSize - 2;
++ kmer.reset(index + 1);
++ kmer.setByReadReverse(msgString.substring(0, index + 1).getBytes(), 0);
++ System.out.println(kmer.toString());
++
++ //RR test
++ selfString = R1;
++ match = selfString.substring(0,kmerSize - 1);
++ msgString = R2;
++ index = msgString.lastIndexOf(match) + kmerSize - 2;
++ kmer.reset(index + 1);
++ kmer.setByRead(msgString.substring(0, index + 1).getBytes(), 0);
++ System.out.println(kmer.toString());
++ }
+ }
+diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/VertexValueWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/VertexValueWritable.java
+index 6d4f683..065bfd5 100644
+--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/VertexValueWritable.java
++++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/VertexValueWritable.java
+@@ -32,7 +32,7 @@ public class VertexValueWritable implements WritableComparable<VertexValueWritab
+ public static final byte SHOULD_MERGEWITHNEXT = 0b01 << 3;
+ public static final byte SHOULD_MERGEWITHPREV = 0b10 << 3;
+ public static final byte SHOULD_MERGE_MASK = 0b11 << 3;
+- public static final byte SHOULD_MERGE_CLEAR = 0b1110011;
++ public static final byte SHOULD_MERGE_CLEAR = 0b1100111;
+ }
+
+ private PositionListWritable nodeIdList;
+diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicPathMergeVertex.java
+index b7b0814..ec608c5 100644
+--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicPathMergeVertex.java
++++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicPathMergeVertex.java
+@@ -495,6 +495,7 @@ public class BasicPathMergeVertex extends
+
+ public void setStateAsMergeWithNext(){
+ byte state = getVertexValue().getState();
++ state &= State.SHOULD_MERGE_CLEAR;
+ state |= State.SHOULD_MERGEWITHNEXT;
+ getVertexValue().setState(state);
+ }
+@@ -512,6 +513,7 @@ public class BasicPathMergeVertex extends
+
+ public void setStateAsMergeWithPrev(){
+ byte state = getVertexValue().getState();
++ state &= State.SHOULD_MERGE_CLEAR;
+ state |= State.SHOULD_MERGEWITHPREV;
+ getVertexValue().setState(state);
+ }
+@@ -638,7 +640,7 @@ public class BasicPathMergeVertex extends
+ String match;
+ String msgString;
+ int index;
+- switch(neighborToMergeDir){
++ switch(neighborToMeDir){
+ case MessageFlag.DIR_FF:
+ selfString = getVertexValue().getKmer().toString();
+ match = selfString.substring(selfString.length() - kmerSize + 1,selfString.length());
+@@ -648,28 +650,29 @@ public class BasicPathMergeVertex extends
+ kmer.setByRead(msgString.substring(index).getBytes(), 0);
+ break;
+ case MessageFlag.DIR_FR:
+- selfString = getVertexId().toString();
+- match = selfString.substring(selfString.length() - kmerSize + 1,selfString.length());
++ selfString = getVertexValue().getKmer().toString();
++ match = selfString.substring(selfString.length() - kmerSize + 1,selfString.length());
+ msgString = GeneCode.reverseComplement(msg.getKmer().toString());
+ index = msgString.indexOf(match);
+ kmer.reset(msgString.length() - index);
+- kmer.setByRead(msgString.substring(index).getBytes(), 0);
++ kmer.setByReadReverse(msgString.substring(index).getBytes(), 0);
+ break;
+ case MessageFlag.DIR_RF:
+- selfString = GeneCode.reverseComplement(getVertexValue().getKmer().toString());
+- match = selfString.substring(selfString.length() - kmerSize + 1,selfString.length());
+- msgString = msg.getKmer().toString();
+- index = msgString.indexOf(match);
+- kmer.reset(msgString.length() - index);
+- kmer.setByRead(msgString.substring(index).getBytes(), 0);
++ selfString = getVertexValue().getKmer().toString();
++ match = selfString.substring(0,kmerSize - 1);
++ msgString = GeneCode.reverseComplement(msg.getKmer().toString());
++ index = msgString.lastIndexOf(match) + kmerSize - 2;
++ kmer.reset(index + 1);
++ kmer.setByReadReverse(msgString.substring(0, index + 1).getBytes(), 0);
+ break;
+ case MessageFlag.DIR_RR:
+- selfString = GeneCode.reverseComplement(getVertexValue().getKmer().toString());
+- match = selfString.substring(selfString.length() - kmerSize + 1,selfString.length());
+- msgString = GeneCode.reverseComplement(msg.getKmer().toString());
+- index = msgString.indexOf(match);
+- kmer.reset(msgString.length() - index);
+- kmer.setByRead(msgString.substring(index).getBytes(), 0);
++ selfString = getVertexValue().getKmer().toString();
++ match = selfString.substring(0,kmerSize - 1);
++ msgString = msg.getKmer().toString();
++ index = msgString.lastIndexOf(match) + kmerSize - 2;
++ kmer.reset(index + 1);
++ kmer.setByRead(msgString.substring(0, index + 1).getBytes(), 0);
++ System.out.println(kmer.toString());
+ break;
+ }
+
+diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/LogAlgorithmForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/LogAlgorithmForPathMergeVertex.java
+index a68b646..3b5a782 100644
+--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/LogAlgorithmForPathMergeVertex.java
++++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/LogAlgorithmForPathMergeVertex.java
+@@ -170,22 +170,22 @@ public class LogAlgorithmForPathMergeVertex extends
+ case MessageFromHead.BothMsgsFromHead:
+ case MessageFromHead.OneMsgFromOldHeadAndOneFromHead:
+ for(int i = 0; i < 2; i++)
+- processMerge(receivedMsgList.get(i));
++ processFinalMerge(receivedMsgList.get(i)); //processMerge()
+ getVertexValue().setState(State.IS_FINAL);
+ voteToHalt();
+ break;
+ case MessageFromHead.OneMsgFromHeadAndOneFromNonHead:
+ for(int i = 0; i < 2; i++)
+- processMerge(receivedMsgList.get(i));
++ processFinalMerge(receivedMsgList.get(i));
+ getVertexValue().setState(State .IS_HEAD);
+ break;
+ case MessageFromHead.BothMsgsFromNonHead:
+ for(int i = 0; i < 2; i++)
+- processMerge(receivedMsgList.get(i));
++ processFinalMerge(receivedMsgList.get(i));
+ break;
+ case MessageFromHead.NO_MSG:
+ //halt
+- deleteVertex(getVertexId());
++ voteToHalt(); //deleteVertex(getVertexId());
+ break;
+ }
+ }
+diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/PathMergeSmallTestSuite.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/PathMergeSmallTestSuite.java
+index 9f96b5a..1578dfc 100644
+--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/PathMergeSmallTestSuite.java
++++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/PathMergeSmallTestSuite.java
+@@ -52,7 +52,7 @@ public class PathMergeSmallTestSuite extends TestSuite {
+ // + "6", PreFix + File.separator
+ // + "7", PreFix + File.separator
+ // + "8", PreFix + File.separator
+- + "5"};
++ + "9"};
+ private static final String ACTUAL_RESULT_DIR = "data/actual/pathmerge";
+ private static final String PATH_TO_HADOOP_CONF = "src/test/resources/hadoop/conf";
+ private static final String PATH_TO_CLUSTER_STORE = "src/test/resources/cluster/stores.properties";
+--
+1.7.11.1
+