adapt graph construction(hadoop) to VKmer
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
index 35e55d5..860325f 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
@@ -8,7 +8,6 @@
import java.io.Serializable;
import java.util.Comparator;
-import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.WritableComparable;
import edu.uci.ics.genomix.data.Marshal;
diff --git a/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/VKmerBytesWritableTest.java b/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/VKmerBytesWritableTest.java
index 1b60c3c..4a52558 100644
--- a/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/VKmerBytesWritableTest.java
+++ b/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/VKmerBytesWritableTest.java
@@ -62,7 +62,7 @@
@Test
public void TestCompressKmerReverse() {
- VKmerBytesWritable kmer = new VKmerBytesWritable(k);
+ VKmerBytesWritable kmer = new VKmerBytesWritable();
kmer.setByRead(k, array, 0);
Assert.assertEquals(kmer.toString(), "AATAGAA");
@@ -72,7 +72,7 @@
@Test
public void TestMoveKmerReverse() {
- VKmerBytesWritable kmer = new VKmerBytesWritable(k);
+ VKmerBytesWritable kmer = new VKmerBytesWritable();
kmer.setByRead(k, array, 0);
Assert.assertEquals(kmer.toString(), "AATAGAA");
@@ -88,7 +88,7 @@
@Test
public void TestGetGene() {
- VKmerBytesWritable kmer = new VKmerBytesWritable(9);
+ VKmerBytesWritable kmer = new VKmerBytesWritable();
String text = "AGCTGACCG";
byte[] array = { 'A', 'G', 'C', 'T', 'G', 'A', 'C', 'C', 'G' };
kmer.setByRead(9, array, 0);
@@ -103,8 +103,8 @@
byte[] array = { 'A', 'G', 'C', 'T', 'G', 'A', 'C', 'C', 'G', 'T' };
String string = "AGCTGACCGT";
for (int k = 3; k <= 10; k++) {
- VKmerBytesWritable kmer = new VKmerBytesWritable(k);
- VKmerBytesWritable kmerAppend = new VKmerBytesWritable(k);
+ VKmerBytesWritable kmer = new VKmerBytesWritable();
+ VKmerBytesWritable kmerAppend = new VKmerBytesWritable();
kmer.setByRead(k, array, 0);
Assert.assertEquals(string.substring(0, k), kmer.toString());
for (int b = 0; b < k; b++) {
@@ -126,12 +126,12 @@
public void TestMergeFFKmer() {
byte[] array = { 'A', 'G', 'C', 'T', 'G', 'A', 'C', 'C', 'G', 'T' };
String text = "AGCTGACCGT";
- VKmerBytesWritable kmer1 = new VKmerBytesWritable(8);
+ VKmerBytesWritable kmer1 = new VKmerBytesWritable();
kmer1.setByRead(8, array, 0);
String text1 = "AGCTGACC";
Assert.assertEquals(text1, kmer1.toString());
- VKmerBytesWritable kmer2 = new VKmerBytesWritable(8);
+ VKmerBytesWritable kmer2 = new VKmerBytesWritable();
kmer2.setByRead(8, array, 1);
String text2 = "GCTGACCG";
Assert.assertEquals(text2, kmer2.toString());
@@ -173,17 +173,17 @@
byte[] resultArray = result.getBytes();
String text1 = "AAGCTAA";
- VKmerBytesWritable kmer1 = new VKmerBytesWritable(text1.length());
+ VKmerBytesWritable kmer1 = new VKmerBytesWritable();
kmer1.setByRead(text1.length(), resultArray, 0);
Assert.assertEquals(text1, kmer1.toString());
// kmer2 is the rc of the end of the read
String text2 = "GGTTGTT";
- VKmerBytesWritable kmer2 = new VKmerBytesWritable(text2.length());
+ VKmerBytesWritable kmer2 = new VKmerBytesWritable();
kmer2.setByReadReverse(text2.length(), resultArray, result.length() - text2.length());
Assert.assertEquals(text2, kmer2.toString());
- VKmerBytesWritable merge = new VKmerBytesWritable(kmer1);
+ VKmerBytesWritable merge = new VKmerBytesWritable();
merge.mergeWithFRKmer(kmerSize, kmer2);
Assert.assertEquals(result, merge.toString());
@@ -210,17 +210,17 @@
byte[] resultArray = result.getBytes();
String text1 = "AACAACCC";
- VKmerBytesWritable kmer1 = new VKmerBytesWritable(text1.length());
+ VKmerBytesWritable kmer1 = new VKmerBytesWritable();
kmer1.setByRead(text1.length(), resultArray, 5);
Assert.assertEquals(text1, kmer1.toString());
// kmer2 is the rc of the end of the read
String text2 = "TTGTGCC";
- VKmerBytesWritable kmer2 = new VKmerBytesWritable(text2.length());
+ VKmerBytesWritable kmer2 = new VKmerBytesWritable();
kmer2.setByReadReverse(text2.length(), resultArray, 0);
Assert.assertEquals(text2, kmer2.toString());
- VKmerBytesWritable merge = new VKmerBytesWritable(kmer1);
+ VKmerBytesWritable merge = new VKmerBytesWritable();
merge.mergeWithRFKmer(kmerSize, kmer2);
Assert.assertEquals(result, merge.toString());
@@ -251,8 +251,8 @@
String test3 = "CTA";
String test4 = "AGA"; // rc = TCT
- VKmerBytesWritable k3 = new VKmerBytesWritable(3);
- VKmerBytesWritable k4 = new VKmerBytesWritable(3);
+ VKmerBytesWritable k3 = new VKmerBytesWritable();
+ VKmerBytesWritable k4 = new VKmerBytesWritable();
k3.setByRead(3, test3.getBytes(), 0);
k4.setByRead(3, test4.getBytes(), 0);
k3.mergeWithRFKmer(3, k4);
@@ -264,8 +264,8 @@
String test2;
test1 = "CTA";
test2 = "AGA";
- VKmerBytesWritable k1 = new VKmerBytesWritable(3);
- VKmerBytesWritable k2 = new VKmerBytesWritable(3);
+ VKmerBytesWritable k1 = new VKmerBytesWritable();
+ VKmerBytesWritable k2 = new VKmerBytesWritable();
k1.setByRead(3, test1.getBytes(), 0);
k2.setByRead(3, test2.getBytes(), 0);
k1.mergeWithRFKmer(3, k2);
@@ -275,8 +275,8 @@
test1 = "CTA";
test2 = "ATA"; //TAT
- k1 = new VKmerBytesWritable(3);
- k2 = new VKmerBytesWritable(3);
+ k1 = new VKmerBytesWritable();
+ k2 = new VKmerBytesWritable();
k1.setByRead(3, test1.getBytes(), 0);
k2.setByRead(3, test2.getBytes(), 0);
k1.mergeWithFRKmer(3, k2);
@@ -284,8 +284,8 @@
test1 = "ATA";
test2 = "CTA"; //TAT
- k1 = new VKmerBytesWritable(3);
- k2 = new VKmerBytesWritable(3);
+ k1 = new VKmerBytesWritable();
+ k2 = new VKmerBytesWritable();
k1.setByRead(3, test1.getBytes(), 0);
k2.setByRead(3, test2.getBytes(), 0);
k1.mergeWithFRKmer(3, k2);
@@ -293,8 +293,8 @@
test1 = "TCTAT";
test2 = "GAAC";
- k1 = new VKmerBytesWritable(5);
- k2 = new VKmerBytesWritable(4);
+ k1 = new VKmerBytesWritable();
+ k2 = new VKmerBytesWritable();
k1.setByRead(5, test1.getBytes(), 0);
k2.setByRead(4, test2.getBytes(), 0);
k1.mergeWithRFKmer(3, k2);
@@ -305,10 +305,10 @@
public void TestMergeRRKmer() {
byte[] array = { 'A', 'G', 'C', 'T', 'G', 'A', 'C', 'C', 'G', 'T' };
String text = "AGCTGACCGT";
- VKmerBytesWritable kmer1 = new VKmerBytesWritable(8);
+ VKmerBytesWritable kmer1 = new VKmerBytesWritable();
kmer1.setByRead(8, array, 0);
String text1 = "AGCTGACC";
- VKmerBytesWritable kmer2 = new VKmerBytesWritable(8);
+ VKmerBytesWritable kmer2 = new VKmerBytesWritable();
kmer2.setByRead(8, array, 1);
String text2 = "GCTGACCG";
Assert.assertEquals(text2, kmer2.toString());
@@ -325,8 +325,8 @@
for (int ik = 1; ik <= 10; ik++) {
for (int jk = 1; jk <= 10; jk++) {
- kmer1 = new VKmerBytesWritable(ik);
- kmer2 = new VKmerBytesWritable(jk);
+ kmer1 = new VKmerBytesWritable();
+ kmer2 = new VKmerBytesWritable();
kmer1.setByRead(ik, array, 0);
kmer2.setByRead(jk, array, 0);
text1 = text.substring(0, ik);
@@ -347,9 +347,9 @@
String test1 = "TAGAT";
String test2 = "TCTAG"; // rc = CTAGA
String test3 = "GCTAG";
- VKmerBytesWritable k1 = new VKmerBytesWritable(5);
- VKmerBytesWritable k2 = new VKmerBytesWritable(5);
- VKmerBytesWritable k3 = new VKmerBytesWritable(5);
+ VKmerBytesWritable k1 = new VKmerBytesWritable();
+ VKmerBytesWritable k2 = new VKmerBytesWritable();
+ VKmerBytesWritable k3 = new VKmerBytesWritable();
k1.setByRead(5, test1.getBytes(), 0);
k2.setByRead(5, test2.getBytes(), 0);
k3.setByRead(5, test3.getBytes(), 0);
@@ -364,9 +364,9 @@
String test1 = "TAGAT";
String test2 = "TCTAG"; // rc = CTAGA
String test3 = "CTAGC"; // rc = GCTAG
- VKmerBytesWritable k1 = new VKmerBytesWritable(5);
- VKmerBytesWritable k2 = new VKmerBytesWritable(5);
- VKmerBytesWritable k3 = new VKmerBytesWritable(5);
+ VKmerBytesWritable k1 = new VKmerBytesWritable();
+ VKmerBytesWritable k2 = new VKmerBytesWritable();
+ VKmerBytesWritable k3 = new VKmerBytesWritable();
k1.setByRead(5, test1.getBytes(), 0);
k2.setByRead(5, test2.getBytes(), 0);
k3.setByRead(5, test3.getBytes(), 0);
@@ -381,9 +381,9 @@
String test1 = "TAGAT"; // rc = ATCTA
String test2 = "TCTAG"; // rc = CTAGA
String test3 = "GCTAG"; // rc = CTAGC
- VKmerBytesWritable k1 = new VKmerBytesWritable(5);
- VKmerBytesWritable k2 = new VKmerBytesWritable(5);
- VKmerBytesWritable k3 = new VKmerBytesWritable(5);
+ VKmerBytesWritable k1 = new VKmerBytesWritable();
+ VKmerBytesWritable k2 = new VKmerBytesWritable();
+ VKmerBytesWritable k3 = new VKmerBytesWritable();
k1.setByRead(5, test1.getBytes(), 0);
k2.setByRead(5, test2.getBytes(), 0);
k3.setByRead(5, test3.getBytes(), 0);
@@ -398,9 +398,9 @@
String test1 = "TAGAT"; // rc = ATCTA
String test2 = "TCTAG"; // rc = CTAGA
String test3 = "CTAGC"; // rc = GCTAG
- VKmerBytesWritable k1 = new VKmerBytesWritable(5);
- VKmerBytesWritable k2 = new VKmerBytesWritable(5);
- VKmerBytesWritable k3 = new VKmerBytesWritable(5);
+ VKmerBytesWritable k1 = new VKmerBytesWritable();
+ VKmerBytesWritable k2 = new VKmerBytesWritable();
+ VKmerBytesWritable k3 = new VKmerBytesWritable();
k1.setByRead(5, test1.getBytes(), 0);
k2.setByRead(5, test2.getBytes(), 0);
k3.setByRead(5, test3.getBytes(), 0);
@@ -415,9 +415,9 @@
String test1 = "TAGAT"; // rc = ATCTA
String test2 = "TCTAG"; // rc = CTAGA
String test3 = "CTAGC"; // rc = GCTAG
- VKmerBytesWritable k1 = new VKmerBytesWritable(5);
- VKmerBytesWritable k2 = new VKmerBytesWritable(5);
- VKmerBytesWritable k3 = new VKmerBytesWritable(5);
+ VKmerBytesWritable k1 = new VKmerBytesWritable();
+ VKmerBytesWritable k2 = new VKmerBytesWritable();
+ VKmerBytesWritable k3 = new VKmerBytesWritable();
k1.setByRead(5, test1.getBytes(), 0);
k2.setByRead(5, test2.getBytes(), 0);
k3.setByRead(5, test3.getBytes(), 0);
@@ -432,9 +432,9 @@
String test1 = "TAGAT";
String test2 = "TCTAG"; // rc = CTAGA
String test3 = "CTAGC"; // rc = GCTAG
- VKmerBytesWritable k1 = new VKmerBytesWritable(5);
- VKmerBytesWritable k2 = new VKmerBytesWritable(5);
- VKmerBytesWritable k3 = new VKmerBytesWritable(5);
+ VKmerBytesWritable k1 = new VKmerBytesWritable();
+ VKmerBytesWritable k2 = new VKmerBytesWritable();
+ VKmerBytesWritable k3 = new VKmerBytesWritable();
k1.setByRead(5, test1.getBytes(), 0);
k2.setByRead(5, test2.getBytes(), 0);
k3.setByRead(5, test3.getBytes(), 0);
@@ -519,22 +519,22 @@
System.out.println(difference.toString());
Map<VKmerBytesWritable, Set<Long>> map = new HashMap<VKmerBytesWritable, Set<Long>>();
- VKmerBytesWritable k1 = new VKmerBytesWritable(3);
+ VKmerBytesWritable k1 = new VKmerBytesWritable();
Set<Long> set1 = new HashSet<Long>();
k1.setByRead(3, ("CTA").getBytes(), 0);
set1.add((long)1);
map.put(k1, set1);
- VKmerBytesWritable k2 = new VKmerBytesWritable(3);
+ VKmerBytesWritable k2 = new VKmerBytesWritable();
k2.setByRead(3, ("GTA").getBytes(), 0);
Set<Long> set2 = new HashSet<Long>();
set2.add((long) 2);
map.put(k2, set2);
- VKmerBytesWritable k3 = new VKmerBytesWritable(3);
+ VKmerBytesWritable k3 = new VKmerBytesWritable();
k3.setByRead(3, ("ATG").getBytes(), 0);
Set<Long> set3 = new HashSet<Long>();
set3.add((long) 2);
map.put(k3, set3);
- VKmerBytesWritable k4 = new VKmerBytesWritable(3);
+ VKmerBytesWritable k4 = new VKmerBytesWritable();
k4.setByRead(3, ("AAT").getBytes(), 0);
Set<Long> set4 = new HashSet<Long>();
set4.add((long) 1);
diff --git a/genomix/genomix-hadoop/data/webmap/8 b/genomix/genomix-hadoop/data/webmap/8
deleted file mode 100644
index 3959d4d..0000000
--- a/genomix/genomix-hadoop/data/webmap/8
+++ /dev/null
@@ -1 +0,0 @@
-1 AATAGAACTT
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/contrailgraphbuilding/GenomixDriver.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/contrailgraphbuilding/GenomixDriver.java
index 3723ed9..2553d16 100644
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/contrailgraphbuilding/GenomixDriver.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/contrailgraphbuilding/GenomixDriver.java
@@ -14,8 +14,8 @@
import org.kohsuke.args4j.CmdLineParser;
import org.kohsuke.args4j.Option;
-import edu.uci.ics.genomix.type.KmerBytesWritable;
import edu.uci.ics.genomix.type.NodeWritable;
+import edu.uci.ics.genomix.type.VKmerBytesWritable;
@SuppressWarnings("deprecation")
@@ -51,7 +51,7 @@
conf.setMapperClass(GenomixMapper.class);
conf.setReducerClass(GenomixReducer.class);
- conf.setMapOutputKeyClass(KmerBytesWritable.class);
+ conf.setMapOutputKeyClass(VKmerBytesWritable.class);
conf.setMapOutputValueClass(NodeWritable.class);
//InputFormat and OutputFormat for Reducer
@@ -62,7 +62,7 @@
conf.setOutputFormat(TextOutputFormat.class);
//Output Key/Value Class
- conf.setOutputKeyClass(KmerBytesWritable.class);
+ conf.setOutputKeyClass(VKmerBytesWritable.class);
conf.setOutputValueClass(NodeWritable.class);
FileInputFormat.setInputPaths(conf, new Path(inputPath));
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/contrailgraphbuilding/GenomixMapper.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/contrailgraphbuilding/GenomixMapper.java
index 0c7cc20..6951e8b 100644
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/contrailgraphbuilding/GenomixMapper.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/contrailgraphbuilding/GenomixMapper.java
@@ -12,16 +12,15 @@
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
-import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.VKmerBytesWritable;
import edu.uci.ics.genomix.type.VKmerListWritable;
import edu.uci.ics.genomix.type.NodeWritable;
import edu.uci.ics.genomix.type.PositionListWritable;
import edu.uci.ics.genomix.type.PositionWritable;
-import edu.uci.ics.genomix.type.VKmerBytesWritable;
@SuppressWarnings("deprecation")
public class GenomixMapper extends MapReduceBase implements
- Mapper<LongWritable, Text, KmerBytesWritable, NodeWritable>{
+ Mapper<LongWritable, Text, VKmerBytesWritable, NodeWritable>{
public static enum KmerDir{
FORWARD,
@@ -29,12 +28,12 @@
}
public static int KMER_SIZE;
- private KmerBytesWritable preForwardKmer;
- private KmerBytesWritable preReverseKmer;
- private KmerBytesWritable curForwardKmer;
- private KmerBytesWritable curReverseKmer;
- private KmerBytesWritable nextForwardKmer;
- private KmerBytesWritable nextReverseKmer;
+ private VKmerBytesWritable preForwardKmer;
+ private VKmerBytesWritable preReverseKmer;
+ private VKmerBytesWritable curForwardKmer;
+ private VKmerBytesWritable curReverseKmer;
+ private VKmerBytesWritable nextForwardKmer;
+ private VKmerBytesWritable nextReverseKmer;
private PositionWritable nodeId;
private PositionListWritable nodeIdList;
private VKmerListWritable edgeListForPreKmer;
@@ -50,13 +49,12 @@
@Override
public void configure(JobConf job) {
KMER_SIZE = Integer.parseInt(job.get("sizeKmer"));
- KmerBytesWritable.setGlobalKmerLength(KMER_SIZE);
- preForwardKmer = new KmerBytesWritable();
- preReverseKmer = new KmerBytesWritable();
- curForwardKmer = new KmerBytesWritable();
- curReverseKmer = new KmerBytesWritable();
- nextForwardKmer = new KmerBytesWritable();
- nextReverseKmer = new KmerBytesWritable();
+ preForwardKmer = new VKmerBytesWritable();
+ preReverseKmer = new VKmerBytesWritable();
+ curForwardKmer = new VKmerBytesWritable();
+ curReverseKmer = new VKmerBytesWritable();
+ nextForwardKmer = new VKmerBytesWritable();
+ nextReverseKmer = new VKmerBytesWritable();
nodeId = new PositionWritable();
nodeIdList = new PositionListWritable();
edgeListForPreKmer = new VKmerListWritable();
@@ -68,9 +66,8 @@
}
@Override
- public void map(LongWritable key, Text value, OutputCollector<KmerBytesWritable, NodeWritable> output,
+ public void map(LongWritable key, Text value, OutputCollector<VKmerBytesWritable, NodeWritable> output,
Reporter reporter) throws IOException {
- /** first kmer */
String[] rawLine = value.toString().split("\\t"); // Read the Real Gene Line
if (rawLine.length != 2) {
throw new IOException("invalid data");
@@ -86,11 +83,10 @@
if (KMER_SIZE >= array.length) {
throw new IOException("short read");
}
-
/** first kmer **/
outputNode.reset();
- curForwardKmer.setByRead(array, 0);
- curReverseKmer.setByReadReverse(array, 0);
+ curForwardKmer.setByRead(KMER_SIZE, array, 0);
+ curReverseKmer.setByReadReverse(KMER_SIZE, array, 0);
curKmerDir = curForwardKmer.compareTo(curReverseKmer) <= 0 ? KmerDir.FORWARD : KmerDir.REVERSE;
setNextKmer(array[KMER_SIZE]);
//set value.nodeId
@@ -205,7 +201,7 @@
public void setPreKmer(byte preChar){
preForwardKmer.setAsCopy(curForwardKmer);
preForwardKmer.shiftKmerWithPreChar(preChar);
- preReverseKmer.setByReadReverse(preForwardKmer.toString().getBytes(), preForwardKmer.getOffset());
+ preReverseKmer.setByReadReverse(KMER_SIZE, preForwardKmer.toString().getBytes(), preForwardKmer.getBlockOffset());
preKmerDir = preForwardKmer.compareTo(preReverseKmer) <= 0 ? KmerDir.FORWARD : KmerDir.REVERSE;
}
@@ -213,7 +209,7 @@
public void setNextKmer(byte nextChar){
nextForwardKmer.setAsCopy(curForwardKmer);
nextForwardKmer.shiftKmerWithNextChar(nextChar);
- nextReverseKmer.setByReadReverse(nextForwardKmer.toString().getBytes(), nextForwardKmer.getOffset());
+ nextReverseKmer.setByReadReverse(KMER_SIZE, nextForwardKmer.toString().getBytes(), nextForwardKmer.getBlockOffset());
nextKmerDir = nextForwardKmer.compareTo(nextReverseKmer) <= 0 ? KmerDir.FORWARD : KmerDir.REVERSE;
}
@@ -231,7 +227,7 @@
curReverseKmer.setAsCopy(nextReverseKmer);
}
- public void setMapperOutput(OutputCollector<KmerBytesWritable, NodeWritable> output) throws IOException{
+ public void setMapperOutput(OutputCollector<VKmerBytesWritable, NodeWritable> output) throws IOException{
switch(curKmerDir){
case FORWARD:
output.collect(curForwardKmer, outputNode);
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/contrailgraphbuilding/GenomixReducer.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/contrailgraphbuilding/GenomixReducer.java
index a0eb7c8..6404f0d 100644
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/contrailgraphbuilding/GenomixReducer.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/contrailgraphbuilding/GenomixReducer.java
@@ -9,12 +9,12 @@
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
-import edu.uci.ics.genomix.type.KmerBytesWritable;
import edu.uci.ics.genomix.type.NodeWritable;
+import edu.uci.ics.genomix.type.VKmerBytesWritable;
@SuppressWarnings("deprecation")
public class GenomixReducer extends MapReduceBase implements
- Reducer<KmerBytesWritable, NodeWritable, KmerBytesWritable, NodeWritable>{
+ Reducer<VKmerBytesWritable, NodeWritable, VKmerBytesWritable, NodeWritable>{
public static int KMER_SIZE;
private NodeWritable outputNode;
@@ -23,24 +23,23 @@
@Override
public void configure(JobConf job) {
KMER_SIZE = GenomixMapper.KMER_SIZE;
- KmerBytesWritable.setGlobalKmerLength(KMER_SIZE);
outputNode = new NodeWritable();
tmpNode = new NodeWritable();
}
@Override
- public void reduce(KmerBytesWritable key, Iterator<NodeWritable> values,
- OutputCollector<KmerBytesWritable, NodeWritable> output,
+ public void reduce(VKmerBytesWritable key, Iterator<NodeWritable> values,
+ OutputCollector<VKmerBytesWritable, NodeWritable> output,
Reporter reporter) throws IOException {
outputNode.reset();
while (values.hasNext()) {
tmpNode.set(values.next());
outputNode.getNodeIdList().appendList(tmpNode.getNodeIdList());
- outputNode.getFFList().appendList(tmpNode.getFFList()); //appendList need to check if insert node exists
- outputNode.getFRList().appendList(tmpNode.getFRList());
- outputNode.getRFList().appendList(tmpNode.getRFList());
- outputNode.getRRList().appendList(tmpNode.getRRList());
+ outputNode.getFFList().unionUpdate(tmpNode.getFFList()); //appendList need to check if insert node exists
+ outputNode.getFRList().unionUpdate(tmpNode.getFRList());
+ outputNode.getRFList().unionUpdate(tmpNode.getRFList());
+ outputNode.getRRList().unionUpdate(tmpNode.getRRList());
}
output.collect(key,outputNode);
}
diff --git a/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/contrailgraphbuilding/GraphBuildingTest.java b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/contrailgraphbuilding/GraphBuildingTest.java
index cc7e0ac..498a87d 100644
--- a/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/contrailgraphbuilding/GraphBuildingTest.java
+++ b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/contrailgraphbuilding/GraphBuildingTest.java
@@ -22,13 +22,13 @@
private JobConf conf = new JobConf();
private static final String ACTUAL_RESULT_DIR = "actual";
private static final String HADOOP_CONF_PATH = ACTUAL_RESULT_DIR + File.separator + "conf.xml";
- private static final String DATA_PATH = "data/webmap/RemoveBridge.txt";
+ private static final String DATA_PATH = "data/webmap/pathmerge_TestSet/5";
private static final String HDFS_PATH = "/webmap";
private static final String RESULT_PATH = "/result";
// private static final int COUNT_REDUCER = 2;
private static final int SIZE_KMER = 3;
- private static final int READ_LENGTH = 5;
+ private static final int READ_LENGTH = 7;
private MiniDFSCluster dfsCluster;
private MiniMRCluster mrCluster;
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/graph/GenerateGraphViz.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/graph/GenerateGraphViz.java
index d9d6642..b65a4a1 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/graph/GenerateGraphViz.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/graph/GenerateGraphViz.java
@@ -84,7 +84,7 @@
public static void main(String[] args) throws Exception {
GenerateGraphViz g = new GenerateGraphViz();
- g.convertGraphCleanOutputToGraphViz("data/actual/tipadd/TipAddGraph/bin/5", "graphtest");
+ g.convertGraphCleanOutputToGraphViz("data/actual/bubbleadd/BubbleAddGraph/bin/5", "graphtest");
// g.start("CyclePath_7");
// g.start("SimplePath_7");
// g.start("SinglePath_7");
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MessageWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MessageWritable.java
index 80ce45a..aaafebe 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MessageWritable.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MessageWritable.java
@@ -3,6 +3,7 @@
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import java.util.Comparator;
import org.apache.hadoop.io.WritableComparable;
@@ -10,7 +11,6 @@
import edu.uci.ics.genomix.pregelix.type.Message;
import edu.uci.ics.genomix.type.PositionListWritable;
import edu.uci.ics.genomix.type.VKmerBytesWritable;
-import edu.uci.ics.genomix.type.VKmerListWritable;
public class MessageWritable implements WritableComparable<MessageWritable> {
/**
@@ -22,6 +22,7 @@
private VKmerBytesWritable kmer;
private AdjacencyListWritable neighberNode; //incoming or outgoing
private PositionListWritable nodeIdList = new PositionListWritable();
+ private float averageCoverage;
private byte flag;
private boolean isFlip;
private int kmerlength = 0;
@@ -35,6 +36,7 @@
kmer = new VKmerBytesWritable();
neighberNode = new AdjacencyListWritable();
startVertexId = new VKmerBytesWritable();
+ averageCoverage = 0;
flag = Message.NON;
isFlip = false;
checkMessage = (byte) 0;
@@ -46,6 +48,7 @@
kmer = new VKmerBytesWritable(0);
neighberNode = new AdjacencyListWritable(kmerSize);
startVertexId = new VKmerBytesWritable(kmerSize);
+ averageCoverage = 0;
flag = Message.NON;
isFlip = false;
checkMessage = (byte) 0;
@@ -103,6 +106,7 @@
// kmer.reset();
neighberNode.reset(kmerSize);
startVertexId.reset(kmerSize);
+ averageCoverage = 0;
flag = Message.NON;
isFlip = false;
}
@@ -161,6 +165,14 @@
this.startVertexId.setAsCopy(startVertexId);
}
}
+
+ public float getAverageCoverage() {
+ return averageCoverage;
+ }
+
+ public void setAverageCoverage(float averageCoverage) {
+ this.averageCoverage = averageCoverage;
+ }
public int getLengthOfChain() {
return kmer.getKmerLetterLength();
@@ -216,6 +228,7 @@
nodeIdList.write(out);
if ((checkMessage & CheckMessage.START) != 0)
startVertexId.write(out);
+ out.writeFloat(averageCoverage);
out.writeBoolean(isFlip);
out.writeByte(flag);
out.writeBoolean(updateMsg);
@@ -236,6 +249,7 @@
nodeIdList.readFields(in);
if ((checkMessage & CheckMessage.START) != 0)
startVertexId.readFields(in);
+ averageCoverage = in.readFloat();
isFlip = in.readBoolean();
flag = in.readByte();
updateMsg = in.readBoolean();
@@ -264,4 +278,11 @@
public int compareTo(MessageWritable tp) {
return sourceVertexId.compareTo(tp.sourceVertexId);
}
+
+ public static final class SortByCoverage implements Comparator<MessageWritable> {
+ @Override
+ public int compare(MessageWritable left, MessageWritable right) {
+ return Float.compare(left.averageCoverage, right.averageCoverage);
+ }
+ }
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/log/LogAlgorithmLogFormatter.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/log/LogAlgorithmLogFormatter.java
index dd78cde..90eefa1 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/log/LogAlgorithmLogFormatter.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/log/LogAlgorithmLogFormatter.java
@@ -70,7 +70,7 @@
builder.append("Step: " + step + "\r\n");
builder.append("Source Code: " + source + "\r\n");
if (operation == 0) {
- if (destVertexId.getKmerLength() != -1) {
+ if (KmerBytesWritable.getKmerLength() != -1) {
String dest = destVertexId.toString();
builder.append("Send message to " + "\r\n");
builder.append("Destination Code: " + dest + "\r\n");
@@ -88,7 +88,7 @@
if (operation == 2) {
chain = mergeChain.toString();
builder.append("Merge Chain: " + chain + "\r\n");
- builder.append("Merge Chain Length: " + mergeChain.getKmerLength() + "\r\n");
+ builder.append("Merge Chain Length: " + KmerBytesWritable.getKmerLength() + "\r\n");
}
if (operation == 3)
builder.append("Vote to halt!");
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleMergeVertex.java
index 4cdeb89..22b4a73 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleMergeVertex.java
@@ -1,6 +1,7 @@
package edu.uci.ics.genomix.pregelix.operator.bubblemerge;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
@@ -44,6 +45,33 @@
outgoingMsg.reset();
}
+ public void sendBubbleAndMajorVertexMsgToMinorVertex(){
+ byte meToNeighborDir = (byte) (incomingMsg.getFlag() & MessageFlag.DIR_MASK);
+ byte neighborToMeDir = mirrorDirection(meToNeighborDir);
+ switch(neighborToMeDir){
+ case MessageFlag.DIR_RF:
+ case MessageFlag.DIR_RR:
+ if(hasNextDest(getVertexValue())){
+ outgoingMsg.setStartVertexId(incomingMsg.getSourceVertexId());
+ outgoingMsg.setSourceVertexId(getVertexId());
+ outgoingMsg.setActualKmer(getVertexValue().getKmer());
+ destVertexId.setAsCopy(getNextDestVertexId(getVertexValue()));
+ sendMsg(destVertexId, outgoingMsg);
+ }
+ break;
+ case MessageFlag.DIR_FF:
+ case MessageFlag.DIR_FR:
+ if(hasPrevDest(getVertexValue())){
+ outgoingMsg.setStartVertexId(incomingMsg.getSourceVertexId());
+ outgoingMsg.setSourceVertexId(getVertexId());
+ outgoingMsg.setActualKmer(getVertexValue().getKmer());
+ destVertexId.setAsCopy(getPrevDestVertexId(getVertexValue()));
+ sendMsg(destVertexId, outgoingMsg);
+ }
+ break;
+ }
+ }
+
@SuppressWarnings("unchecked")
@Override
public void compute(Iterator<MessageWritable> msgIterator) {
@@ -57,30 +85,8 @@
while (msgIterator.hasNext()) {
incomingMsg = msgIterator.next();
if(VertexUtil.isPathVertex(getVertexValue())){
- byte meToNeighborDir = (byte) (incomingMsg.getFlag() & MessageFlag.DIR_MASK);
- byte neighborToMeDir = mirrorDirection(meToNeighborDir);
- switch(neighborToMeDir){
- case MessageFlag.DIR_RF:
- case MessageFlag.DIR_RR:
- if(hasNextDest(getVertexValue())){
- outgoingMsg.setStartVertexId(incomingMsg.getSourceVertexId());
- outgoingMsg.setSourceVertexId(getVertexId());
- outgoingMsg.setActualKmer(getVertexValue().getKmer());
- destVertexId.setAsCopy(getNextDestVertexId(getVertexValue()));
- sendMsg(destVertexId, outgoingMsg);
- }
- break;
- case MessageFlag.DIR_FF:
- case MessageFlag.DIR_FR:
- if(hasPrevDest(getVertexValue())){
- outgoingMsg.setStartVertexId(incomingMsg.getSourceVertexId());
- outgoingMsg.setSourceVertexId(getVertexId());
- outgoingMsg.setActualKmer(getVertexValue().getKmer());
- destVertexId.setAsCopy(getPrevDestVertexId(getVertexValue()));
- sendMsg(destVertexId, outgoingMsg);
- }
- break;
- }
+ /** send bubble and major vertex msg to minor vertex **/
+ sendBubbleAndMajorVertexMsgToMinorVertex();
}
}
} else if (getSuperstep() == 3){
@@ -99,67 +105,33 @@
}
}
for(VKmerBytesWritable prevId : receivedMsgMap.keySet()){
- receivedMsgList = receivedMsgMap.get(prevId);
- if(receivedMsgList.size() > 1){
+ if(receivedMsgList.size() > 1){ // filter bubble
/** for each startVertex, sort the node by decreasing order of coverage **/
+ receivedMsgList = receivedMsgMap.get(prevId);
+ Collections.sort(receivedMsgList, new MessageWritable.SortByCoverage());
+
+
/** process similarSet, keep the unchanged set and deleted set & add coverage to unchange node **/
/** send message to the unchanged set for updating coverage & send kill message to the deleted set **/
- //send unchange or merge Message to node with largest length
- if(flag == true){
- //1. send unchange Message to node with largest length
- // we can send no message to complete this step
- //2. send delete Message to node which doesn't have largest length
- for(int i = 0; i < receivedMsgList.size(); i++){
- //if(receivedMsgList.get(i).getSourceVertexId().compareTo(max) != 0)
- if(receivedMsgList.get(i).getSourceVertexId().compareTo(secondMax) == 0){
- outgoingMsg.setMessage(AdjMessage.KILL);
- outgoingMsg.setStartVertexId(prevId);
- outgoingMsg.setSourceVertexId(getVertexId());
- sendMsg(secondMax, outgoingMsg);
- } else if(receivedMsgList.get(i).getSourceVertexId().compareTo(max) == 0){
- outgoingMsg.setMessage(AdjMessage.UNCHANGE);
- sendMsg(max, outgoingMsg);
- }
- }
- } else{
- //send merge Message to node with largest length
- for(int i = 0; i < receivedMsgList.size(); i++){
- //if(receivedMsgList.get(i).getSourceVertexId().compareTo(max) != 0)
- if(receivedMsgList.get(i).getSourceVertexId().compareTo(secondMax) == 0){
- outgoingMsg.setMessage(AdjMessage.KILL);
- outgoingMsg.setStartVertexId(prevId);
- outgoingMsg.setSourceVertexId(getVertexId());
- sendMsg(receivedMsgList.get(i).getSourceVertexId(), outgoingMsg);
- } else if(receivedMsgList.get(i).getSourceVertexId().compareTo(max) == 0){
- outgoingMsg.setMessage(AdjMessage.MERGE);
- /* add other node in message */
- for(int j = 0; j < receivedMsgList.size(); i++){
- if(receivedMsgList.get(j).getSourceVertexId().compareTo(secondMax) == 0){
- outgoingMsg.setChainVertexId(receivedMsgList.get(j).getChainVertexId());
- break;
- }
- }
- sendMsg(receivedMsgList.get(i).getSourceVertexId(), outgoingMsg);
- }
- }
- }
+
}
}
} else if (getSuperstep() == 4){
if(msgIterator.hasNext()) {
incomingMsg = msgIterator.next();
- if(incomingMsg.getMessage() == AdjMessage.KILL){
+ if(incomingMsg.getFlag() == MessageFlag.KILL){
broadcaseKillself();
- } else if (incomingMsg.getMessage() == AdjMessage.MERGE){
- //merge with small node
- getVertexValue().setKmer(kmerFactory.mergeTwoKmer(getVertexValue().getKmer(),
- incomingMsg.getChainVertexId()));
- }
+ }
}
} else if(getSuperstep() == 5){
- responseToDeadVertex(msgIterator);
+ if(msgIterator.hasNext()) {
+ incomingMsg = msgIterator.next();
+ if(incomingMsg.getFlag() == MessageFlag.KILL){
+ responseToDeadVertex();
+ }
+ }
}
voteToHalt();
}