Merge branch 'genomix/fullstack_genomix' into nanzhang/hyracks_genomix
Conflicts:
genomix/genomix-data/src/main/java/edu/uci/ics/genomix/oldtype/IntermediateNodeWritable.java
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/oldtype/IntermediateNodeWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/oldtype/IntermediateNodeWritable.java
deleted file mode 100644
index 220f45c..0000000
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/oldtype/IntermediateNodeWritable.java
+++ /dev/null
@@ -1,151 +0,0 @@
-package edu.uci.ics.genomix.oldtype;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.io.Serializable;
-
-import org.apache.hadoop.io.WritableComparable;
-
-import edu.uci.ics.genomix.type.KmerListWritable;
-import edu.uci.ics.genomix.type.PositionWritable;
-
-public class IntermediateNodeWritable implements WritableComparable<IntermediateNodeWritable>, Serializable{
-
- private static final long serialVersionUID = 1L;
- public static final IntermediateNodeWritable EMPTY_NODE = new IntermediateNodeWritable();
-
- private KmerListWritable forwardForwardList;
- private KmerListWritable forwardReverseList;
- private KmerListWritable reverseForwardList;
- private KmerListWritable reverseReverseList;
- private ReadIDWritable readId;
- public IntermediateNodeWritable(){
- forwardForwardList = new KmerListWritable();
- forwardReverseList = new KmerListWritable();
- reverseForwardList = new KmerListWritable();
- reverseReverseList = new KmerListWritable();
- readId = new ReadIDWritable();
- }
-
- public IntermediateNodeWritable(KmerListWritable FFList, KmerListWritable FRList,
- KmerListWritable RFList, KmerListWritable RRList, ReadIDWritable uniqueKey) {
- this();
- set(FFList, FRList, RFList, RRList, uniqueKey);
- }
-
- public void set(IntermediateNodeWritable node){
- set(node.forwardForwardList, node.forwardReverseList, node.reverseForwardList,
- node.reverseReverseList, node.readId);
- }
-
- public void set(KmerListWritable FFList, KmerListWritable FRList,
- KmerListWritable RFList, KmerListWritable RRList, ReadIDWritable uniqueKey) {
- this.forwardForwardList.set(FFList);
- this.forwardReverseList.set(FRList);
- this.reverseForwardList.set(RFList);
- this.reverseReverseList.set(RRList);
- this.readId.set(uniqueKey);
- }
-
- public void reset(int kmerSize) {
- forwardForwardList.reset();
- forwardReverseList.reset();
- reverseForwardList.reset();
- reverseReverseList.reset();
- readId.reset();
- }
-
- public KmerListWritable getFFList() {
- return forwardForwardList;
- }
-
- public void setFFList(KmerListWritable forwardForwardList) {
- this.forwardForwardList.set(forwardForwardList);
- }
-
- public KmerListWritable getFRList() {
- return forwardReverseList;
- }
-
- public void setFRList(KmerListWritable forwardReverseList) {
- this.forwardReverseList.set(forwardReverseList);
- }
-
- public KmerListWritable getRFList() {
- return reverseForwardList;
- }
-
- public void setRFList(KmerListWritable reverseForwardList) {
- this.reverseForwardList.set(reverseForwardList);
- }
-
- public KmerListWritable getRRList() {
- return reverseReverseList;
- }
-
- public void setRRList(KmerListWritable reverseReverseList) {
- this.reverseReverseList.set(reverseReverseList);
- }
-
- public ReadIDWritable getreadId() {
- return readId;
- }
-
- public void setreadId(ReadIDWritable readId) {
- this.readId.set(readId);
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- this.forwardForwardList.readFields(in);
- this.forwardReverseList.readFields(in);
- this.reverseForwardList.readFields(in);
- this.reverseReverseList.readFields(in);
- this.readId.readFields(in);
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- this.forwardForwardList.write(out);
- this.forwardReverseList.write(out);
- this.reverseForwardList.write(out);
- this.reverseReverseList.write(out);
- this.readId.write(out);
- }
-
- @Override
- public int compareTo(IntermediateNodeWritable other) {
- // TODO Auto-generated method stub
- return this.readId.compareTo(other.readId);
- }
-
- @Override
- public int hashCode() {
- return this.readId.hashCode();
- }
-
- @Override
- public boolean equals(Object o) {
- if (o instanceof IntermediateNodeWritable) {
- IntermediateNodeWritable nw = (IntermediateNodeWritable) o;
- return (this.forwardForwardList.equals(nw.forwardForwardList)
- && this.forwardReverseList.equals(nw.forwardReverseList)
- && this.reverseForwardList.equals(nw.reverseForwardList)
- && this.reverseReverseList.equals(nw.reverseReverseList) && (this.readId.equals(nw.readId)));
- }
- return false;
- }
-
- @Override
- public String toString() {
- StringBuilder sbuilder = new StringBuilder();
- sbuilder.append('(');
- sbuilder.append(readId.toString()).append('\t');
- sbuilder.append(forwardForwardList.toString()).append('\t');
- sbuilder.append(forwardReverseList.toString()).append('\t');
- sbuilder.append(reverseForwardList.toString()).append('\t');
- sbuilder.append(reverseReverseList.toString()).append('\t').append(')');
- return sbuilder.toString();
- }
-}
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerListWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerListWritable.java
index eb0bd59..88bb79c 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerListWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerListWritable.java
@@ -56,11 +56,13 @@
}
public void append(KmerBytesWritable kmer){
- kmerByteSize = kmer.kmerByteSize;
- kmerlength = kmer.kmerlength;
- setSize((1 + valueCount) * kmerByteSize);
- System.arraycopy(kmer.getBytes(), 0, storage, offset + valueCount * kmerByteSize, kmerByteSize);
- valueCount += 1;
+ if(kmer != null){
+ kmerByteSize = kmer.kmerByteSize;
+ kmerlength = kmer.kmerlength;
+ setSize((1 + valueCount) * kmerByteSize);
+ System.arraycopy(kmer.getBytes(), 0, storage, offset + valueCount * kmerByteSize, kmerByteSize);
+ valueCount += 1;
+ }
}
/*
@@ -98,6 +100,10 @@
}
}
+ public void reset() {
+ this.reset(0);
+ }
+
public void reset(int kmerSize) {
kmerlength = kmerSize;
kmerByteSize = KmerUtil.getByteNumFromK(kmerlength);
@@ -156,12 +162,32 @@
};
return it;
}
+
+ /*
+ * Remove the first instance of @toRemove using a linear scan; throws if it is absent and ignoreMissing is false.
+ */
+ public void remove(KmerBytesWritable toRemove, boolean ignoreMissing) {
+ Iterator<KmerBytesWritable> posIterator = this.iterator();
+ while (posIterator.hasNext()) {
+ if(toRemove.equals(posIterator.next())) {
+ posIterator.remove();
+ return;
+ }
+ }
+ if (!ignoreMissing) {
+ throw new ArrayIndexOutOfBoundsException("the KmerBytesWritable `" + toRemove.toString() + "` was not found in this list.");
+ }
+ }
+
+ public void remove(KmerBytesWritable toRemove) {
+ remove(toRemove, false);
+ }
@Override
public void readFields(DataInput in) throws IOException {
this.valueCount = in.readInt();
- setSize(valueCount * kmerByteSize);
- in.readFully(storage, offset, valueCount * kmerByteSize);
+ setSize(valueCount * kmerByteSize);//kmerByteSize
+ in.readFully(storage, offset, valueCount * kmerByteSize);//kmerByteSize
}
@Override
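NOTE: a minimal usage sketch of the KmerListWritable changes above (null-safe append, the new no-arg reset(), and remove() with an ignoreMissing flag). Assumes the edu.uci.ics.genomix.type classes are on the classpath and that the list iterator supports remove(), as the patch relies on:

    import edu.uci.ics.genomix.type.KmerBytesWritable;
    import edu.uci.ics.genomix.type.KmerListWritable;

    public class KmerListWritableSketch {
        public static void main(String[] args) {
            KmerListWritable list = new KmerListWritable(5);
            KmerBytesWritable kmer = new KmerBytesWritable(5);
            kmer.setByRead("AGCTA".getBytes(), 0);

            list.append(kmer);       // copies the kmer bytes into the backing storage
            list.append(null);       // now a silent no-op instead of an NPE

            list.remove(kmer);       // linear scan; throws if the kmer is absent
            list.remove(kmer, true); // ignoreMissing: a missing kmer is tolerated

            list.reset();            // shorthand for reset(0)
        }
    }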
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
index 4725e30..efa87f7 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
@@ -10,11 +10,6 @@
public class NodeWritable implements WritableComparable<NodeWritable>, Serializable{
-// public static class KMER{
-// public static final byte EXIST = 0;
-// public static final byte NON_EXIST = 1;
-// }
-
private static final long serialVersionUID = 1L;
public static final NodeWritable EMPTY_NODE = new NodeWritable(0);
@@ -24,7 +19,7 @@
private KmerListWritable reverseForwardList;
private KmerListWritable reverseReverseList;
private KmerBytesWritable kmer;
-// private byte kmerMark;
+ private int kmerlength = 0;
// merge/update directions
public static class DirectionFlag {
@@ -39,14 +34,14 @@
this(0);
}
- public NodeWritable(int kmerSize) {
+ public NodeWritable(int kmerlength) {
+ this.kmerlength = kmerlength;
nodeIdList = new PositionListWritable();
- forwardForwardList = new KmerListWritable(kmerSize);
- forwardReverseList = new KmerListWritable(kmerSize);
- reverseForwardList = new KmerListWritable(kmerSize);
- reverseReverseList = new KmerListWritable(kmerSize);
- kmer = new KmerBytesWritable(kmerSize);
-// kmerMark = KMER.NON_EXIST;
+ forwardForwardList = new KmerListWritable(kmerlength);
+ forwardReverseList = new KmerListWritable(kmerlength);
+ reverseForwardList = new KmerListWritable(kmerlength);
+ reverseReverseList = new KmerListWritable(kmerlength);
+ kmer = new KmerBytesWritable(); // kmerlength is not set during graph construction; future optimization: VKmer
}
public NodeWritable(PositionListWritable nodeIdList, KmerListWritable FFList, KmerListWritable FRList,
@@ -56,6 +51,7 @@
}
public void set(NodeWritable node){
+ this.kmerlength = node.kmerlength;
set(node.nodeIdList, node.forwardForwardList, node.forwardReverseList, node.reverseForwardList,
node.reverseReverseList, node.kmer);
}
@@ -68,17 +64,16 @@
this.reverseForwardList.set(RFList);
this.reverseReverseList.set(RRList);
this.kmer.set(kmer);
-// kmerMark = KMER.EXIST;
}
public void reset(int kmerSize) {
- nodeIdList.reset();
- forwardForwardList.reset(kmerSize);
- forwardReverseList.reset(kmerSize);
- reverseForwardList.reset(kmerSize);
- reverseReverseList.reset(kmerSize);
- kmer.reset(kmerSize);
-// kmerMark = KMER.NON_EXIST;
+ this.kmerlength = kmerSize;
+ this.nodeIdList.reset();
+ this.forwardForwardList.reset(kmerSize);
+ this.forwardReverseList.reset(kmerSize);
+ this.reverseForwardList.reset(kmerSize);
+ this.reverseReverseList.reset(kmerSize);
+ this.kmer.reset(0);
}
@@ -95,10 +90,17 @@
}
public void setKmer(KmerBytesWritable kmer) {
-// kmerMark = KMER.EXIST;
this.kmer.set(kmer);
}
+ public int getKmerlength() {
+ return kmerlength;
+ }
+
+ public void setKmerlength(int kmerlength) {
+ this.kmerlength = kmerlength;
+ }
+
public int getCount() {
return kmer.getKmerLength();
}
@@ -149,28 +151,28 @@
throw new RuntimeException("Unrecognized direction in getListFromDir: " + dir);
}
}
+
@Override
public void write(DataOutput out) throws IOException {
-// out.writeByte(kmerMark);
-// this.nodeIdList.write(out);
+ out.writeInt(kmerlength);
+ this.nodeIdList.write(out);
this.forwardForwardList.write(out);
this.forwardReverseList.write(out);
-// this.reverseForwardList.write(out);
-// this.reverseReverseList.write(out);
-// if(kmerMark == KMER.EXIST)
-// this.kmer.write(out);
+ this.reverseForwardList.write(out);
+ this.reverseReverseList.write(out);
+ this.kmer.write(out);
}
@Override
public void readFields(DataInput in) throws IOException {
-// kmerMark = in.readByte();
-// this.nodeIdList.readFields(in);
+ this.kmerlength = in.readInt();
+ reset(kmerlength);
+ this.nodeIdList.readFields(in);
this.forwardForwardList.readFields(in);
this.forwardReverseList.readFields(in);
-// this.reverseForwardList.readFields(in);
-// this.reverseReverseList.readFields(in);
-// if(kmerMark == KMER.EXIST)
-// this.kmer.readFields(in);
+ this.reverseForwardList.readFields(in);
+ this.reverseReverseList.readFields(in);
+ this.kmer.readFields(in);
}
@Override
@@ -199,13 +201,13 @@
@Override
public String toString() {
StringBuilder sbuilder = new StringBuilder();
- sbuilder.append('(');
+ sbuilder.append('{');
sbuilder.append(nodeIdList.toString()).append('\t');
sbuilder.append(forwardForwardList.toString()).append('\t');
sbuilder.append(forwardReverseList.toString()).append('\t');
sbuilder.append(reverseForwardList.toString()).append('\t');
sbuilder.append(reverseReverseList.toString()).append('\t');
- sbuilder.append(kmer.toString()).append(')');
+ sbuilder.append(kmer.toString()).append('}');
return sbuilder.toString();
}
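NOTE: a round-trip sketch of the new NodeWritable wire format above (kmerlength first, then nodeIdList, the four adjacency lists, and the kmer, with readFields() mirroring write()). Assumes the genomix types are on the classpath:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    import edu.uci.ics.genomix.type.NodeWritable;

    public class NodeWritableRoundTrip {
        public static void main(String[] args) throws IOException {
            NodeWritable node = new NodeWritable(5);

            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            node.write(new DataOutputStream(bos));     // kmerlength travels first

            NodeWritable copy = new NodeWritable();
            copy.readFields(new DataInputStream(new ByteArrayInputStream(bos.toByteArray())));
            // readFields() resets with the stored kmerlength before reading the
            // lists, so copy.getKmerlength() == 5 and the lists agree on byte size
        }
    }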
diff --git a/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/KmerBytesWritableTest.java b/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/KmerBytesWritableTest.java
index 37c64aa..b4361ac 100644
--- a/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/KmerBytesWritableTest.java
+++ b/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/KmerBytesWritableTest.java
@@ -228,6 +228,15 @@
merge.set(kmer1);
merge.mergeWithRFKmer(i, kmer2);
Assert.assertEquals("GGCACAACAACCC", merge.toString());
+
+ String test1 = "CTA";
+ String test2 = "AGA";
+ KmerBytesWritable k1 = new KmerBytesWritable(3);
+ KmerBytesWritable k2 = new KmerBytesWritable(3);
+ k1.setByRead(test1.getBytes(), 0);
+ k2.setByRead(test2.getBytes(), 0);
+ k1.mergeWithRFKmer(3, k2);
+ Assert.assertEquals("CTAT", k1);
}
diff --git a/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/KmerListWritableTest.java b/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/KmerListWritableTest.java
index 71246a1..1bbb771 100644
--- a/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/KmerListWritableTest.java
+++ b/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/KmerListWritableTest.java
@@ -30,7 +30,7 @@
Assert.assertEquals(1, kmerList.getCountOfPosition());
}
- kmerList.reset(kmer.getKmerLength());
+ kmerList.reset(0);
//add one more kmer each time and fix kmerSize
for (int i = 0; i < 200; i++) {
kmer = new KmerBytesWritable(5);
diff --git a/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/PositionWritableTest.java b/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/PositionWritableTest.java
index 540c6eb..fc67245 100644
--- a/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/PositionWritableTest.java
+++ b/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/PositionWritableTest.java
@@ -1,4 +1,5 @@
package edu.uci.ics.genomix.data.test;
+
import java.util.Random;
import junit.framework.Assert;
diff --git a/genomix/genomix-hadoop/data/webmap/8 b/genomix/genomix-hadoop/data/webmap/8
new file mode 100644
index 0000000..3959d4d
--- /dev/null
+++ b/genomix/genomix-hadoop/data/webmap/8
@@ -0,0 +1 @@
+1 AATAGAACTT
diff --git a/genomix/genomix-hadoop/data/webmap/pathmerge_TestSet/2 b/genomix/genomix-hadoop/data/webmap/pathmerge_TestSet/2
new file mode 100644
index 0000000..0f501fe
--- /dev/null
+++ b/genomix/genomix-hadoop/data/webmap/pathmerge_TestSet/2
@@ -0,0 +1 @@
+1 AATA
diff --git a/genomix/genomix-hadoop/data/webmap/pathmerge_TestSet/2~ b/genomix/genomix-hadoop/data/webmap/pathmerge_TestSet/2~
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/genomix/genomix-hadoop/data/webmap/pathmerge_TestSet/2~
diff --git a/genomix/genomix-hadoop/data/webmap/pathmerge_TestSet/3 b/genomix/genomix-hadoop/data/webmap/pathmerge_TestSet/3
new file mode 100644
index 0000000..b90246c
--- /dev/null
+++ b/genomix/genomix-hadoop/data/webmap/pathmerge_TestSet/3
@@ -0,0 +1 @@
+1 AATAG
diff --git a/genomix/genomix-hadoop/data/webmap/pathmerge_TestSet/3~ b/genomix/genomix-hadoop/data/webmap/pathmerge_TestSet/3~
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/genomix/genomix-hadoop/data/webmap/pathmerge_TestSet/3~
diff --git a/genomix/genomix-hadoop/data/webmap/pathmerge_TestSet/4 b/genomix/genomix-hadoop/data/webmap/pathmerge_TestSet/4
new file mode 100644
index 0000000..3f1cd5c
--- /dev/null
+++ b/genomix/genomix-hadoop/data/webmap/pathmerge_TestSet/4
@@ -0,0 +1 @@
+1 AATAGA
diff --git a/genomix/genomix-hadoop/data/webmap/pathmerge_TestSet/4~ b/genomix/genomix-hadoop/data/webmap/pathmerge_TestSet/4~
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/genomix/genomix-hadoop/data/webmap/pathmerge_TestSet/4~
diff --git a/genomix/genomix-hadoop/data/webmap/pathmerge_TestSet/5 b/genomix/genomix-hadoop/data/webmap/pathmerge_TestSet/5
new file mode 100644
index 0000000..a720dc4
--- /dev/null
+++ b/genomix/genomix-hadoop/data/webmap/pathmerge_TestSet/5
@@ -0,0 +1 @@
+1 AATAGAA
diff --git a/genomix/genomix-hadoop/data/webmap/pathmerge_TestSet/5~ b/genomix/genomix-hadoop/data/webmap/pathmerge_TestSet/5~
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/genomix/genomix-hadoop/data/webmap/pathmerge_TestSet/5~
diff --git a/genomix/genomix-hadoop/data/webmap/pathmerge_TestSet/6 b/genomix/genomix-hadoop/data/webmap/pathmerge_TestSet/6
new file mode 100644
index 0000000..7a95b7c
--- /dev/null
+++ b/genomix/genomix-hadoop/data/webmap/pathmerge_TestSet/6
@@ -0,0 +1 @@
+1 AATAGAAC
diff --git a/genomix/genomix-hadoop/data/webmap/pathmerge_TestSet/6~ b/genomix/genomix-hadoop/data/webmap/pathmerge_TestSet/6~
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/genomix/genomix-hadoop/data/webmap/pathmerge_TestSet/6~
diff --git a/genomix/genomix-hadoop/data/webmap/pathmerge_TestSet/7 b/genomix/genomix-hadoop/data/webmap/pathmerge_TestSet/7
new file mode 100644
index 0000000..ce4b8a8
--- /dev/null
+++ b/genomix/genomix-hadoop/data/webmap/pathmerge_TestSet/7
@@ -0,0 +1 @@
+1 AATAGAACT
diff --git a/genomix/genomix-hadoop/data/webmap/pathmerge_TestSet/7~ b/genomix/genomix-hadoop/data/webmap/pathmerge_TestSet/7~
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/genomix/genomix-hadoop/data/webmap/pathmerge_TestSet/7~
diff --git a/genomix/genomix-hadoop/data/webmap/pathmerge_TestSet/8 b/genomix/genomix-hadoop/data/webmap/pathmerge_TestSet/8
new file mode 100644
index 0000000..3959d4d
--- /dev/null
+++ b/genomix/genomix-hadoop/data/webmap/pathmerge_TestSet/8
@@ -0,0 +1 @@
+1 AATAGAACTT
diff --git a/genomix/genomix-hadoop/data/webmap/pathmerge_TestSet/8~ b/genomix/genomix-hadoop/data/webmap/pathmerge_TestSet/8~
new file mode 100644
index 0000000..89ead1e
--- /dev/null
+++ b/genomix/genomix-hadoop/data/webmap/pathmerge_TestSet/8~
@@ -0,0 +1 @@
+1 AATAGAACTTA
diff --git a/genomix/genomix-hadoop/data/webmap/pathmerge_TestSet/9 b/genomix/genomix-hadoop/data/webmap/pathmerge_TestSet/9
new file mode 100644
index 0000000..89ead1e
--- /dev/null
+++ b/genomix/genomix-hadoop/data/webmap/pathmerge_TestSet/9
@@ -0,0 +1 @@
+1 AATAGAACTTA
diff --git a/genomix/genomix-hadoop/data/webmap/pathmerge_TestSet/9~ b/genomix/genomix-hadoop/data/webmap/pathmerge_TestSet/9~
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/genomix/genomix-hadoop/data/webmap/pathmerge_TestSet/9~
diff --git a/genomix/genomix-hadoop/data/webmap/test.txt b/genomix/genomix-hadoop/data/webmap/test.txt
index 17770fa..990dbd1 100644
--- a/genomix/genomix-hadoop/data/webmap/test.txt
+++ b/genomix/genomix-hadoop/data/webmap/test.txt
@@ -1 +1,3 @@
1 AATAGAAG
+2 TATAGACC
+3 CATAGATT
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/contrailgraphbuilding/GenomixMapper.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/contrailgraphbuilding/GenomixMapper.java
index 3b615cb..98f561f 100644
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/contrailgraphbuilding/GenomixMapper.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/contrailgraphbuilding/GenomixMapper.java
@@ -226,7 +226,7 @@
public void setCurKmerByOldNextKmer(){
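+ // slide the window: copy all "next" kmer state into the "cur" kmer state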
curKmerDir = nextKmerDir;
curForwardKmer.set(nextForwardKmer);
- preReverseKmer.set(nextReverseKmer);
+ curReverseKmer.set(nextReverseKmer);
}
public void setMapperOutput(OutputCollector<KmerBytesWritable, NodeWritable> output) throws IOException{
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/contrailgraphbuilding/GenomixReducer.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/contrailgraphbuilding/GenomixReducer.java
index 6472f05..1633c26 100644
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/contrailgraphbuilding/GenomixReducer.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/contrailgraphbuilding/GenomixReducer.java
@@ -3,6 +3,7 @@
import java.io.IOException;
import java.util.Iterator;
+import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
@@ -15,19 +16,23 @@
public class GenomixReducer extends MapReduceBase implements
Reducer<KmerBytesWritable, NodeWritable, KmerBytesWritable, NodeWritable>{
- private NodeWritable outputNode = new NodeWritable();
- private NodeWritable tmpNode = new NodeWritable();
+ public static int KMER_SIZE;
+ private NodeWritable outputNode;
+ private NodeWritable tmpNode;
+
+ @Override
+ public void configure(JobConf job) {
+ KMER_SIZE = GenomixMapper.KMER_SIZE;
+ outputNode = new NodeWritable(KMER_SIZE);
+ tmpNode = new NodeWritable(KMER_SIZE);
+ }
+
@Override
public void reduce(KmerBytesWritable key, Iterator<NodeWritable> values,
OutputCollector<KmerBytesWritable, NodeWritable> output,
Reporter reporter) throws IOException {
- outputNode.reset(0);
+ outputNode.reset(KMER_SIZE);
-// //copy first item to outputNode
-// if(values.hasNext()){
-// NodeWritable tmpNode = values.next();
-// outputNode.set(tmpNode);
-// }
while (values.hasNext()) {
tmpNode.set(values.next());
outputNode.getNodeIdList().appendList(tmpNode.getNodeIdList());
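NOTE: configure() runs once per task before any reduce() call, which is why the writables are now allocated there. Reading GenomixMapper.KMER_SIZE only works if that static is also set in the reducer's JVM; a sketch that instead pulls the value from the JobConf (the "genomix.kmer.size" key is a hypothetical name for illustration):

    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.MapReduceBase;

    public class ConfiguredReducerSketch extends MapReduceBase {
        private int kmerSize;

        @Override
        public void configure(JobConf job) {
            // per-task setup: size-dependent writables can be allocated here
            kmerSize = job.getInt("genomix.kmer.size", 21);
        }
    }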
diff --git a/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/contrailgraphbuilding/GraphBuildingTest.java b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/contrailgraphbuilding/GraphBuildingTest.java
index 4716072..217e882 100644
--- a/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/contrailgraphbuilding/GraphBuildingTest.java
+++ b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/contrailgraphbuilding/GraphBuildingTest.java
@@ -22,13 +22,13 @@
private JobConf conf = new JobConf();
private static final String ACTUAL_RESULT_DIR = "actual";
private static final String HADOOP_CONF_PATH = ACTUAL_RESULT_DIR + File.separator + "conf.xml";
- private static final String DATA_PATH = "data/webmap/test.txt";
+ private static final String DATA_PATH = "data/webmap/pathmerge_TestSet/9";
private static final String HDFS_PATH = "/webmap";
private static final String RESULT_PATH = "/result";
// private static final int COUNT_REDUCER = 2;
- private static final int SIZE_KMER = 5;
- private static final int READ_LENGTH = 8;
+ private static final int SIZE_KMER = 3;
+ private static final int READ_LENGTH = 11;
private MiniDFSCluster dfsCluster;
private MiniMRCluster mrCluster;
@@ -45,7 +45,7 @@
public void TestMapKmerToNode() throws Exception {
GenomixDriver driver = new GenomixDriver();
- driver.run(HDFS_PATH, RESULT_PATH, 0, SIZE_KMER, READ_LENGTH, true, HADOOP_CONF_PATH);
+ driver.run(HDFS_PATH, RESULT_PATH, 1, SIZE_KMER, READ_LENGTH, true, HADOOP_CONF_PATH);
dumpResult();
}
@@ -77,6 +77,6 @@
Path src = new Path(RESULT_PATH);
Path dest = new Path(ACTUAL_RESULT_DIR);
dfs.copyToLocalFile(src, dest);
- HadoopMiniClusterTest.copyResultsToLocal(RESULT_PATH, "test.txt", false, conf, true, dfs);
+ HadoopMiniClusterTest.copyResultsToLocal(RESULT_PATH, "actual/test.txt", false, conf, true, dfs);
}
}
diff --git a/genomix/genomix-pregelix/data/PathMergeTestSet/2/part-00000 b/genomix/genomix-pregelix/data/PathMergeTestSet/2/part-00000
new file mode 100755
index 0000000..6be54c5
--- /dev/null
+++ b/genomix/genomix-pregelix/data/PathMergeTestSet/2/part-00000
Binary files differ
diff --git a/genomix/genomix-pregelix/data/PathMergeTestSet/3/part-00000 b/genomix/genomix-pregelix/data/PathMergeTestSet/3/part-00000
new file mode 100755
index 0000000..6f2b1d8
--- /dev/null
+++ b/genomix/genomix-pregelix/data/PathMergeTestSet/3/part-00000
Binary files differ
diff --git a/genomix/genomix-pregelix/data/PathMergeTestSet/4/part-00000 b/genomix/genomix-pregelix/data/PathMergeTestSet/4/part-00000
new file mode 100755
index 0000000..7fbe1a4
--- /dev/null
+++ b/genomix/genomix-pregelix/data/PathMergeTestSet/4/part-00000
Binary files differ
diff --git a/genomix/genomix-pregelix/data/PathMergeTestSet/5/part-00000 b/genomix/genomix-pregelix/data/PathMergeTestSet/5/part-00000
new file mode 100755
index 0000000..1887e36
--- /dev/null
+++ b/genomix/genomix-pregelix/data/PathMergeTestSet/5/part-00000
Binary files differ
diff --git a/genomix/genomix-pregelix/data/PathMergeTestSet/6/part-00000 b/genomix/genomix-pregelix/data/PathMergeTestSet/6/part-00000
new file mode 100755
index 0000000..72b4009
--- /dev/null
+++ b/genomix/genomix-pregelix/data/PathMergeTestSet/6/part-00000
Binary files differ
diff --git a/genomix/genomix-pregelix/data/PathMergeTestSet/7/part-00000 b/genomix/genomix-pregelix/data/PathMergeTestSet/7/part-00000
new file mode 100755
index 0000000..394c8c9
--- /dev/null
+++ b/genomix/genomix-pregelix/data/PathMergeTestSet/7/part-00000
Binary files differ
diff --git a/genomix/genomix-pregelix/data/PathMergeTestSet/8/part-00000 b/genomix/genomix-pregelix/data/PathMergeTestSet/8/part-00000
new file mode 100755
index 0000000..db653a7
--- /dev/null
+++ b/genomix/genomix-pregelix/data/PathMergeTestSet/8/part-00000
Binary files differ
diff --git a/genomix/genomix-pregelix/data/PathMergeTestSet/9/part-00000 b/genomix/genomix-pregelix/data/PathMergeTestSet/9/part-00000
new file mode 100755
index 0000000..b7760c2
--- /dev/null
+++ b/genomix/genomix-pregelix/data/PathMergeTestSet/9/part-00000
Binary files differ
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/api/io/binary/BinaryDataCleanVertexInputFormat.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/api/io/binary/BinaryDataCleanVertexInputFormat.java
index 2f66bbc..396d5ec 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/api/io/binary/BinaryDataCleanVertexInputFormat.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/api/io/binary/BinaryDataCleanVertexInputFormat.java
@@ -13,8 +13,8 @@
import edu.uci.ics.pregelix.api.io.VertexInputFormat;
import edu.uci.ics.pregelix.api.io.VertexReader;
-import edu.uci.ics.genomix.oldtype.PositionWritable;
import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
public class BinaryDataCleanVertexInputFormat<I extends WritableComparable<?>, V extends Writable, E extends Writable, M extends Writable>
extends VertexInputFormat<I, V, E, M> {
@@ -38,7 +38,7 @@
public static abstract class BinaryDataCleanVertexReader<I extends WritableComparable<?>, V extends Writable, E extends Writable, M extends Writable>
implements VertexReader<I, V, E, M> {
/** Internal line record reader */
- private final RecordReader<PositionWritable, VertexValueWritable> lineRecordReader;
+ private final RecordReader<KmerBytesWritable, VertexValueWritable> lineRecordReader;
/** Context passed to initialize */
private TaskAttemptContext context;
@@ -48,7 +48,7 @@
* @param recordReader
* Line record reader from SequenceFileInputFormat
*/
- public BinaryDataCleanVertexReader(RecordReader<PositionWritable, VertexValueWritable> recordReader) {
+ public BinaryDataCleanVertexReader(RecordReader<KmerBytesWritable, VertexValueWritable> recordReader) {
this.lineRecordReader = recordReader;
}
@@ -74,7 +74,7 @@
*
* @return Record reader to be used for reading.
*/
- protected RecordReader<PositionWritable, VertexValueWritable> getRecordReader() {
+ protected RecordReader<KmerBytesWritable, VertexValueWritable> getRecordReader() {
return lineRecordReader;
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/api/io/binary/BinaryDataCleanVertexOutputFormat.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/api/io/binary/BinaryDataCleanVertexOutputFormat.java
index c23ceeb..c07d076 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/api/io/binary/BinaryDataCleanVertexOutputFormat.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/api/io/binary/BinaryDataCleanVertexOutputFormat.java
@@ -10,8 +10,8 @@
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
-import edu.uci.ics.genomix.oldtype.PositionWritable;
import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
import edu.uci.ics.pregelix.api.io.VertexOutputFormat;
import edu.uci.ics.pregelix.api.io.VertexWriter;
@@ -49,7 +49,7 @@
/** Context passed to initialize */
private TaskAttemptContext context;
/** Internal line record writer */
- private final RecordWriter<PositionWritable, VertexValueWritable> lineRecordWriter;
+ private final RecordWriter<KmerBytesWritable, VertexValueWritable> lineRecordWriter;
/**
* Initialize with the LineRecordWriter.
@@ -57,7 +57,7 @@
* @param lineRecordWriter
* Line record writer from SequenceFileOutputFormat
*/
- public BinaryVertexWriter(RecordWriter<PositionWritable, VertexValueWritable> lineRecordWriter) {
+ public BinaryVertexWriter(RecordWriter<KmerBytesWritable, VertexValueWritable> lineRecordWriter) {
this.lineRecordWriter = lineRecordWriter;
}
@@ -76,7 +76,7 @@
*
* @return Record writer to be used for writing.
*/
- public RecordWriter<PositionWritable, VertexValueWritable> getRecordWriter() {
+ public RecordWriter<KmerBytesWritable, VertexValueWritable> getRecordWriter() {
return lineRecordWriter;
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/api/io/binary/BinaryVertexOutputFormat.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/api/io/binary/BinaryVertexOutputFormat.java
deleted file mode 100644
index a147e2f..0000000
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/api/io/binary/BinaryVertexOutputFormat.java
+++ /dev/null
@@ -1,102 +0,0 @@
-package edu.uci.ics.genomix.pregelix.api.io.binary;
-
-import java.io.IOException;
-
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.OutputCommitter;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
-
-import edu.uci.ics.genomix.oldtype.NodeWritable;
-import edu.uci.ics.pregelix.api.io.VertexOutputFormat;
-import edu.uci.ics.pregelix.api.io.VertexWriter;
-
-/**
- * Abstract class that users should subclass to use their own text based vertex
- * output format.
- *
- * @param <I>
- * Vertex index value
- * @param <V>
- * Vertex value
- * @param <E>
- * Edge value
- */
-@SuppressWarnings("rawtypes")
-public abstract class BinaryVertexOutputFormat<I extends WritableComparable, V extends Writable, E extends Writable>
- extends VertexOutputFormat<I, V, E> {
- /** Uses the SequenceFileOutputFormat to do everything */
- protected SequenceFileOutputFormat binaryOutputFormat = new SequenceFileOutputFormat();
-
- /**
- * Abstract class to be implemented by the user based on their specific
- * vertex output. Easiest to ignore the key value separator and only use key
- * instead.
- *
- * @param <I>
- * Vertex index value
- * @param <V>
- * Vertex value
- * @param <E>
- * Edge value
- */
- public static abstract class BinaryVertexWriter<I extends WritableComparable, V extends Writable, E extends Writable>
- implements VertexWriter<I, V, E> {
- /** Context passed to initialize */
- private TaskAttemptContext context;
- /** Internal line record writer */
- private final RecordWriter<NodeWritable, NullWritable> lineRecordWriter;
-
- /**
- * Initialize with the LineRecordWriter.
- *
- * @param lineRecordWriter
- * Line record writer from SequenceFileOutputFormat
- */
- public BinaryVertexWriter(RecordWriter<NodeWritable, NullWritable> lineRecordWriter) {
- this.lineRecordWriter = lineRecordWriter;
- }
-
- @Override
- public void initialize(TaskAttemptContext context) throws IOException {
- this.context = context;
- }
-
- @Override
- public void close(TaskAttemptContext context) throws IOException, InterruptedException {
- lineRecordWriter.close(context);
- }
-
- /**
- * Get the line record writer.
- *
- * @return Record writer to be used for writing.
- */
- public RecordWriter<NodeWritable, NullWritable> getRecordWriter() {
- return lineRecordWriter;
- }
-
- /**
- * Get the context.
- *
- * @return Context passed to initialize.
- */
- public TaskAttemptContext getContext() {
- return context;
- }
- }
-
- @Override
- public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException {
- binaryOutputFormat.checkOutputSpecs(context);
- }
-
- @Override
- public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException {
- return binaryOutputFormat.getOutputCommitter(context);
- }
-}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/api/io/binary/BinaryVertexInputFormat.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/api/io/binary/InitialGraphCleanVertexInputFormat.java
similarity index 86%
rename from genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/api/io/binary/BinaryVertexInputFormat.java
rename to genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/api/io/binary/InitialGraphCleanVertexInputFormat.java
index f9a3068..d6be23b 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/api/io/binary/BinaryVertexInputFormat.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/api/io/binary/InitialGraphCleanVertexInputFormat.java
@@ -3,7 +3,6 @@
import java.io.IOException;
import java.util.List;
-import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.InputSplit;
@@ -14,9 +13,10 @@
import edu.uci.ics.pregelix.api.io.VertexInputFormat;
import edu.uci.ics.pregelix.api.io.VertexReader;
-import edu.uci.ics.genomix.oldtype.NodeWritable;
+import edu.uci.ics.genomix.type.NodeWritable;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
-public class BinaryVertexInputFormat<I extends WritableComparable<?>, V extends Writable, E extends Writable, M extends Writable>
+public class InitialGraphCleanVertexInputFormat<I extends WritableComparable<?>, V extends Writable, E extends Writable, M extends Writable>
extends VertexInputFormat<I, V, E, M> {
/** Uses the SequenceFileInputFormat to do everything */
@@ -38,7 +38,7 @@
public static abstract class BinaryVertexReader<I extends WritableComparable<?>, V extends Writable, E extends Writable, M extends Writable>
implements VertexReader<I, V, E, M> {
/** Internal line record reader */
- private final RecordReader<NodeWritable, NullWritable> lineRecordReader;
+ private final RecordReader<KmerBytesWritable, NodeWritable> lineRecordReader;
/** Context passed to initialize */
private TaskAttemptContext context;
@@ -48,7 +48,7 @@
* @param recordReader
* Line record reader from SequenceFileInputFormat
*/
- public BinaryVertexReader(RecordReader<NodeWritable, NullWritable> recordReader) {
+ public BinaryVertexReader(RecordReader<KmerBytesWritable, NodeWritable> recordReader) {
this.lineRecordReader = recordReader;
}
@@ -74,7 +74,7 @@
*
* @return Record reader to be used for reading.
*/
- protected RecordReader<NodeWritable, NullWritable> getRecordReader() {
+ protected RecordReader<KmerBytesWritable, NodeWritable> getRecordReader() {
return lineRecordReader;
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/DataCleanOutputFormat.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/DataCleanOutputFormat.java
deleted file mode 100644
index 11606db..0000000
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/DataCleanOutputFormat.java
+++ /dev/null
@@ -1,42 +0,0 @@
-package edu.uci.ics.genomix.pregelix.format;
-
-import java.io.IOException;
-
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
-import edu.uci.ics.genomix.oldtype.PositionWritable;
-import edu.uci.ics.genomix.pregelix.api.io.binary.BinaryDataCleanVertexOutputFormat;
-import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
-import edu.uci.ics.pregelix.api.graph.Vertex;
-import edu.uci.ics.pregelix.api.io.VertexWriter;
-
-public class DataCleanOutputFormat extends
- BinaryDataCleanVertexOutputFormat<PositionWritable, VertexValueWritable, NullWritable> {
-
- @Override
- public VertexWriter<PositionWritable, VertexValueWritable, NullWritable> createVertexWriter(
- TaskAttemptContext context) throws IOException, InterruptedException {
- @SuppressWarnings("unchecked")
- RecordWriter<PositionWritable, VertexValueWritable> recordWriter = binaryOutputFormat.getRecordWriter(context);
- return new BinaryLoadGraphVertexWriter(recordWriter);
- }
-
- /**
- * Simple VertexWriter that supports {@link BinaryLoadGraphVertex}
- */
- public static class BinaryLoadGraphVertexWriter extends
- BinaryVertexWriter<PositionWritable, VertexValueWritable, NullWritable> {
- public BinaryLoadGraphVertexWriter(RecordWriter<PositionWritable, VertexValueWritable> lineRecordWriter) {
- super(lineRecordWriter);
- }
-
- @Override
- public void writeVertex(Vertex<PositionWritable, VertexValueWritable, NullWritable, ?> vertex)
- throws IOException, InterruptedException {
- //if(vertex.getVertexValue().getState() != MessageFlag.IS_OLDHEAD)
- getRecordWriter().write(vertex.getVertexId(), vertex.getVertexValue());
- }
- }
-}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/DataCleanInputFormat.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/GraphCleanInputFormat.java
similarity index 75%
rename from genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/DataCleanInputFormat.java
rename to genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/GraphCleanInputFormat.java
index 23a53f9..e36e344 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/DataCleanInputFormat.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/GraphCleanInputFormat.java
@@ -10,20 +10,20 @@
import edu.uci.ics.pregelix.api.graph.Vertex;
import edu.uci.ics.pregelix.api.io.VertexReader;
import edu.uci.ics.pregelix.api.util.BspUtils;
-import edu.uci.ics.genomix.oldtype.PositionWritable;
import edu.uci.ics.genomix.pregelix.io.MessageWritable;
import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
import edu.uci.ics.genomix.pregelix.api.io.binary.BinaryDataCleanVertexInputFormat;
import edu.uci.ics.genomix.pregelix.api.io.binary.BinaryDataCleanVertexInputFormat.BinaryDataCleanVertexReader;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
-public class DataCleanInputFormat extends
- BinaryDataCleanVertexInputFormat<PositionWritable, VertexValueWritable, NullWritable, MessageWritable> {
+public class GraphCleanInputFormat extends
+ BinaryDataCleanVertexInputFormat<KmerBytesWritable, VertexValueWritable, NullWritable, MessageWritable> {
/**
* Format INPUT
*/
@SuppressWarnings("unchecked")
@Override
- public VertexReader<PositionWritable, VertexValueWritable, NullWritable, MessageWritable> createVertexReader(
+ public VertexReader<KmerBytesWritable, VertexValueWritable, NullWritable, MessageWritable> createVertexReader(
InputSplit split, TaskAttemptContext context) throws IOException {
return new BinaryDataCleanLoadGraphReader(binaryInputFormat.createRecordReader(split, context));
}
@@ -31,12 +31,12 @@
@SuppressWarnings("rawtypes")
class BinaryDataCleanLoadGraphReader extends
- BinaryDataCleanVertexReader<PositionWritable, VertexValueWritable, NullWritable, MessageWritable> {
+ BinaryDataCleanVertexReader<KmerBytesWritable, VertexValueWritable, NullWritable, MessageWritable> {
private Vertex vertex;
- private PositionWritable vertexId = new PositionWritable();
+ private KmerBytesWritable vertexId = new KmerBytesWritable();
private VertexValueWritable vertexValue = new VertexValueWritable();
- public BinaryDataCleanLoadGraphReader(RecordReader<PositionWritable, VertexValueWritable> recordReader) {
+ public BinaryDataCleanLoadGraphReader(RecordReader<KmerBytesWritable, VertexValueWritable> recordReader) {
super(recordReader);
}
@@ -47,7 +47,7 @@
@SuppressWarnings("unchecked")
@Override
- public Vertex<PositionWritable, VertexValueWritable, NullWritable, MessageWritable> getCurrentVertex()
+ public Vertex<KmerBytesWritable, VertexValueWritable, NullWritable, MessageWritable> getCurrentVertex()
throws IOException, InterruptedException {
if (vertex == null)
vertex = (Vertex) BspUtils.createVertex(getContext().getConfiguration());
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/GraphCleanOutputFormat.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/GraphCleanOutputFormat.java
new file mode 100644
index 0000000..32f71be
--- /dev/null
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/GraphCleanOutputFormat.java
@@ -0,0 +1,41 @@
+package edu.uci.ics.genomix.pregelix.format;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import edu.uci.ics.genomix.pregelix.api.io.binary.BinaryDataCleanVertexOutputFormat;
+import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.io.VertexWriter;
+
+public class GraphCleanOutputFormat extends
+ BinaryDataCleanVertexOutputFormat<KmerBytesWritable, VertexValueWritable, NullWritable> {
+
+ @Override
+ public VertexWriter<KmerBytesWritable, VertexValueWritable, NullWritable> createVertexWriter(
+ TaskAttemptContext context) throws IOException, InterruptedException {
+ @SuppressWarnings("unchecked")
+ RecordWriter<KmerBytesWritable, VertexValueWritable> recordWriter = binaryOutputFormat.getRecordWriter(context);
+ return new BinaryLoadGraphVertexWriter(recordWriter);
+ }
+
+ /**
+ * Simple VertexWriter that supports {@link BinaryLoadGraphVertex}
+ */
+ public static class BinaryLoadGraphVertexWriter extends
+ BinaryVertexWriter<KmerBytesWritable, VertexValueWritable, NullWritable> {
+ public BinaryLoadGraphVertexWriter(RecordWriter<KmerBytesWritable, VertexValueWritable> lineRecordWriter) {
+ super(lineRecordWriter);
+ }
+
+ @Override
+ public void writeVertex(Vertex<KmerBytesWritable, VertexValueWritable, NullWritable, ?> vertex)
+ throws IOException, InterruptedException {
+ getRecordWriter().write(vertex.getVertexId(), vertex.getVertexValue());
+ }
+ }
+}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/NaiveAlgorithmForPathMergeInputFormat.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/InitialGraphCleanInputFormat.java
similarity index 62%
rename from genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/NaiveAlgorithmForPathMergeInputFormat.java
rename to genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/InitialGraphCleanInputFormat.java
index 93781a6..0d685de 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/NaiveAlgorithmForPathMergeInputFormat.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/InitialGraphCleanInputFormat.java
@@ -10,22 +10,22 @@
import edu.uci.ics.pregelix.api.graph.Vertex;
import edu.uci.ics.pregelix.api.io.VertexReader;
import edu.uci.ics.pregelix.api.util.BspUtils;
-import edu.uci.ics.genomix.oldtype.NodeWritable;
-import edu.uci.ics.genomix.oldtype.PositionWritable;
+import edu.uci.ics.genomix.type.NodeWritable;
import edu.uci.ics.genomix.pregelix.io.MessageWritable;
import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
import edu.uci.ics.genomix.pregelix.io.VertexValueWritable.State;
-import edu.uci.ics.genomix.pregelix.api.io.binary.BinaryVertexInputFormat;
-import edu.uci.ics.genomix.pregelix.api.io.binary.BinaryVertexInputFormat.BinaryVertexReader;
+import edu.uci.ics.genomix.pregelix.api.io.binary.InitialGraphCleanVertexInputFormat;
+import edu.uci.ics.genomix.pregelix.api.io.binary.InitialGraphCleanVertexInputFormat.BinaryVertexReader;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
-public class NaiveAlgorithmForPathMergeInputFormat extends
- BinaryVertexInputFormat<PositionWritable, VertexValueWritable, NullWritable, MessageWritable> {
+public class InitialGraphCleanInputFormat extends
+ InitialGraphCleanVertexInputFormat<KmerBytesWritable, VertexValueWritable, NullWritable, MessageWritable> {
/**
* Format INPUT
*/
@SuppressWarnings("unchecked")
@Override
- public VertexReader<PositionWritable, VertexValueWritable, NullWritable, MessageWritable> createVertexReader(
+ public VertexReader<KmerBytesWritable, VertexValueWritable, NullWritable, MessageWritable> createVertexReader(
InputSplit split, TaskAttemptContext context) throws IOException {
return new BinaryLoadGraphReader(binaryInputFormat.createRecordReader(split, context));
}
@@ -33,13 +33,14 @@
@SuppressWarnings("rawtypes")
class BinaryLoadGraphReader extends
- BinaryVertexReader<PositionWritable, VertexValueWritable, NullWritable, MessageWritable> {
+ BinaryVertexReader<KmerBytesWritable, VertexValueWritable, NullWritable, MessageWritable> {
+
private Vertex vertex;
+ private KmerBytesWritable vertexId = new KmerBytesWritable();
private NodeWritable node = new NodeWritable();
- private PositionWritable vertexId = new PositionWritable();
private VertexValueWritable vertexValue = new VertexValueWritable();
- public BinaryLoadGraphReader(RecordReader<NodeWritable, NullWritable> recordReader) {
+ public BinaryLoadGraphReader(RecordReader<KmerBytesWritable, NodeWritable> recordReader) {
super(recordReader);
}
@@ -50,34 +51,36 @@
@SuppressWarnings("unchecked")
@Override
- public Vertex<PositionWritable, VertexValueWritable, NullWritable, MessageWritable> getCurrentVertex()
+ public Vertex<KmerBytesWritable, VertexValueWritable, NullWritable, MessageWritable> getCurrentVertex()
throws IOException, InterruptedException {
if (vertex == null)
vertex = (Vertex) BspUtils.createVertex(getContext().getConfiguration());
-
+
vertex.getMsgList().clear();
vertex.getEdges().clear();
-
+
vertex.reset();
if (getRecordReader() != null) {
/**
* set the src vertex id
*/
- node = getRecordReader().getCurrentKey();
- vertexId.set(node.getNodeID());
+ vertexId.set(getRecordReader().getCurrentKey());
vertex.setVertexId(vertexId);
/**
* set the vertex value
*/
+ node.set(getRecordReader().getCurrentValue());
+ vertexValue.setKmerlength(node.getKmerlength());
+ vertexValue.setNodeIdList(node.getNodeIdList());
vertexValue.setFFList(node.getFFList());
vertexValue.setFRList(node.getFRList());
vertexValue.setRFList(node.getRFList());
vertexValue.setRRList(node.getRRList());
- vertexValue.setKmer(node.getKmer());
+ vertexValue.setKmer(getRecordReader().getCurrentKey());
vertexValue.setState(State.IS_NON);
vertex.setVertexValue(vertexValue);
}
-
+
return vertex;
}
}
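NOTE: after this change the reader consumes records keyed by the kmer itself. A hypothetical sketch of producing a matching (KmerBytesWritable key, NodeWritable value) SequenceFile; the path and kmer content are illustrative only:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.SequenceFile;

    import edu.uci.ics.genomix.type.KmerBytesWritable;
    import edu.uci.ics.genomix.type.NodeWritable;

    public class GraphCleanInputSketch {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            FileSystem fs = FileSystem.get(conf);
            SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf,
                    new Path("/webmap/part-00000"),
                    KmerBytesWritable.class, NodeWritable.class);
            KmerBytesWritable kmer = new KmerBytesWritable(3);
            kmer.setByRead("AAT".getBytes(), 0);
            NodeWritable node = new NodeWritable(3);
            node.setKmer(kmer);
            writer.append(kmer, node); // key = kmer, value = full adjacency record
            writer.close();
        }
    }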
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/LogAlgorithmForPathMergeInputFormat.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/LogAlgorithmForPathMergeInputFormat.java
deleted file mode 100644
index 25c884b..0000000
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/LogAlgorithmForPathMergeInputFormat.java
+++ /dev/null
@@ -1,82 +0,0 @@
-package edu.uci.ics.genomix.pregelix.format;
-
-import java.io.IOException;
-
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
-import edu.uci.ics.genomix.oldtype.NodeWritable;
-import edu.uci.ics.genomix.oldtype.PositionWritable;
-import edu.uci.ics.genomix.pregelix.api.io.binary.BinaryVertexInputFormat;
-import edu.uci.ics.genomix.pregelix.io.MessageWritable;
-import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
-import edu.uci.ics.genomix.pregelix.io.VertexValueWritable.State;
-import edu.uci.ics.pregelix.api.graph.Vertex;
-import edu.uci.ics.pregelix.api.io.VertexReader;
-import edu.uci.ics.pregelix.api.util.BspUtils;
-
-public class LogAlgorithmForPathMergeInputFormat extends
- BinaryVertexInputFormat<PositionWritable, VertexValueWritable, NullWritable, MessageWritable> {
- /**
- * Format INPUT
- */
- @SuppressWarnings("unchecked")
- @Override
- public VertexReader<PositionWritable, VertexValueWritable, NullWritable, MessageWritable> createVertexReader(
- InputSplit split, TaskAttemptContext context) throws IOException {
- return new BinaryLoadGraphReader(binaryInputFormat.createRecordReader(split, context));
- }
-
- @SuppressWarnings("rawtypes")
- class BinaryLoadGraphReader extends
- BinaryVertexReader<PositionWritable, VertexValueWritable, NullWritable, MessageWritable> {
- private Vertex vertex = null;
- private NodeWritable node = new NodeWritable();
- private PositionWritable vertexId = new PositionWritable();
- private VertexValueWritable vertexValue = new VertexValueWritable();
-
- public BinaryLoadGraphReader(RecordReader<NodeWritable, NullWritable> recordReader) {
- super(recordReader);
- }
-
- @Override
- public boolean nextVertex() throws IOException, InterruptedException {
- return getRecordReader().nextKeyValue();
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public Vertex<PositionWritable, VertexValueWritable, NullWritable, MessageWritable> getCurrentVertex()
- throws IOException, InterruptedException {
- if (vertex == null)
- vertex = (Vertex) BspUtils.createVertex(getContext().getConfiguration());
-
- vertex.getMsgList().clear();
- vertex.getEdges().clear();
-
- if (getRecordReader() != null) {
- /**
- * set the src vertex id
- */
- node = getRecordReader().getCurrentKey();
- vertexId.set(node.getNodeID());
- vertex.setVertexId(vertexId);
- /**
- * set the vertex value
- */
- vertexValue.setFFList(node.getFFList());
- vertexValue.setFRList(node.getFRList());
- vertexValue.setRFList(node.getRFList());
- vertexValue.setRRList(node.getRRList());
- vertexValue.setKmer(node.getKmer());
- vertexValue.setState(State.IS_NON);
- vertex.setVertexValue(vertexValue);
- }
-
- return vertex;
- }
- }
-
-}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/LogAlgorithmForPathMergeOutputFormat.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/LogAlgorithmForPathMergeOutputFormat.java
deleted file mode 100644
index 20a4587..0000000
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/LogAlgorithmForPathMergeOutputFormat.java
+++ /dev/null
@@ -1,48 +0,0 @@
-package edu.uci.ics.genomix.pregelix.format;
-
-import java.io.IOException;
-
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
-import edu.uci.ics.genomix.oldtype.NodeWritable;
-import edu.uci.ics.genomix.oldtype.PositionWritable;
-import edu.uci.ics.genomix.pregelix.api.io.binary.BinaryVertexOutputFormat;
-import edu.uci.ics.pregelix.api.graph.Vertex;
-import edu.uci.ics.pregelix.api.io.VertexWriter;
-import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
-
-public class LogAlgorithmForPathMergeOutputFormat extends
- BinaryVertexOutputFormat<PositionWritable, VertexValueWritable, NullWritable> {
-
- @Override
- public VertexWriter<PositionWritable, VertexValueWritable, NullWritable> createVertexWriter(
- TaskAttemptContext context) throws IOException, InterruptedException {
- @SuppressWarnings("unchecked")
- RecordWriter<NodeWritable, NullWritable> recordWriter = binaryOutputFormat.getRecordWriter(context);
- return new BinaryLoadGraphVertexWriter(recordWriter);
- }
-
- /**
- * Simple VertexWriter that supports {@link BinaryLoadGraphVertex}
- */
- public static class BinaryLoadGraphVertexWriter extends
- BinaryVertexWriter<PositionWritable, VertexValueWritable, NullWritable> {
- private NodeWritable node = new NodeWritable();
- private NullWritable nul = NullWritable.get();
-
- public BinaryLoadGraphVertexWriter(RecordWriter<NodeWritable, NullWritable> lineRecordWriter) {
- super(lineRecordWriter);
- }
-
- @Override
- public void writeVertex(Vertex<PositionWritable, VertexValueWritable, NullWritable, ?> vertex)
- throws IOException, InterruptedException {
- node.set(vertex.getVertexId(), vertex.getVertexValue().getFFList(),
- vertex.getVertexValue().getFRList(), vertex.getVertexValue().getRFList(),
- vertex.getVertexValue().getRRList(), vertex.getVertexValue().getKmer());
- getRecordWriter().write(node, nul);
- }
- }
-}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/NaiveAlgorithmForPathMergeOutputFormat.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/NaiveAlgorithmForPathMergeOutputFormat.java
deleted file mode 100644
index 77a893a..0000000
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/NaiveAlgorithmForPathMergeOutputFormat.java
+++ /dev/null
@@ -1,48 +0,0 @@
-package edu.uci.ics.genomix.pregelix.format;
-
-import java.io.IOException;
-
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
-import edu.uci.ics.genomix.oldtype.NodeWritable;
-import edu.uci.ics.genomix.oldtype.PositionWritable;
-import edu.uci.ics.genomix.pregelix.api.io.binary.BinaryVertexOutputFormat;
-import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
-import edu.uci.ics.pregelix.api.graph.Vertex;
-import edu.uci.ics.pregelix.api.io.VertexWriter;
-
-public class NaiveAlgorithmForPathMergeOutputFormat extends
- BinaryVertexOutputFormat<PositionWritable, VertexValueWritable, NullWritable> {
-
- @Override
- public VertexWriter<PositionWritable, VertexValueWritable, NullWritable> createVertexWriter(
- TaskAttemptContext context) throws IOException, InterruptedException {
- @SuppressWarnings("unchecked")
- RecordWriter<NodeWritable, NullWritable> recordWriter = binaryOutputFormat.getRecordWriter(context);
- return new BinaryLoadGraphVertexWriter(recordWriter);
- }
-
- /**
- * Simple VertexWriter that supports {@link BinaryLoadGraphVertex}
- */
- public static class BinaryLoadGraphVertexWriter extends
- BinaryVertexWriter<PositionWritable, VertexValueWritable, NullWritable> {
- private NodeWritable node = new NodeWritable();
- private NullWritable nullWritable = NullWritable.get();
-
- public BinaryLoadGraphVertexWriter(RecordWriter<NodeWritable, NullWritable> lineRecordWriter) {
- super(lineRecordWriter);
- }
-
- @Override
- public void writeVertex(Vertex<PositionWritable, VertexValueWritable, NullWritable, ?> vertex)
- throws IOException, InterruptedException {
- node.set(vertex.getVertexId(), vertex.getVertexValue().getFFList(),
- vertex.getVertexValue().getFRList(), vertex.getVertexValue().getRFList(),
- vertex.getVertexValue().getRRList(), vertex.getVertexValue().getKmer());
- getRecordWriter().write(node, nullWritable);
- }
- }
-}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/AdjacencyListWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/AdjacencyListWritable.java
index 4aeffa0..c35ad7f 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/AdjacencyListWritable.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/AdjacencyListWritable.java
@@ -7,14 +7,20 @@
import org.apache.hadoop.io.WritableComparable;
import edu.uci.ics.genomix.oldtype.PositionListWritable;
+import edu.uci.ics.genomix.type.KmerListWritable;
public class AdjacencyListWritable implements WritableComparable<AdjacencyListWritable>{
- private PositionListWritable forwardList;
- private PositionListWritable reverseList;
+ private KmerListWritable forwardList;
+ private KmerListWritable reverseList;
public AdjacencyListWritable(){
- forwardList = new PositionListWritable();
- reverseList = new PositionListWritable();
+ forwardList = new KmerListWritable();
+ reverseList = new KmerListWritable();
+ }
+
+ public AdjacencyListWritable(int kmerSize){
+ forwardList = new KmerListWritable(kmerSize);
+ reverseList = new KmerListWritable(kmerSize);
}
public void set(AdjacencyListWritable adjacencyList){
@@ -27,23 +33,28 @@
reverseList.reset();
}
+ public void reset(int kmerSize){
+ forwardList.reset(kmerSize);
+ reverseList.reset(kmerSize);
+ }
+
public int getCountOfPosition(){
return forwardList.getCountOfPosition() + reverseList.getCountOfPosition();
}
-
- public PositionListWritable getForwardList() {
+
+ public KmerListWritable getForwardList() {
return forwardList;
}
- public void setForwardList(PositionListWritable forwardList) {
+ public void setForwardList(KmerListWritable forwardList) {
this.forwardList = forwardList;
}
- public PositionListWritable getReverseList() {
+ public KmerListWritable getReverseList() {
return reverseList;
}
- public void setReverseList(PositionListWritable reverseList) {
+ public void setReverseList(KmerListWritable reverseList) {
this.reverseList = reverseList;
}
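
Note: with this change an adjacency list is keyed by kmers rather than read positions, and callers that know K up front can size both lists at construction time. A minimal usage sketch (not part of the patch; K = 21 is an arbitrary example value):

    AdjacencyListWritable adj = new AdjacencyListWritable(21);  // pre-sizes both KmerListWritables for K = 21
    KmerListWritable forward = adj.getForwardList();            // now a KmerListWritable, no longer a PositionListWritable
    adj.reset(21);                                              // clears both lists and reapplies the kmer size
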
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MessageWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MessageWritable.java
index e1e9a3e..1c5f325 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MessageWritable.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MessageWritable.java
@@ -6,7 +6,6 @@
import org.apache.hadoop.io.WritableComparable;
-import edu.uci.ics.genomix.oldtype.PositionWritable;
import edu.uci.ics.genomix.pregelix.type.CheckMessage;
import edu.uci.ics.genomix.pregelix.type.Message;
import edu.uci.ics.genomix.type.KmerBytesWritable;
@@ -17,16 +16,17 @@
 * stores the neighbor's vertexValue when a path vertex sends the message
 * file stores the pointer to the file that stores the chains of connected DNA
*/
- private PositionWritable sourceVertexId;
+ private KmerBytesWritable sourceVertexId;
private KmerBytesWritable kmer;
private AdjacencyListWritable neighberNode; //incoming or outgoing
private byte flag;
private boolean isFlip;
+ private int kmerlength = 0;
private byte checkMessage;
public MessageWritable() {
- sourceVertexId = new PositionWritable();
+ sourceVertexId = new KmerBytesWritable();
kmer = new KmerBytesWritable(0);
neighberNode = new AdjacencyListWritable();
flag = Message.NON;
@@ -34,7 +34,18 @@
checkMessage = (byte) 0;
}
+ public MessageWritable(int kmerSize) {
+ kmerlength = kmerSize;
+ sourceVertexId = new KmerBytesWritable();
+ kmer = new KmerBytesWritable(0);
+ neighberNode = new AdjacencyListWritable(kmerSize);
+ flag = Message.NON;
+ isFlip = false;
+ checkMessage = (byte) 0;
+ }
+
public void set(MessageWritable msg) {
+ this.kmerlength = msg.kmerlength;
checkMessage = 0;
if (sourceVertexId != null) {
checkMessage |= CheckMessage.SOURCE;
@@ -52,11 +63,12 @@
this.flag = msg.getFlag();
}
- public void set(PositionWritable sourceVertexId, KmerBytesWritable chainVertexId, AdjacencyListWritable neighberNode, byte message) {
+ public void set(int kmerlength, KmerBytesWritable sourceVertexId, KmerBytesWritable chainVertexId, AdjacencyListWritable neighberNode, byte message) {
+ this.kmerlength = kmerlength;
checkMessage = 0;
if (sourceVertexId != null) {
checkMessage |= CheckMessage.SOURCE;
- this.sourceVertexId.set(sourceVertexId.getReadID(),sourceVertexId.getPosInRead());
+ this.sourceVertexId.set(sourceVertexId);
}
if (chainVertexId != null) {
checkMessage |= CheckMessage.CHAIN;
@@ -75,15 +87,22 @@
neighberNode.reset();
flag = Message.NON;
}
+
+ public void reset(int kmerSize) {
+ checkMessage = 0;
+ kmer.reset(1);
+ neighberNode.reset(kmerSize);
+ flag = Message.NON;
+ }
- public PositionWritable getSourceVertexId() {
+ public KmerBytesWritable getSourceVertexId() {
return sourceVertexId;
}
- public void setSourceVertexId(PositionWritable sourceVertexId) {
+ public void setSourceVertexId(KmerBytesWritable sourceVertexId) {
if (sourceVertexId != null) {
checkMessage |= CheckMessage.SOURCE;
- this.sourceVertexId.set(sourceVertexId.getReadID(),sourceVertexId.getPosInRead());
+ this.sourceVertexId.set(sourceVertexId);
}
}
@@ -131,6 +150,7 @@
@Override
public void write(DataOutput out) throws IOException {
+ out.writeInt(kmerlength);
out.writeByte(checkMessage);
if ((checkMessage & CheckMessage.SOURCE) != 0)
sourceVertexId.write(out);
@@ -139,12 +159,13 @@
if ((checkMessage & CheckMessage.NEIGHBER) != 0)
neighberNode.write(out);
out.writeBoolean(isFlip);
- out.write(flag);
+ out.writeByte(flag);
}
@Override
public void readFields(DataInput in) throws IOException {
- this.reset();
+ kmerlength = in.readInt();
+ this.reset(kmerlength);
checkMessage = in.readByte();
if ((checkMessage & CheckMessage.SOURCE) != 0)
sourceVertexId.readFields(in);
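
Note: the message now carries its kmer length as the first field on the wire, and readFields() uses it to reset the internal buffers before decoding the payload; optional fields are still guarded by their CheckMessage bits. A round-trip sketch under those assumptions (the class and field names are from this patch; the harness itself is hypothetical):

    import java.io.*;
    import edu.uci.ics.genomix.pregelix.io.MessageWritable;

    public class MessageRoundTripSketch {
        public static void main(String[] args) throws IOException {
            MessageWritable sent = new MessageWritable(21);      // hypothetical K = 21
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            sent.write(new DataOutputStream(bos));               // kmerlength is the first int written
            MessageWritable received = new MessageWritable();
            received.readFields(new DataInputStream(
                    new ByteArrayInputStream(bos.toByteArray()))); // reads kmerlength, calls reset(kmerlength), then the payload
        }
    }
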
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/VertexValueWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/VertexValueWritable.java
index 56cc86b..5d06234 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/VertexValueWritable.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/VertexValueWritable.java
@@ -4,10 +4,10 @@
import org.apache.hadoop.io.WritableComparable;
-import edu.uci.ics.genomix.oldtype.PositionListWritable;
-import edu.uci.ics.genomix.oldtype.PositionWritable;
+import edu.uci.ics.genomix.type.PositionListWritable;
import edu.uci.ics.genomix.pregelix.type.MessageFlag;
import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.KmerListWritable;
public class VertexValueWritable implements WritableComparable<VertexValueWritable> {
@@ -35,31 +35,40 @@
public static final byte SHOULD_MERGE_CLEAR = 0b1110011;
}
+ private PositionListWritable nodeIdList;
private AdjacencyListWritable incomingList;
private AdjacencyListWritable outgoingList;
private byte state;
private KmerBytesWritable kmer;
- private PositionWritable mergeDest;
+ private KmerBytesWritable mergeDest;
+ private int kmerlength = 0;
public VertexValueWritable() {
+ this(0);
+ }
+
+ public VertexValueWritable(int kmerSize){
+ kmerlength = kmerSize;
+ nodeIdList = new PositionListWritable();
incomingList = new AdjacencyListWritable();
outgoingList = new AdjacencyListWritable();
state = State.IS_NON;
- kmer = new KmerBytesWritable(0);
- mergeDest = new PositionWritable();
+ kmer = new KmerBytesWritable(kmerSize);
+ mergeDest = new KmerBytesWritable(kmerSize);
}
- public VertexValueWritable(PositionListWritable forwardForwardList, PositionListWritable forwardReverseList,
- PositionListWritable reverseForwardList, PositionListWritable reverseReverseList,
+ public VertexValueWritable(PositionListWritable nodeIdList, KmerListWritable forwardForwardList, KmerListWritable forwardReverseList,
+ KmerListWritable reverseForwardList, KmerListWritable reverseReverseList,
byte state, KmerBytesWritable kmer) {
- set(forwardForwardList, forwardReverseList,
+ set(nodeIdList, forwardForwardList, forwardReverseList,
reverseForwardList, reverseReverseList,
state, kmer);
}
- public void set(PositionListWritable forwardForwardList, PositionListWritable forwardReverseList,
- PositionListWritable reverseForwardList, PositionListWritable reverseReverseList,
+ public void set(PositionListWritable nodeIdList, KmerListWritable forwardForwardList, KmerListWritable forwardReverseList,
+ KmerListWritable reverseForwardList, KmerListWritable reverseReverseList,
byte state, KmerBytesWritable kmer) {
+ this.kmerlength = kmer.kmerByteSize;
this.incomingList.setForwardList(reverseForwardList);
this.incomingList.setReverseList(reverseReverseList);
this.outgoingList.setForwardList(forwardForwardList);
@@ -69,39 +78,49 @@
}
public void set(VertexValueWritable value) {
- set(value.getFFList(),value.getFRList(),value.getRFList(),value.getRRList(),value.getState(),
+ this.kmerlength = value.kmerlength;
+ set(value.getNodeIdList(), value.getFFList(),value.getFRList(),value.getRFList(),value.getRRList(),value.getState(),
value.getKmer());
}
- public PositionListWritable getFFList() {
+
+ public PositionListWritable getNodeIdList() {
+ return nodeIdList;
+ }
+
+ public void setNodeIdList(PositionListWritable nodeIdList) {
+ this.nodeIdList.set(nodeIdList);
+ }
+
+ public KmerListWritable getFFList() {
return outgoingList.getForwardList();
}
- public PositionListWritable getFRList() {
+ public KmerListWritable getFRList() {
return outgoingList.getReverseList();
}
- public PositionListWritable getRFList() {
+ public KmerListWritable getRFList() {
return incomingList.getForwardList();
}
- public PositionListWritable getRRList() {
+ public KmerListWritable getRRList() {
return incomingList.getReverseList();
}
- public void setFFList(PositionListWritable forwardForwardList){
+ public void setFFList(KmerListWritable forwardForwardList){
outgoingList.setForwardList(forwardForwardList);
}
- public void setFRList(PositionListWritable forwardReverseList){
+ public void setFRList(KmerListWritable forwardReverseList){
outgoingList.setReverseList(forwardReverseList);
}
- public void setRFList(PositionListWritable reverseForwardList){
+ public void setRFList(KmerListWritable reverseForwardList){
incomingList.setForwardList(reverseForwardList);
}
- public void setRRList(PositionListWritable reverseReverseList){
+ public void setRRList(KmerListWritable reverseReverseList){
incomingList.setReverseList(reverseReverseList);
}
@@ -141,30 +160,54 @@
this.kmer.set(kmer);
}
- public PositionWritable getMergeDest() {
+ public KmerBytesWritable getMergeDest() {
return mergeDest;
}
- public void setMergeDest(PositionWritable mergeDest) {
+ public void setMergeDest(KmerBytesWritable mergeDest) {
this.mergeDest = mergeDest;
}
+
+
+ public int getKmerlength() {
+ return kmerlength;
+ }
+ public void setKmerlength(int kmerlength) {
+ this.kmerlength = kmerlength;
+ }
+
+ public void reset(int kmerSize) {
+ this.kmerlength = kmerSize;
+ this.nodeIdList.reset();
+ this.incomingList.getForwardList().reset(kmerSize);
+ this.incomingList.getReverseList().reset(kmerSize);
+ this.outgoingList.getForwardList().reset(kmerSize);
+ this.outgoingList.getReverseList().reset(kmerSize);
+ this.kmer.reset(0);
+ }
+
@Override
public void readFields(DataInput in) throws IOException {
- incomingList.readFields(in);
- outgoingList.readFields(in);
- state = in.readByte();
- kmer.readFields(in);
- mergeDest.readFields(in);
+ this.kmerlength = in.readInt();
+ this.reset(kmerlength);
+ this.nodeIdList.readFields(in);
+ this.incomingList.readFields(in);
+ this.outgoingList.readFields(in);
+ this.state = in.readByte();
+ this.kmer.readFields(in);
+ this.mergeDest.readFields(in);
}
@Override
public void write(DataOutput out) throws IOException {
- incomingList.write(out);
- outgoingList.write(out);
- out.writeByte(state);
- kmer.write(out);
- mergeDest.write(out);
+ out.writeInt(this.kmerlength);
+ this.nodeIdList.write(out);
+ this.incomingList.write(out);
+ this.outgoingList.write(out);
+ out.writeByte(this.state);
+ this.kmer.write(out);
+ this.mergeDest.write(out);
}
@Override
@@ -175,12 +218,13 @@
@Override
public String toString() {
StringBuilder sbuilder = new StringBuilder();
- sbuilder.append('(');
+ sbuilder.append('{');
+ sbuilder.append(nodeIdList.toString()).append('\t');
sbuilder.append(outgoingList.getForwardList().toString()).append('\t');
sbuilder.append(outgoingList.getReverseList().toString()).append('\t');
sbuilder.append(incomingList.getForwardList().toString()).append('\t');
sbuilder.append(incomingList.getReverseList().toString()).append('\t');
- sbuilder.append(kmer.toString()).append(')');
+ sbuilder.append(kmer.toString()).append('}');
return sbuilder.toString();
}
@@ -195,8 +239,8 @@
/*
* Process any changes to value. This is for edge updates
*/
- public void processUpdates(byte neighborToDeleteDir, PositionWritable nodeToDelete,
- byte neighborToMergeDir, PositionWritable nodeToAdd){
+ public void processUpdates(byte neighborToDeleteDir, KmerBytesWritable nodeToDelete,
+ byte neighborToMergeDir, KmerBytesWritable nodeToAdd){
// TODO
// this.getListFromDir(neighborToDeleteDir).remove(nodeToDelete);
// this.getListFromDir(neighborToMergeDir).append(nodeToDelete);
@@ -234,21 +278,25 @@
/*
* Process any changes to value. This is for merging
*/
- public void processMerges(byte neighborToDeleteDir, PositionWritable nodeToDelete,
- byte neighborToMergeDir, PositionWritable nodeToAdd,
+ public void processMerges(byte neighborToDeleteDir, KmerBytesWritable nodeToDelete,
+ byte neighborToMergeDir, KmerBytesWritable nodeToAdd,
int kmerSize, KmerBytesWritable kmer){
switch (neighborToDeleteDir & MessageFlag.DIR_MASK) {
case MessageFlag.DIR_FF:
this.getFFList().remove(nodeToDelete); //set(null);
+ this.getKmer().mergeWithFFKmer(kmerSize, kmer);
break;
case MessageFlag.DIR_FR:
this.getFRList().remove(nodeToDelete);
+ this.getKmer().mergeWithFRKmer(kmerSize, kmer);
break;
case MessageFlag.DIR_RF:
this.getRFList().remove(nodeToDelete);
+ this.getKmer().mergeWithRFKmer(kmerSize, kmer);
break;
case MessageFlag.DIR_RR:
this.getRRList().remove(nodeToDelete);
+ this.getKmer().mergeWithRRKmer(kmerSize, kmer);
break;
}
// TODO: remove switch below and replace with general direction merge
@@ -256,19 +304,15 @@
switch (neighborToMergeDir & MessageFlag.DIR_MASK) {
case MessageFlag.DIR_FF:
- this.getKmer().mergeWithFFKmer(kmerSize, kmer);
this.getFFList().append(nodeToAdd);
break;
case MessageFlag.DIR_FR:
- this.getKmer().mergeWithFRKmer(kmerSize, kmer);
this.getFRList().append(nodeToAdd);
break;
case MessageFlag.DIR_RF:
- this.getKmer().mergeWithRFKmer(kmerSize, kmer);
this.getRFList().append(nodeToAdd);
break;
case MessageFlag.DIR_RR:
- this.getKmer().mergeWithRRKmer(kmerSize, kmer);
this.getRRList().append(nodeToAdd);
break;
}
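
Note: the hunks above move the mergeWith*Kmer() calls from the add-direction switch into the delete-direction switch, so a vertex now extends its own kmer at the moment it drops the edge to the merged-away neighbor. Sketched order of operations for the DIR_FF case, using the names from this file:

    // neighborToDeleteDir switch, case MessageFlag.DIR_FF:
    this.getFFList().remove(nodeToDelete);             // drop the edge to the node being merged away
    this.getKmer().mergeWithFFKmer(kmerSize, kmer);    // absorb its kmer here (moved from the switch below)
    // neighborToMergeDir switch, case MessageFlag.DIR_FF:
    this.getFFList().append(nodeToAdd);                // inherit the far-side edge only; no kmer merge anymore
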
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bridgeremove/BridgeAddVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bridgeremove/BridgeAddVertex.java
index 15b77bd..fa353d0 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bridgeremove/BridgeAddVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bridgeremove/BridgeAddVertex.java
@@ -10,8 +10,8 @@
import edu.uci.ics.genomix.oldtype.PositionListWritable;
import edu.uci.ics.genomix.oldtype.PositionWritable;
import edu.uci.ics.genomix.pregelix.client.Client;
-import edu.uci.ics.genomix.pregelix.format.DataCleanInputFormat;
-import edu.uci.ics.genomix.pregelix.format.DataCleanOutputFormat;
+import edu.uci.ics.genomix.pregelix.format.GraphCleanInputFormat;
+import edu.uci.ics.genomix.pregelix.format.GraphCleanOutputFormat;
import edu.uci.ics.genomix.pregelix.io.MessageWritable;
import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
@@ -112,8 +112,8 @@
/**
* BinaryInput and BinaryOutput
*/
- job.setVertexInputFormatClass(DataCleanInputFormat.class);
- job.setVertexOutputFormatClass(DataCleanOutputFormat.class);
+ job.setVertexInputFormatClass(GraphCleanInputFormat.class);
+ job.setVertexOutputFormatClass(GraphCleanOutputFormat.class);
job.setDynamicVertexValueSize(true);
job.setOutputKeyClass(PositionWritable.class);
job.setOutputValueClass(VertexValueWritable.class);
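
Note: DataCleanInputFormat/DataCleanOutputFormat are renamed to GraphCleanInputFormat/GraphCleanOutputFormat here and, identically, in BridgeRemoveVertex, BubbleAddVertex, BubbleMergeVertex, TipAddVertex, and TipRemoveVertex below. The shared job-setup shape, sketched once:

    job.setVertexInputFormatClass(GraphCleanInputFormat.class);
    job.setVertexOutputFormatClass(GraphCleanOutputFormat.class);
    job.setDynamicVertexValueSize(true);   // presumably needed because merged kmers change the value's size
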
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bridgeremove/BridgeRemoveVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bridgeremove/BridgeRemoveVertex.java
index 7bef7cf..9db9418 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bridgeremove/BridgeRemoveVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bridgeremove/BridgeRemoveVertex.java
@@ -6,8 +6,8 @@
import edu.uci.ics.pregelix.api.job.PregelixJob;
import edu.uci.ics.genomix.oldtype.PositionWritable;
import edu.uci.ics.genomix.pregelix.client.Client;
-import edu.uci.ics.genomix.pregelix.format.DataCleanInputFormat;
-import edu.uci.ics.genomix.pregelix.format.DataCleanOutputFormat;
+import edu.uci.ics.genomix.pregelix.format.GraphCleanInputFormat;
+import edu.uci.ics.genomix.pregelix.format.GraphCleanOutputFormat;
import edu.uci.ics.genomix.pregelix.io.MessageWritable;
import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
import edu.uci.ics.genomix.pregelix.operator.pathmerge.BasicPathMergeVertex;
@@ -167,8 +167,8 @@
/**
* BinaryInput and BinaryOutput
*/
- job.setVertexInputFormatClass(DataCleanInputFormat.class);
- job.setVertexOutputFormatClass(DataCleanOutputFormat.class);
+ job.setVertexInputFormatClass(GraphCleanInputFormat.class);
+ job.setVertexOutputFormatClass(GraphCleanOutputFormat.class);
job.setDynamicVertexValueSize(true);
job.setOutputKeyClass(PositionWritable.class);
job.setOutputValueClass(VertexValueWritable.class);
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleAddVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleAddVertex.java
index 5f30d23..ebb4f74 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleAddVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleAddVertex.java
@@ -10,8 +10,8 @@
import edu.uci.ics.genomix.oldtype.PositionListWritable;
import edu.uci.ics.genomix.oldtype.PositionWritable;
import edu.uci.ics.genomix.pregelix.client.Client;
-import edu.uci.ics.genomix.pregelix.format.DataCleanInputFormat;
-import edu.uci.ics.genomix.pregelix.format.DataCleanOutputFormat;
+import edu.uci.ics.genomix.pregelix.format.GraphCleanInputFormat;
+import edu.uci.ics.genomix.pregelix.format.GraphCleanOutputFormat;
import edu.uci.ics.genomix.pregelix.io.MessageWritable;
import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
@@ -108,8 +108,8 @@
/**
* BinaryInput and BinaryOutput
*/
- job.setVertexInputFormatClass(DataCleanInputFormat.class);
- job.setVertexOutputFormatClass(DataCleanOutputFormat.class);
+ job.setVertexInputFormatClass(GraphCleanInputFormat.class);
+ job.setVertexOutputFormatClass(GraphCleanOutputFormat.class);
job.setDynamicVertexValueSize(true);
job.setOutputKeyClass(PositionWritable.class);
job.setOutputValueClass(VertexValueWritable.class);
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleMergeVertex.java
index 6ca08eb..40e3191 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleMergeVertex.java
@@ -12,8 +12,8 @@
import edu.uci.ics.pregelix.api.job.PregelixJob;
import edu.uci.ics.genomix.oldtype.PositionWritable;
import edu.uci.ics.genomix.pregelix.client.Client;
-import edu.uci.ics.genomix.pregelix.format.DataCleanInputFormat;
-import edu.uci.ics.genomix.pregelix.format.DataCleanOutputFormat;
+import edu.uci.ics.genomix.pregelix.format.GraphCleanInputFormat;
+import edu.uci.ics.genomix.pregelix.format.GraphCleanOutputFormat;
import edu.uci.ics.genomix.pregelix.io.MergeBubbleMessageWritable;
import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
import edu.uci.ics.genomix.pregelix.type.AdjMessage;
@@ -356,8 +356,8 @@
/**
* BinaryInput and BinaryOutput
*/
- job.setVertexInputFormatClass(DataCleanInputFormat.class);
- job.setVertexOutputFormatClass(DataCleanOutputFormat.class);
+ job.setVertexInputFormatClass(GraphCleanInputFormat.class);
+ job.setVertexOutputFormatClass(GraphCleanOutputFormat.class);
job.setDynamicVertexValueSize(true);
job.setOutputKeyClass(PositionWritable.class);
job.setOutputValueClass(VertexValueWritable.class);
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicPathMergeVertex.java
index 1a793cd..937fa42 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicPathMergeVertex.java
@@ -7,29 +7,29 @@
import edu.uci.ics.pregelix.api.graph.Vertex;
-import edu.uci.ics.genomix.oldtype.PositionWritable;
import edu.uci.ics.genomix.pregelix.io.MessageWritable;
import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
import edu.uci.ics.genomix.pregelix.io.VertexValueWritable.State;
import edu.uci.ics.genomix.pregelix.type.AdjMessage;
import edu.uci.ics.genomix.pregelix.type.MessageFlag;
import edu.uci.ics.genomix.pregelix.util.VertexUtil;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
/**
 * Base vertex for the path-merge graph algorithms
*/
public class BasicPathMergeVertex extends
- Vertex<PositionWritable, VertexValueWritable, NullWritable, MessageWritable> {
+ Vertex<KmerBytesWritable, VertexValueWritable, NullWritable, MessageWritable> {
public static final String KMER_SIZE = "BasicPathMergeVertex.kmerSize";
public static final String ITERATIONS = "BasicPathMergeVertex.iteration";
public static int kmerSize = -1;
protected int maxIteration = -1;
- protected MessageWritable incomingMsg = new MessageWritable();
- protected MessageWritable outgoingMsg = new MessageWritable();
- protected PositionWritable destVertexId = new PositionWritable();
- protected Iterator<PositionWritable> posIterator;
- private PositionWritable pos = new PositionWritable();
+ protected MessageWritable incomingMsg = null; // = new MessageWritable();
+ protected MessageWritable outgoingMsg = null; // = new MessageWritable();
+ protected KmerBytesWritable destVertexId = new KmerBytesWritable();
+ protected Iterator<KmerBytesWritable> posIterator;
+ private KmerBytesWritable kmer = new KmerBytesWritable();
byte headFlag;
protected byte outFlag;
protected byte inFlag;
@@ -66,7 +66,7 @@
/**
* get destination vertex
*/
- public PositionWritable getNextDestVertexId(VertexValueWritable value) {
+ public KmerBytesWritable getNextDestVertexId(VertexValueWritable value) {
if (value.getFFList().getCountOfPosition() > 0){ // #FFList() > 0
posIterator = value.getFFList().iterator();
return posIterator.next();
@@ -78,7 +78,7 @@
}
}
- public PositionWritable getPreDestVertexId(VertexValueWritable value) {
+ public KmerBytesWritable getPreDestVertexId(VertexValueWritable value) {
if (value.getRFList().getCountOfPosition() > 0){ // #RFList() > 0
posIterator = value.getRFList().iterator();
return posIterator.next();
@@ -93,7 +93,7 @@
/**
* get destination vertex
*/
- public PositionWritable getNextDestVertexIdAndSetFlag(VertexValueWritable value) {
+ public KmerBytesWritable getNextDestVertexIdAndSetFlag(VertexValueWritable value) {
if (value.getFFList().getCountOfPosition() > 0){ // #FFList() > 0
posIterator = value.getFFList().iterator();
outFlag |= MessageFlag.DIR_FF;
@@ -108,7 +108,7 @@
}
- public PositionWritable getPreDestVertexIdAndSetFlag(VertexValueWritable value) {
+ public KmerBytesWritable getPreDestVertexIdAndSetFlag(VertexValueWritable value) {
if (value.getRFList().getCountOfPosition() > 0){ // #RFList() > 0
posIterator = value.getRFList().iterator();
outFlag |= MessageFlag.DIR_RF;
@@ -315,10 +315,10 @@
* @throws IOException
*/
public void broadcastUpdateMsg(){
- if((getVertexValue().getState() & MessageFlag.IS_HEAD) > 0)
+ if((getVertexValue().getState() & State.IS_HEAD) > 0)
outFlag |= MessageFlag.IS_HEAD;
- switch(getVertexValue().getState() & MessageFlag.SHOULD_MERGE_MASK){
- case MessageFlag.SHOULD_MERGEWITHPREV:
+ switch(getVertexValue().getState() & State.SHOULD_MERGE_MASK){
+ case State.SHOULD_MERGEWITHPREV:
setSuccessorAdjMsg();
if(ifFlipWithPredecessor())
outgoingMsg.setFlip(true);
@@ -328,7 +328,7 @@
if(getNextDestVertexId(getVertexValue()) != null)
sendMsg(getNextDestVertexId(getVertexValue()), outgoingMsg);
break;
- case MessageFlag.SHOULD_MERGEWITHNEXT:
+ case State.SHOULD_MERGEWITHNEXT:
setPredecessorAdjMsg();
if(ifFilpWithSuccessor())
outgoingMsg.setFlip(true);
@@ -365,6 +365,8 @@
setSuccessorAdjMsg();
if(ifFlipWithPredecessor())
outgoingMsg.setFlip(true);
+ else
+ outgoingMsg.setFlip(false);
outgoingMsg.setFlag(outFlag);
outgoingMsg.setNeighberNode(getVertexValue().getIncomingList());
outgoingMsg.setSourceVertexId(getVertexId());
@@ -376,6 +378,8 @@
setPredecessorAdjMsg();
if(ifFilpWithSuccessor())
outgoingMsg.setFlip(true);
+ else
+ outgoingMsg.setFlip(false);
outgoingMsg.setFlag(outFlag);
outgoingMsg.setNeighberNode(getVertexValue().getOutgoingList());
outgoingMsg.setSourceVertexId(getVertexId());
@@ -397,6 +401,8 @@
setSuccessorAdjMsg();
if(ifFlipWithPredecessor())
outgoingMsg.setFlip(true);
+ else
+ outgoingMsg.setFlip(false);
outgoingMsg.setFlag(outFlag);
outgoingMsg.setNeighberNode(getVertexValue().getIncomingList());
outgoingMsg.setSourceVertexId(getVertexId());
@@ -408,6 +414,8 @@
setPredecessorAdjMsg();
if(ifFilpWithSuccessor())
outgoingMsg.setFlip(true);
+ else
+ outgoingMsg.setFlip(false);
outgoingMsg.setFlag(outFlag);
outgoingMsg.setNeighberNode(getVertexValue().getOutgoingList());
outgoingMsg.setSourceVertexId(getVertexId());
@@ -424,20 +432,13 @@
getVertexValue().setState(state);
}
-// public byte isHeadShouldMergeWithPrev(){
-// return (byte) (getVertexValue().getState() & State.HEAD_SHOULD_MERGEWITHPREV);
-// }
/**
 * This vertex tries to merge with the next vertex and sends an update msg to its neighbor
* @throws IOException
*/
public void sendUpdateMsgToPredecessor(){
- setStateAsMergeWithNext();
if(hasNextDest(getVertexValue())){
-// if(getVertexValue().getFFList().getLength() > 0)
-// getVertexValue().setMergeDest(getVertexValue().getFFList().getPosition(0));
-// else
-// getVertexValue().setMergeDest(getVertexValue().getFRList().getPosition(0));
+ setStateAsMergeWithNext();
broadcastUpdateMsg();
}
}
@@ -448,20 +449,13 @@
getVertexValue().setState(state);
}
-// public byte isHeadShouldMergeWithNext(){
-// return (byte) (getVertexValue().getState() & State.HEAD_SHOULD_MERGEWITHNEXT);
-// }
/**
 * This vertex tries to merge with the previous vertex and sends an update msg to its neighbor
* @throws IOException
*/
public void sendUpdateMsgToSuccessor(){
- setStateAsMergeWithPrev();
- if(hasNextDest(getVertexValue())){
-// if(getVertexValue().getRFList().getLength() > 0)
-// getVertexValue().setMergeDest(getVertexValue().getRFList().getPosition(0));
-// else
-// getVertexValue().setMergeDest(getVertexValue().getRRList().getPosition(0));
+ if(hasPrevDest(getVertexValue())){
+ setStateAsMergeWithPrev();
broadcastUpdateMsg();
}
}
@@ -618,8 +612,8 @@
//remove incomingMsg.getSourceId from RR positionList
posIterator = getVertexValue().getRRList().iterator();
while(posIterator.hasNext()){
- pos = posIterator.next();
- if(pos.equals(incomingMsg.getSourceVertexId())){
+ kmer = posIterator.next();
+ if(kmer.equals(incomingMsg.getSourceVertexId())){
posIterator.remove();
break;
}
@@ -628,8 +622,8 @@
//remove incomingMsg.getSourceId from FR positionList
posIterator = getVertexValue().getFRList().iterator();
while(posIterator.hasNext()){
- pos = posIterator.next();
- if(pos.equals(incomingMsg.getSourceVertexId())){
+ kmer = posIterator.next();
+ if(kmer.equals(incomingMsg.getSourceVertexId())){
posIterator.remove();
break;
}
@@ -638,8 +632,8 @@
//remove incomingMsg.getSourceId from RF positionList
posIterator = getVertexValue().getRFList().iterator();
while(posIterator.hasNext()){
- pos = posIterator.next();
- if(pos.equals(incomingMsg.getSourceVertexId())){
+ kmer = posIterator.next();
+ if(kmer.equals(incomingMsg.getSourceVertexId())){
posIterator.remove();
break;
}
@@ -648,8 +642,8 @@
//remove incomingMsg.getSourceId from FF positionList
posIterator = getVertexValue().getFFList().iterator();
while(posIterator.hasNext()){
- pos = posIterator.next();
- if(pos.equals(incomingMsg.getSourceVertexId())){
+ kmer = posIterator.next();
+ if(kmer.equals(incomingMsg.getSourceVertexId())){
posIterator.remove();
break;
}
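
Note: the four hunks above replace the PositionWritable scan with the same kmer-based remove-by-source idiom across the RR, FR, RF, and FF lists. One instance, sketched (posIterator and kmer are the vertex's reusable fields; the break assumes each source appears at most once, which the patch relies on):

    posIterator = getVertexValue().getRRList().iterator();
    while (posIterator.hasNext()) {
        kmer = posIterator.next();
        if (kmer.equals(incomingMsg.getSourceVertexId())) {
            posIterator.remove();   // drop the edge back to the message's sender
            break;
        }
    }
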
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java
index 05a4700..94428ca 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java
@@ -7,13 +7,14 @@
import edu.uci.ics.pregelix.api.job.PregelixJob;
import edu.uci.ics.genomix.oldtype.PositionWritable;
import edu.uci.ics.genomix.pregelix.client.Client;
-import edu.uci.ics.genomix.pregelix.format.NaiveAlgorithmForPathMergeInputFormat;
-import edu.uci.ics.genomix.pregelix.format.NaiveAlgorithmForPathMergeOutputFormat;
+import edu.uci.ics.genomix.pregelix.format.GraphCleanOutputFormat;
+import edu.uci.ics.genomix.pregelix.format.InitialGraphCleanInputFormat;
import edu.uci.ics.genomix.pregelix.io.MessageWritable;
import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
import edu.uci.ics.genomix.pregelix.io.VertexValueWritable.State;
import edu.uci.ics.genomix.pregelix.type.MessageFlag;
import edu.uci.ics.genomix.pregelix.util.VertexUtil;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
/*
* vertexId: BytesWritable
@@ -55,9 +56,9 @@
private float probBeingRandomHead = -1;
private Random randGenerator;
- private PositionWritable curID = new PositionWritable();
- private PositionWritable nextID = new PositionWritable();
- private PositionWritable prevID = new PositionWritable();
+ private KmerBytesWritable curKmer = new KmerBytesWritable();
+ private KmerBytesWritable nextKmer = new KmerBytesWritable();
+ private KmerBytesWritable prevKmer = new KmerBytesWritable();
private boolean hasNext;
private boolean hasPrev;
private boolean curHead;
@@ -73,6 +74,10 @@
kmerSize = getContext().getConfiguration().getInt(KMER_SIZE, 5);
if (maxIteration < 0)
maxIteration = getContext().getConfiguration().getInt(ITERATIONS, 1000000);
+ if(incomingMsg == null)
+ incomingMsg = new MessageWritable(kmerSize);
+ if(outgoingMsg == null)
+ outgoingMsg = new MessageWritable(kmerSize);
//if (randSeed < 0)
// randSeed = getContext().getConfiguration().getLong("randomSeed", 0);
randSeed = getSuperstep();
@@ -91,43 +96,45 @@
outgoingMsg.reset();
}
- protected boolean isNodeRandomHead(PositionWritable nodeID) {
+ protected boolean isNodeRandomHead(KmerBytesWritable nodeKmer) {
// "deterministically random", based on node id
//randGenerator.setSeed(randSeed);
//randSeed = randGenerator.nextInt();
- randGenerator.setSeed((randSeed ^ nodeID.hashCode()) * 100000 * getSuperstep());//randSeed + nodeID.hashCode()
+ randGenerator.setSeed((randSeed ^ nodeKmer.hashCode()) * 100000 * getSuperstep());//randSeed + nodeID.hashCode()
+ for(int i = 0; i < 500; i++)
+ randGenerator.nextFloat();
return randGenerator.nextFloat() < probBeingRandomHead;
}
/**
- * set nextID to the element that's next (in the node's FF or FR list), returning true when there is a next neighbor
+ * set nextKmer to the element that's next (in the node's FF or FR list), returning true when there is a next neighbor
*/
protected boolean setNextInfo(VertexValueWritable value) {
if (value.getFFList().getCountOfPosition() > 0) {
- nextID.set(value.getFFList().getPosition(0));
- nextHead = isNodeRandomHead(nextID);
+ nextKmer.set(value.getFFList().getPosition(0));
+ nextHead = isNodeRandomHead(nextKmer);
return true;
}
if (value.getFRList().getCountOfPosition() > 0) {
- nextID.set(value.getFRList().getPosition(0));
- nextHead = isNodeRandomHead(nextID);
+ nextKmer.set(value.getFRList().getPosition(0));
+ nextHead = isNodeRandomHead(nextKmer);
return true;
}
return false;
}
/**
- * set prevID to the element that's previous (in the node's RR or RF list), returning true when there is a previous neighbor
+ * set prevKmer to the element that's previous (in the node's RR or RF list), returning true when there is a previous neighbor
*/
protected boolean setPrevInfo(VertexValueWritable value) {
if (value.getRRList().getCountOfPosition() > 0) {
- prevID.set(value.getRRList().getPosition(0));
- prevHead = isNodeRandomHead(prevID);
+ prevKmer.set(value.getRRList().getPosition(0));
+ prevHead = isNodeRandomHead(prevKmer);
return true;
}
if (value.getRFList().getCountOfPosition() > 0) {
- prevID.set(value.getRFList().getPosition(0));
- prevHead = isNodeRandomHead(prevID);
+ prevKmer.set(value.getRFList().getPosition(0));
+ prevHead = isNodeRandomHead(prevKmer);
return true;
}
return false;
@@ -149,9 +156,9 @@
setStateAsNoMerge();
// only PATH vertices are present. Find the IDs of my neighbors
- curID.set(getVertexId());
+ curKmer.set(getVertexId());
- curHead = isNodeRandomHead(curID);
+ curHead = isNodeRandomHead(curKmer);
// the headFlag and tailFlag indicate whether the node is at the beginning or end of a simple path.
@@ -162,34 +169,35 @@
if (curHead) {
if (hasNext && !nextHead) {
// compress this head to the forward tail
- sendUpdateMsgToPredecessor(); //TODO up -> update From -> to
+ sendUpdateMsgToPredecessor();
} else if (hasPrev && !prevHead) {
// compress this head to the reverse tail
sendUpdateMsgToSuccessor();
}
}
- }else {
+ else {
// I'm a tail
if (hasNext && hasPrev) {
- if ((!nextHead && !prevHead) && (curID.compareTo(nextID) < 0 && curID.compareTo(prevID) < 0)) {
+ if ((!nextHead && !prevHead) && (curKmer.compareTo(nextKmer) < 0 && curKmer.compareTo(prevKmer) < 0)) {
// tails on both sides, and I'm the "local minimum"
// compress me towards the tail in forward dir
sendUpdateMsgToPredecessor();
}
} else if (!hasPrev) {
// no previous node
- if (!nextHead && curID.compareTo(nextID) < 0) {
+ if (!nextHead && curKmer.compareTo(nextKmer) < 0) {
// merge towards tail in forward dir
sendUpdateMsgToPredecessor();
}
} else if (!hasNext) {
// no next node
- if (!prevHead && curID.compareTo(prevID) < 0) {
+ if (!prevHead && curKmer.compareTo(prevKmer) < 0) {
// merge towards tail in reverse dir
sendUpdateMsgToSuccessor();
}
}
}
+ }
}
else if (getSuperstep() % 4 == 0){
//update neighber
@@ -222,8 +230,8 @@
/**
* BinaryInput and BinaryOutput
*/
- job.setVertexInputFormatClass(NaiveAlgorithmForPathMergeInputFormat.class);
- job.setVertexOutputFormatClass(NaiveAlgorithmForPathMergeOutputFormat.class);
+ job.setVertexInputFormatClass(InitialGraphCleanInputFormat.class);
+ job.setVertexOutputFormatClass(GraphCleanOutputFormat.class);
job.setDynamicVertexValueSize(true);
job.setOutputKeyClass(PositionWritable.class);
job.setOutputValueClass(VertexValueWritable.class);
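
Note: isNodeRandomHead() stays a pure function of (randSeed, kmer hash, superstep), so each vertex can classify itself and its neighbors without extra messaging; the patch adds 500 warm-up draws, presumably to decorrelate nearby seeds. A hypothetical standalone restatement:

    import java.util.Random;
    import edu.uci.ics.genomix.type.KmerBytesWritable;

    static boolean isNodeRandomHead(long randSeed, long superstep,
                                    KmerBytesWritable nodeKmer, float probBeingRandomHead) {
        Random randGenerator = new Random();
        randGenerator.setSeed((randSeed ^ nodeKmer.hashCode()) * 100000 * superstep);
        for (int i = 0; i < 500; i++)        // warm-up draws added by this patch
            randGenerator.nextFloat();
        return randGenerator.nextFloat() < probBeingRandomHead;
    }
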
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/tipremove/TipAddVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/tipremove/TipAddVertex.java
index b8d4098..519ffb9 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/tipremove/TipAddVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/tipremove/TipAddVertex.java
@@ -10,8 +10,8 @@
import edu.uci.ics.genomix.oldtype.PositionListWritable;
import edu.uci.ics.genomix.oldtype.PositionWritable;
import edu.uci.ics.genomix.pregelix.client.Client;
-import edu.uci.ics.genomix.pregelix.format.DataCleanInputFormat;
-import edu.uci.ics.genomix.pregelix.format.DataCleanOutputFormat;
+import edu.uci.ics.genomix.pregelix.format.GraphCleanInputFormat;
+import edu.uci.ics.genomix.pregelix.format.GraphCleanOutputFormat;
import edu.uci.ics.genomix.pregelix.io.MessageWritable;
import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
@@ -103,8 +103,8 @@
/**
* BinaryInput and BinaryOutput
*/
- job.setVertexInputFormatClass(DataCleanInputFormat.class);
- job.setVertexOutputFormatClass(DataCleanOutputFormat.class);
+ job.setVertexInputFormatClass(GraphCleanInputFormat.class);
+ job.setVertexOutputFormatClass(GraphCleanOutputFormat.class);
job.setDynamicVertexValueSize(true);
job.setOutputKeyClass(PositionWritable.class);
job.setOutputValueClass(VertexValueWritable.class);
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/tipremove/TipRemoveVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/tipremove/TipRemoveVertex.java
index cecaa48..7141ae7 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/tipremove/TipRemoveVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/tipremove/TipRemoveVertex.java
@@ -5,8 +5,8 @@
import edu.uci.ics.pregelix.api.job.PregelixJob;
import edu.uci.ics.genomix.oldtype.PositionWritable;
import edu.uci.ics.genomix.pregelix.client.Client;
-import edu.uci.ics.genomix.pregelix.format.DataCleanInputFormat;
-import edu.uci.ics.genomix.pregelix.format.DataCleanOutputFormat;
+import edu.uci.ics.genomix.pregelix.format.GraphCleanInputFormat;
+import edu.uci.ics.genomix.pregelix.format.GraphCleanOutputFormat;
import edu.uci.ics.genomix.pregelix.io.MessageWritable;
import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
import edu.uci.ics.genomix.pregelix.operator.pathmerge.BasicPathMergeVertex;
@@ -105,8 +105,8 @@
/**
* BinaryInput and BinaryOutput
*/
- job.setVertexInputFormatClass(DataCleanInputFormat.class);
- job.setVertexOutputFormatClass(DataCleanOutputFormat.class);
+ job.setVertexInputFormatClass(GraphCleanInputFormat.class);
+ job.setVertexOutputFormatClass(GraphCleanOutputFormat.class);
job.setDynamicVertexValueSize(true);
job.setOutputKeyClass(PositionWritable.class);
job.setOutputValueClass(VertexValueWritable.class);
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/GenerateTextFile.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/GenerateTextFile.java
index e151123..bc08600 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/GenerateTextFile.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/GenerateTextFile.java
@@ -12,7 +12,6 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
-import edu.uci.ics.genomix.oldtype.PositionWritable;
import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
import edu.uci.ics.genomix.pregelix.io.VertexValueWritable.State;
import edu.uci.ics.genomix.type.KmerBytesWritable;
@@ -28,16 +27,14 @@
File srcPath = new File(strSrcDir);
for (File f : srcPath.listFiles((FilenameFilter) (new WildcardFileFilter("part*")))) {
SequenceFile.Reader reader = new SequenceFile.Reader(fileSys, new Path(f.getAbsolutePath()), conf);
- //NodeWritable key = new NodeWritable(kmerSize);
- //NullWritable value = NullWritable.get();
- PositionWritable key = new PositionWritable();
+ KmerBytesWritable key = new KmerBytesWritable();
VertexValueWritable value = new VertexValueWritable();
while (reader.next(key, value)) {
if (key == null) {
break;
}
- bw.write(key.toString() + value.toString());
+ bw.write(key.toString() + "\t" + value.toString());
System.out.println(key.toString());
bw.newLine();
}
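
Note: the reader key becomes the kmer itself, and a tab now separates it from the brace-delimited value produced by the VertexValueWritable.toString() change above. The patched loop, consolidated:

    KmerBytesWritable key = new KmerBytesWritable();
    VertexValueWritable value = new VertexValueWritable();
    while (reader.next(key, value)) {
        bw.write(key.toString() + "\t" + value.toString());  // kmer <tab> {nodeIdList  FF  FR  RF  RR  kmer}
        bw.newLine();
    }
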
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/util/VertexUtil.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/util/VertexUtil.java
index 7b0dfec..b7f6a4f 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/util/VertexUtil.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/util/VertexUtil.java
@@ -117,7 +117,7 @@
/**
* get nodeId from Ad
*/
- public static PositionWritable getNodeIdFromAdjacencyList(AdjacencyListWritable adj){
+ public static KmerBytesWritable getNodeIdFromAdjacencyList(AdjacencyListWritable adj){
if(adj.getForwardList().getCountOfPosition() > 0)
return adj.getForwardList().getPosition(0);
else if(adj.getReverseList().getCountOfPosition() > 0)
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
index 488e23d..0e798fe 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
@@ -4,13 +4,9 @@
import java.io.FileOutputStream;
import java.io.IOException;
-import edu.uci.ics.genomix.oldtype.PositionWritable;
-import edu.uci.ics.genomix.pregelix.format.DataCleanInputFormat;
-import edu.uci.ics.genomix.pregelix.format.DataCleanOutputFormat;
-import edu.uci.ics.genomix.pregelix.format.LogAlgorithmForPathMergeInputFormat;
-import edu.uci.ics.genomix.pregelix.format.LogAlgorithmForPathMergeOutputFormat;
-import edu.uci.ics.genomix.pregelix.format.NaiveAlgorithmForPathMergeInputFormat;
-import edu.uci.ics.genomix.pregelix.format.NaiveAlgorithmForPathMergeOutputFormat;
+import edu.uci.ics.genomix.pregelix.format.GraphCleanInputFormat;
+import edu.uci.ics.genomix.pregelix.format.GraphCleanOutputFormat;
+import edu.uci.ics.genomix.pregelix.format.InitialGraphCleanInputFormat;
import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
import edu.uci.ics.genomix.pregelix.operator.bridgeremove.BridgeAddVertex;
import edu.uci.ics.genomix.pregelix.operator.bridgeremove.BridgeRemoveVertex;
@@ -22,73 +18,74 @@
import edu.uci.ics.genomix.pregelix.operator.pathmerge.P4ForPathMergeVertex;
import edu.uci.ics.genomix.pregelix.operator.tipremove.TipAddVertex;
import edu.uci.ics.genomix.pregelix.operator.tipremove.TipRemoveVertex;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
import edu.uci.ics.pregelix.api.job.PregelixJob;
public class JobGenerator {
public static String outputBase = "src/test/resources/jobs/";
- private static void generateNaiveAlgorithmForMergeGraphJob(String jobName, String outputPath) throws IOException {
- PregelixJob job = new PregelixJob(jobName);
- job.setVertexClass(NaiveAlgorithmForPathMergeVertex.class);
- job.setVertexInputFormatClass(NaiveAlgorithmForPathMergeInputFormat.class); //DataCleanInputFormat
- job.setVertexOutputFormatClass(DataCleanOutputFormat.class);
- job.setDynamicVertexValueSize(true);
- job.setOutputKeyClass(PositionWritable.class);
- job.setOutputValueClass(VertexValueWritable.class);
- job.getConfiguration().setInt(NaiveAlgorithmForPathMergeVertex.KMER_SIZE, 3);
- job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
- }
+// private static void generateNaiveAlgorithmForMergeGraphJob(String jobName, String outputPath) throws IOException {
+// PregelixJob job = new PregelixJob(jobName);
+// job.setVertexClass(NaiveAlgorithmForPathMergeVertex.class);
+// job.setVertexInputFormatClass(NaiveAlgorithmForPathMergeInputFormat.class); //GraphCleanInputFormat
+// job.setVertexOutputFormatClass(GraphCleanOutputFormat.class);
+// job.setDynamicVertexValueSize(true);
+// job.setOutputKeyClass(PositionWritable.class);
+// job.setOutputValueClass(VertexValueWritable.class);
+// job.getConfiguration().setInt(NaiveAlgorithmForPathMergeVertex.KMER_SIZE, 3);
+// job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
+// }
+//
+// private static void genNaiveAlgorithmForMergeGraph() throws IOException {
+// generateNaiveAlgorithmForMergeGraphJob("NaiveAlgorithmForMergeGraph", outputBase
+// + "NaiveAlgorithmForMergeGraph.xml");
+// }
- private static void genNaiveAlgorithmForMergeGraph() throws IOException {
- generateNaiveAlgorithmForMergeGraphJob("NaiveAlgorithmForMergeGraph", outputBase
- + "NaiveAlgorithmForMergeGraph.xml");
- }
-
- private static void generateLogAlgorithmForMergeGraphJob(String jobName, String outputPath) throws IOException {
- PregelixJob job = new PregelixJob(jobName);
- job.setVertexClass(LogAlgorithmForPathMergeVertex.class);
- job.setVertexInputFormatClass(LogAlgorithmForPathMergeInputFormat.class);
- job.setVertexOutputFormatClass(DataCleanOutputFormat.class); //LogAlgorithmForPathMergeOutputFormat
- job.setDynamicVertexValueSize(true);
- job.setOutputKeyClass(PositionWritable.class);
- job.setOutputValueClass(VertexValueWritable.class);
- job.getConfiguration().setInt(LogAlgorithmForPathMergeVertex.KMER_SIZE, 3);
- job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
- }
-
- private static void genLogAlgorithmForMergeGraph() throws IOException {
- generateLogAlgorithmForMergeGraphJob("LogAlgorithmForMergeGraph", outputBase + "LogAlgorithmForMergeGraph.xml");
- }
-
- private static void generateP3ForMergeGraphJob(String jobName, String outputPath) throws IOException {
- PregelixJob job = new PregelixJob(jobName);
- job.setVertexClass(P3ForPathMergeVertex.class);
- job.setVertexInputFormatClass(NaiveAlgorithmForPathMergeInputFormat.class);
- job.setVertexOutputFormatClass(DataCleanOutputFormat.class);
- job.setDynamicVertexValueSize(true);
- job.setOutputKeyClass(PositionWritable.class);
- job.setOutputValueClass(VertexValueWritable.class);
- job.getConfiguration().setInt(P3ForPathMergeVertex.KMER_SIZE, 3);
- job.getConfiguration().setFloat(P3ForPathMergeVertex.PSEUDORATE, 0.3f);
- job.getConfiguration().setInt(P3ForPathMergeVertex.MAXROUND, 2);
- job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
- }
-
- private static void genP3ForMergeGraph() throws IOException {
- generateP3ForMergeGraphJob("P3ForMergeGraph", outputBase
- + "P3ForMergeGraph.xml");
- }
+// private static void generateLogAlgorithmForMergeGraphJob(String jobName, String outputPath) throws IOException {
+// PregelixJob job = new PregelixJob(jobName);
+// job.setVertexClass(LogAlgorithmForPathMergeVertex.class);
+// job.setVertexInputFormatClass(LogAlgorithmForPathMergeInputFormat.class);
+// job.setVertexOutputFormatClass(GraphCleanOutputFormat.class); //LogAlgorithmForPathMergeOutputFormat
+// job.setDynamicVertexValueSize(true);
+// job.setOutputKeyClass(PositionWritable.class);
+// job.setOutputValueClass(VertexValueWritable.class);
+// job.getConfiguration().setInt(LogAlgorithmForPathMergeVertex.KMER_SIZE, 3);
+// job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
+// }
+//
+// private static void genLogAlgorithmForMergeGraph() throws IOException {
+// generateLogAlgorithmForMergeGraphJob("LogAlgorithmForMergeGraph", outputBase + "LogAlgorithmForMergeGraph.xml");
+// }
+//
+// private static void generateP3ForMergeGraphJob(String jobName, String outputPath) throws IOException {
+// PregelixJob job = new PregelixJob(jobName);
+// job.setVertexClass(P3ForPathMergeVertex.class);
+// job.setVertexInputFormatClass(NaiveAlgorithmForPathMergeInputFormat.class);
+// job.setVertexOutputFormatClass(GraphCleanOutputFormat.class);
+// job.setDynamicVertexValueSize(true);
+// job.setOutputKeyClass(PositionWritable.class);
+// job.setOutputValueClass(VertexValueWritable.class);
+// job.getConfiguration().setInt(P3ForPathMergeVertex.KMER_SIZE, 3);
+// job.getConfiguration().setFloat(P3ForPathMergeVertex.PSEUDORATE, 0.3f);
+// job.getConfiguration().setInt(P3ForPathMergeVertex.MAXROUND, 2);
+// job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
+// }
+//
+// private static void genP3ForMergeGraph() throws IOException {
+// generateP3ForMergeGraphJob("P3ForMergeGraph", outputBase
+// + "P3ForMergeGraph.xml");
+// }
private static void generateP4ForMergeGraphJob(String jobName, String outputPath) throws IOException {
PregelixJob job = new PregelixJob(jobName);
job.setVertexClass(P4ForPathMergeVertex.class);
- job.setVertexInputFormatClass(DataCleanInputFormat.class); //NaiveAlgorithmForPathMergeInputFormat //
- job.setVertexOutputFormatClass(DataCleanOutputFormat.class);
+ job.setVertexInputFormatClass(InitialGraphCleanInputFormat.class);
+ job.setVertexOutputFormatClass(GraphCleanOutputFormat.class);
job.setDynamicVertexValueSize(true);
- job.setOutputKeyClass(PositionWritable.class);
+ job.setOutputKeyClass(KmerBytesWritable.class);
job.setOutputValueClass(VertexValueWritable.class);
- job.getConfiguration().setInt(P4ForPathMergeVertex.KMER_SIZE, 5);
+ job.getConfiguration().setInt(P4ForPathMergeVertex.KMER_SIZE, 3);
job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
}
@@ -97,118 +94,118 @@
+ "P4ForMergeGraph.xml");
}
- private static void generateTipAddGraphJob(String jobName, String outputPath) throws IOException {
- PregelixJob job = new PregelixJob(jobName);
- job.setVertexClass(TipAddVertex.class);
- job.setVertexInputFormatClass(DataCleanOutputFormat.class);
- job.setVertexOutputFormatClass(DataCleanOutputFormat.class);
- job.setDynamicVertexValueSize(true);
- job.setOutputKeyClass(PositionWritable.class);
- job.setOutputValueClass(VertexValueWritable.class);
- job.getConfiguration().setInt(TipAddVertex.KMER_SIZE, 3);
- job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
- }
-
- private static void genTipAddGraph() throws IOException {
- generateTipAddGraphJob("TipAddGraph", outputBase
- + "TipAddGraph.xml");
- }
-
- private static void generateTipRemoveGraphJob(String jobName, String outputPath) throws IOException {
- PregelixJob job = new PregelixJob(jobName);
- job.setVertexClass(TipRemoveVertex.class);
- job.setVertexInputFormatClass(NaiveAlgorithmForPathMergeInputFormat.class);
- job.setVertexOutputFormatClass(DataCleanOutputFormat.class);
- job.setDynamicVertexValueSize(true);
- job.setOutputKeyClass(PositionWritable.class);
- job.setOutputValueClass(VertexValueWritable.class);
- job.getConfiguration().setInt(TipRemoveVertex.KMER_SIZE, 5);
- job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
- }
-
- private static void genTipRemoveGraph() throws IOException {
- generateTipRemoveGraphJob("TipRemoveGraph", outputBase
- + "TipRemoveGraph.xml");
- }
-
- private static void generateBridgeAddGraphJob(String jobName, String outputPath) throws IOException {
- PregelixJob job = new PregelixJob(jobName);
- job.setVertexClass(BridgeAddVertex.class);
- job.setVertexInputFormatClass(NaiveAlgorithmForPathMergeInputFormat.class);
- job.setVertexOutputFormatClass(DataCleanOutputFormat.class);
- job.setDynamicVertexValueSize(true);
- job.setOutputKeyClass(PositionWritable.class);
- job.setOutputValueClass(VertexValueWritable.class);
- job.getConfiguration().setInt(BridgeAddVertex.KMER_SIZE, 3);
- job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
- }
-
- private static void genBridgeAddGraph() throws IOException {
- generateBridgeAddGraphJob("BridgeAddGraph", outputBase
- + "BridgeAddGraph.xml");
- }
-
- private static void generateBridgeRemoveGraphJob(String jobName, String outputPath) throws IOException {
- PregelixJob job = new PregelixJob(jobName);
- job.setVertexClass(BridgeRemoveVertex.class);
- job.setVertexInputFormatClass(DataCleanInputFormat.class);
- job.setVertexOutputFormatClass(DataCleanOutputFormat.class);
- job.setDynamicVertexValueSize(true);
- job.setOutputKeyClass(PositionWritable.class);
- job.setOutputValueClass(VertexValueWritable.class);
- job.getConfiguration().setInt(TipRemoveVertex.KMER_SIZE, 5);
- job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
- }
-
- private static void genBridgeRemoveGraph() throws IOException {
- generateBridgeRemoveGraphJob("BridgeRemoveGraph", outputBase
- + "BridgeRemoveGraph.xml");
- }
-
- private static void generateBubbleAddGraphJob(String jobName, String outputPath) throws IOException {
- PregelixJob job = new PregelixJob(jobName);
- job.setVertexClass(BubbleAddVertex.class);
- job.setVertexInputFormatClass(NaiveAlgorithmForPathMergeInputFormat.class);
- job.setVertexOutputFormatClass(DataCleanOutputFormat.class);
- job.setDynamicVertexValueSize(true);
- job.setOutputKeyClass(PositionWritable.class);
- job.setOutputValueClass(VertexValueWritable.class);
- job.getConfiguration().setInt(BubbleAddVertex.KMER_SIZE, 3);
- job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
- }
-
- private static void genBubbleAddGraph() throws IOException {
- generateBubbleAddGraphJob("BubbleAddGraph", outputBase
- + "BubbleAddGraph.xml");
- }
-
- private static void generateBubbleMergeGraphJob(String jobName, String outputPath) throws IOException {
- PregelixJob job = new PregelixJob(jobName);
- job.setVertexClass(BubbleMergeVertex.class);
- job.setVertexInputFormatClass(NaiveAlgorithmForPathMergeInputFormat.class);
- job.setVertexOutputFormatClass(DataCleanOutputFormat.class);
- job.setDynamicVertexValueSize(true);
- job.setOutputKeyClass(PositionWritable.class);
- job.setOutputValueClass(VertexValueWritable.class);
- job.getConfiguration().setInt(BubbleMergeVertex.KMER_SIZE, 5);
- job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
- }
-
- private static void genBubbleMergeGraph() throws IOException {
- generateBubbleMergeGraphJob("BubbleMergeGraph", outputBase
- + "BubbleMergeGraph.xml");
- }
+// private static void generateTipAddGraphJob(String jobName, String outputPath) throws IOException {
+// PregelixJob job = new PregelixJob(jobName);
+// job.setVertexClass(TipAddVertex.class);
+// job.setVertexInputFormatClass(GraphCleanOutputFormat.class);
+// job.setVertexOutputFormatClass(GraphCleanOutputFormat.class);
+// job.setDynamicVertexValueSize(true);
+// job.setOutputKeyClass(PositionWritable.class);
+// job.setOutputValueClass(VertexValueWritable.class);
+// job.getConfiguration().setInt(TipAddVertex.KMER_SIZE, 3);
+// job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
+// }
+//
+// private static void genTipAddGraph() throws IOException {
+// generateTipAddGraphJob("TipAddGraph", outputBase
+// + "TipAddGraph.xml");
+// }
+//
+// private static void generateTipRemoveGraphJob(String jobName, String outputPath) throws IOException {
+// PregelixJob job = new PregelixJob(jobName);
+// job.setVertexClass(TipRemoveVertex.class);
+// job.setVertexInputFormatClass(NaiveAlgorithmForPathMergeInputFormat.class);
+// job.setVertexOutputFormatClass(GraphCleanOutputFormat.class);
+// job.setDynamicVertexValueSize(true);
+// job.setOutputKeyClass(PositionWritable.class);
+// job.setOutputValueClass(VertexValueWritable.class);
+// job.getConfiguration().setInt(TipRemoveVertex.KMER_SIZE, 5);
+// job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
+// }
+//
+// private static void genTipRemoveGraph() throws IOException {
+// generateTipRemoveGraphJob("TipRemoveGraph", outputBase
+// + "TipRemoveGraph.xml");
+// }
+//
+// private static void generateBridgeAddGraphJob(String jobName, String outputPath) throws IOException {
+// PregelixJob job = new PregelixJob(jobName);
+// job.setVertexClass(BridgeAddVertex.class);
+// job.setVertexInputFormatClass(NaiveAlgorithmForPathMergeInputFormat.class);
+// job.setVertexOutputFormatClass(GraphCleanOutputFormat.class);
+// job.setDynamicVertexValueSize(true);
+// job.setOutputKeyClass(PositionWritable.class);
+// job.setOutputValueClass(VertexValueWritable.class);
+// job.getConfiguration().setInt(BridgeAddVertex.KMER_SIZE, 3);
+// job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
+// }
+//
+// private static void genBridgeAddGraph() throws IOException {
+// generateBridgeAddGraphJob("BridgeAddGraph", outputBase
+// + "BridgeAddGraph.xml");
+// }
+//
+// private static void generateBridgeRemoveGraphJob(String jobName, String outputPath) throws IOException {
+// PregelixJob job = new PregelixJob(jobName);
+// job.setVertexClass(BridgeRemoveVertex.class);
+// job.setVertexInputFormatClass(GraphCleanInputFormat.class);
+// job.setVertexOutputFormatClass(GraphCleanOutputFormat.class);
+// job.setDynamicVertexValueSize(true);
+// job.setOutputKeyClass(PositionWritable.class);
+// job.setOutputValueClass(VertexValueWritable.class);
+// job.getConfiguration().setInt(TipRemoveVertex.KMER_SIZE, 5);
+// job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
+// }
+//
+// private static void genBridgeRemoveGraph() throws IOException {
+// generateBridgeRemoveGraphJob("BridgeRemoveGraph", outputBase
+// + "BridgeRemoveGraph.xml");
+// }
+//
+// private static void generateBubbleAddGraphJob(String jobName, String outputPath) throws IOException {
+// PregelixJob job = new PregelixJob(jobName);
+// job.setVertexClass(BubbleAddVertex.class);
+// job.setVertexInputFormatClass(NaiveAlgorithmForPathMergeInputFormat.class);
+// job.setVertexOutputFormatClass(GraphCleanOutputFormat.class);
+// job.setDynamicVertexValueSize(true);
+// job.setOutputKeyClass(PositionWritable.class);
+// job.setOutputValueClass(VertexValueWritable.class);
+// job.getConfiguration().setInt(BubbleAddVertex.KMER_SIZE, 3);
+// job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
+// }
+//
+// private static void genBubbleAddGraph() throws IOException {
+// generateBubbleAddGraphJob("BubbleAddGraph", outputBase
+// + "BubbleAddGraph.xml");
+// }
+//
+// private static void generateBubbleMergeGraphJob(String jobName, String outputPath) throws IOException {
+// PregelixJob job = new PregelixJob(jobName);
+// job.setVertexClass(BubbleMergeVertex.class);
+// job.setVertexInputFormatClass(NaiveAlgorithmForPathMergeInputFormat.class);
+// job.setVertexOutputFormatClass(GraphCleanOutputFormat.class);
+// job.setDynamicVertexValueSize(true);
+// job.setOutputKeyClass(PositionWritable.class);
+// job.setOutputValueClass(VertexValueWritable.class);
+// job.getConfiguration().setInt(BubbleMergeVertex.KMER_SIZE, 5);
+// job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
+// }
+//
+// private static void genBubbleMergeGraph() throws IOException {
+// generateBubbleMergeGraphJob("BubbleMergeGraph", outputBase
+// + "BubbleMergeGraph.xml");
+// }
public static void main(String[] args) throws IOException {
//genNaiveAlgorithmForMergeGraph();
//genLogAlgorithmForMergeGraph();
//genP3ForMergeGraph();
//genTipAddGraph();
- genTipRemoveGraph();
- genBridgeAddGraph();
- genBridgeRemoveGraph();
- genBubbleAddGraph();
- genBubbleMergeGraph();
+// genTipRemoveGraph();
+// genBridgeAddGraph();
+// genBridgeRemoveGraph();
+// genBubbleAddGraph();
+// genBubbleMergeGraph();
genP4ForMergeGraph();
}
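Every commented-out generator above repeats the same body; only the vertex class, the input format, and the k-mer size key and value vary between them. A hedged refactoring sketch of that shared pattern, assuming a single helper (the name generateGraphCleanJob is hypothetical and not part of this commit; the classes are the ones the generators above already reference):

// Hypothetical helper (assumption, not in this commit): one generator for the
// repeated pattern above. Callers pass the vertex class, its input format, and
// the k-mer size key (e.g. TipAddVertex.KMER_SIZE) with its value.
private static void generateGraphCleanJob(String jobName, String outputPath,
        Class<?> vertexClass, Class<?> inputFormatClass,
        String kmerSizeKey, int kmerSize) throws IOException {
    PregelixJob job = new PregelixJob(jobName);
    job.setVertexClass(vertexClass);
    job.setVertexInputFormatClass(inputFormatClass);
    job.setVertexOutputFormatClass(GraphCleanOutputFormat.class);
    job.setDynamicVertexValueSize(true);               // vertex values grow during cleaning
    job.setOutputKeyClass(PositionWritable.class);
    job.setOutputValueClass(VertexValueWritable.class);
    job.getConfiguration().setInt(kmerSizeKey, kmerSize);
    // Dump the full Hadoop configuration; the test suites replay these XML files.
    job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
}

With that helper, genTipAddGraph() would reduce to generateGraphCleanJob("TipAddGraph", outputBase + "TipAddGraph.xml", TipAddVertex.class, NaiveAlgorithmForPathMergeInputFormat.class, TipAddVertex.KMER_SIZE, 3).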
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/PathMergeSmallTestSuite.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/PathMergeSmallTestSuite.java
index 79a13b3..5aedeb7 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/PathMergeSmallTestSuite.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/PathMergeSmallTestSuite.java
@@ -43,22 +43,16 @@
public class PathMergeSmallTestSuite extends TestSuite {
private static final Logger LOGGER = Logger.getLogger(PathMergeSmallTestSuite.class.getName());
- public static final String PreFix = "data/actual"; //"graphbuildresult";
+ public static final String PreFix = "data/PathMergeTestSet"; //"graphbuildresult";
public static final String[] TestDir = { PreFix + File.separator
- + "bubblemerge/BubbleMergeGraph/bin/small_bubble"};
- //+ "tipremove/TipRemoveGraph/bin/fr_with_tip"};
- //+ "graphs/pathmerge/singleread"};
- //+ "bridgeadd/BridgeAddGraph/bin/tworeads"};
- /*+ "2", PreFix + File.separator
- + "3", PreFix + File.separator
- + "4", PreFix + File.separator
- + "5", PreFix + File.separator
- + "6", PreFix + File.separator
- + "7", PreFix + File.separator
- + "8", PreFix + File.separator
- + "9", PreFix + File.separator
- + "tworeads3", PreFix + File.separator
- + "tworeads_6"};*/
+// + "2", PreFix + File.separator
+// + "3", PreFix + File.separator
+// + "4", PreFix + File.separator
+// + "5", PreFix + File.separator
+// + "6", PreFix + File.separator
+// + "7", PreFix + File.separator
+// + "8", PreFix + File.separator
+ + "4"};
private static final String ACTUAL_RESULT_DIR = "data/actual/pathmerge";
private static final String PATH_TO_HADOOP_CONF = "src/test/resources/hadoop/conf";
private static final String PATH_TO_CLUSTER_STORE = "src/test/resources/cluster/stores.properties";
diff --git a/genomix/genomix-pregelix/src/test/resources/jobs/BridgeAddGraph.xml b/genomix/genomix-pregelix/src/test/resources/jobs/BridgeAddGraph.xml
deleted file mode 100644
index 4739d59..0000000
--- a/genomix/genomix-pregelix/src/test/resources/jobs/BridgeAddGraph.xml
+++ /dev/null
@@ -1,142 +0,0 @@
-<?xml version="1.0" encoding="UTF-8" standalone="no"?><configuration>
-<property><name>mapred.tasktracker.dns.nameserver</name><value>default</value></property>
-<property><name>mapred.queue.default.acl-administer-jobs</name><value>*</value></property>
-<property><name>mapred.skip.map.auto.incr.proc.count</name><value>true</value></property>
-<property><name>mapred.jobtracker.instrumentation</name><value>org.apache.hadoop.mapred.JobTrackerMetricsInst</value></property>
-<property><name>mapred.skip.reduce.auto.incr.proc.count</name><value>true</value></property>
-<property><name>fs.hsftp.impl</name><value>org.apache.hadoop.hdfs.HsftpFileSystem</value></property>
-<property><name>mapred.submit.replication</name><value>10</value></property>
-<property><name>ipc.server.tcpnodelay</name><value>false</value></property>
-<property><name>fs.checkpoint.dir</name><value>${hadoop.tmp.dir}/dfs/namesecondary</value></property>
-<property><name>mapred.output.compression.type</name><value>RECORD</value></property>
-<property><name>mapred.job.shuffle.merge.percent</name><value>0.66</value></property>
-<property><name>mapred.child.java.opts</name><value>-Xmx200m</value></property>
-<property><name>mapred.queue.default.acl-submit-job</name><value>*</value></property>
-<property><name>keep.failed.task.files</name><value>false</value></property>
-<property><name>mapred.jobtracker.job.history.block.size</name><value>3145728</value></property>
-<property><name>mapred.output.value.class</name><value>edu.uci.ics.genomix.pregelix.io.VertexValueWritable</value></property>
-<property><name>io.bytes.per.checksum</name><value>512</value></property>
-<property><name>mapred.task.tracker.report.address</name><value>127.0.0.1:0</value></property>
-<property><name>hadoop.util.hash.type</name><value>murmur</value></property>
-<property><name>fs.hdfs.impl</name><value>org.apache.hadoop.hdfs.DistributedFileSystem</value></property>
-<property><name>fs.ramfs.impl</name><value>org.apache.hadoop.fs.InMemoryFileSystem</value></property>
-<property><name>mapred.jobtracker.restart.recover</name><value>false</value></property>
-<property><name>fs.hftp.impl</name><value>org.apache.hadoop.hdfs.HftpFileSystem</value></property>
-<property><name>BridgeRemoveVertex.kmerSize</name><value>3</value></property>
-<property><name>fs.checkpoint.period</name><value>3600</value></property>
-<property><name>mapred.child.tmp</name><value>./tmp</value></property>
-<property><name>mapred.local.dir.minspacekill</name><value>0</value></property>
-<property><name>map.sort.class</name><value>org.apache.hadoop.util.QuickSort</value></property>
-<property><name>hadoop.logfile.count</name><value>10</value></property>
-<property><name>ipc.client.connection.maxidletime</name><value>10000</value></property>
-<property><name>io.map.index.skip</name><value>0</value></property>
-<property><name>mapred.tasktracker.expiry.interval</name><value>600000</value></property>
-<property><name>mapred.output.compress</name><value>false</value></property>
-<property><name>io.seqfile.lazydecompress</name><value>true</value></property>
-<property><name>mapred.reduce.parallel.copies</name><value>5</value></property>
-<property><name>fs.checkpoint.size</name><value>67108864</value></property>
-<property><name>mapred.job.reduce.input.buffer.percent</name><value>0.0</value></property>
-<property><name>mapred.job.name</name><value>BridgeAddGraph</value></property>
-<property><name>local.cache.size</name><value>10737418240</value></property>
-<property><name>fs.s3n.impl</name><value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value></property>
-<property><name>mapred.userlog.limit.kb</name><value>0</value></property>
-<property><name>fs.file.impl</name><value>org.apache.hadoop.fs.LocalFileSystem</value></property>
-<property><name>mapred.task.tracker.http.address</name><value>0.0.0.0:50060</value></property>
-<property><name>mapred.task.timeout</name><value>600000</value></property>
-<property><name>fs.kfs.impl</name><value>org.apache.hadoop.fs.kfs.KosmosFileSystem</value></property>
-<property><name>mapred.max.tracker.blacklists</name><value>4</value></property>
-<property><name>fs.s3.buffer.dir</name><value>${hadoop.tmp.dir}/s3</value></property>
-<property><name>mapred.job.tracker.persist.jobstatus.dir</name><value>/jobtracker/jobsInfo</value></property>
-<property><name>ipc.client.kill.max</name><value>10</value></property>
-<property><name>mapred.tasktracker.instrumentation</name><value>org.apache.hadoop.mapred.TaskTrackerMetricsInst</value></property>
-<property><name>mapred.reduce.tasks.speculative.execution</name><value>true</value></property>
-<property><name>io.sort.record.percent</name><value>0.05</value></property>
-<property><name>hadoop.security.authorization</name><value>false</value></property>
-<property><name>mapred.max.tracker.failures</name><value>4</value></property>
-<property><name>mapred.jobtracker.taskScheduler</name><value>org.apache.hadoop.mapred.JobQueueTaskScheduler</value></property>
-<property><name>mapred.tasktracker.dns.interface</name><value>default</value></property>
-<property><name>mapred.map.tasks</name><value>2</value></property>
-<property><name>mapred.job.tracker.persist.jobstatus.hours</name><value>0</value></property>
-<property><name>fs.s3.sleepTimeSeconds</name><value>10</value></property>
-<property><name>fs.default.name</name><value>file:///</value></property>
-<property><name>mapred.output.key.class</name><value>edu.uci.ics.genomix.type.PositionWritable</value></property>
-<property><name>tasktracker.http.threads</name><value>40</value></property>
-<property><name>mapred.tasktracker.taskmemorymanager.monitoring-interval</name><value>5000</value></property>
-<property><name>hadoop.rpc.socket.factory.class.default</name><value>org.apache.hadoop.net.StandardSocketFactory</value></property>
-<property><name>mapred.reduce.tasks</name><value>1</value></property>
-<property><name>topology.node.switch.mapping.impl</name><value>org.apache.hadoop.net.ScriptBasedMapping</value></property>
-<property><name>pregelix.vertexClass</name><value>edu.uci.ics.genomix.pregelix.operator.bridgeremove.BridgeAddVertex</value></property>
-<property><name>mapred.skip.reduce.max.skip.groups</name><value>0</value></property>
-<property><name>io.file.buffer.size</name><value>4096</value></property>
-<property><name>mapred.jobtracker.maxtasks.per.job</name><value>-1</value></property>
-<property><name>mapred.tasktracker.indexcache.mb</name><value>10</value></property>
-<property><name>mapred.tasktracker.map.tasks.maximum</name><value>2</value></property>
-<property><name>fs.har.impl.disable.cache</name><value>true</value></property>
-<property><name>mapred.task.profile.maps</name><value>0-2</value></property>
-<property><name>hadoop.native.lib</name><value>true</value></property>
-<property><name>fs.s3.block.size</name><value>67108864</value></property>
-<property><name>mapred.job.reuse.jvm.num.tasks</name><value>1</value></property>
-<property><name>mapred.job.tracker.http.address</name><value>0.0.0.0:50030</value></property>
-<property><name>mapred.tasktracker.reduce.tasks.maximum</name><value>2</value></property>
-<property><name>io.compression.codecs</name><value>org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec</value></property>
-<property><name>mapred.job.shuffle.input.buffer.percent</name><value>0.70</value></property>
-<property><name>io.seqfile.compress.blocksize</name><value>1000000</value></property>
-<property><name>mapred.queue.names</name><value>default</value></property>
-<property><name>fs.har.impl</name><value>org.apache.hadoop.fs.HarFileSystem</value></property>
-<property><name>io.mapfile.bloom.error.rate</name><value>0.005</value></property>
-<property><name>mapred.job.tracker</name><value>local</value></property>
-<property><name>io.skip.checksum.errors</name><value>false</value></property>
-<property><name>mapred.reduce.max.attempts</name><value>4</value></property>
-<property><name>fs.s3.maxRetries</name><value>4</value></property>
-<property><name>ipc.server.listen.queue.size</name><value>128</value></property>
-<property><name>fs.trash.interval</name><value>0</value></property>
-<property><name>mapred.local.dir.minspacestart</name><value>0</value></property>
-<property><name>fs.s3.impl</name><value>org.apache.hadoop.fs.s3.S3FileSystem</value></property>
-<property><name>io.seqfile.sorter.recordlimit</name><value>1000000</value></property>
-<property><name>io.mapfile.bloom.size</name><value>1048576</value></property>
-<property><name>io.sort.mb</name><value>100</value></property>
-<property><name>mapred.local.dir</name><value>${hadoop.tmp.dir}/mapred/local</value></property>
-<property><name>io.sort.factor</name><value>10</value></property>
-<property><name>mapred.task.profile</name><value>false</value></property>
-<property><name>job.end.retry.interval</name><value>30000</value></property>
-<property><name>mapred.tasktracker.procfsbasedprocesstree.sleeptime-before-sigkill</name><value>5000</value></property>
-<property><name>mapred.jobtracker.completeuserjobs.maximum</name><value>100</value></property>
-<property><name>mapred.task.profile.reduces</name><value>0-2</value></property>
-<property><name>webinterface.private.actions</name><value>false</value></property>
-<property><name>hadoop.tmp.dir</name><value>/tmp/hadoop-${user.name}</value></property>
-<property><name>mapred.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
-<property><name>mapred.skip.attempts.to.start.skipping</name><value>2</value></property>
-<property><name>mapred.temp.dir</name><value>${hadoop.tmp.dir}/mapred/temp</value></property>
-<property><name>mapred.merge.recordsBeforeProgress</name><value>10000</value></property>
-<property><name>mapred.map.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
-<property><name>mapred.compress.map.output</name><value>false</value></property>
-<property><name>io.sort.spill.percent</name><value>0.80</value></property>
-<property><name>fs.checkpoint.edits.dir</name><value>${fs.checkpoint.dir}</value></property>
-<property><name>mapred.userlog.retain.hours</name><value>24</value></property>
-<property><name>mapred.system.dir</name><value>${hadoop.tmp.dir}/mapred/system</value></property>
-<property><name>mapred.line.input.format.linespermap</name><value>1</value></property>
-<property><name>job.end.retry.attempts</name><value>0</value></property>
-<property><name>ipc.client.idlethreshold</name><value>4000</value></property>
-<property><name>pregelix.vertexOutputFormatClass</name><value>edu.uci.ics.genomix.pregelix.format.DataCleanOutputFormat</value></property>
-<property><name>mapred.reduce.copy.backoff</name><value>300</value></property>
-<property><name>mapred.map.tasks.speculative.execution</name><value>true</value></property>
-<property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
-<property><name>hadoop.logfile.size</name><value>10000000</value></property>
-<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.genomix.pregelix.format.NaiveAlgorithmForPathMergeInputFormat</value></property>
-<property><name>mapred.job.queue.name</name><value>default</value></property>
-<property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
-<property><name>pregelix.incStateLength</name><value>true</value></property>
-<property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
-<property><name>topology.script.number.args</name><value>100</value></property>
-<property><name>mapred.skip.map.max.skip.records</name><value>0</value></property>
-<property><name>fs.ftp.impl</name><value>org.apache.hadoop.fs.ftp.FTPFileSystem</value></property>
-<property><name>mapred.task.cache.levels</name><value>2</value></property>
-<property><name>mapred.job.tracker.handler.count</name><value>10</value></property>
-<property><name>io.serializations</name><value>org.apache.hadoop.io.serializer.WritableSerialization</value></property>
-<property><name>ipc.client.connect.max.retries</name><value>10</value></property>
-<property><name>mapred.min.split.size</name><value>0</value></property>
-<property><name>mapred.map.max.attempts</name><value>4</value></property>
-<property><name>jobclient.output.filter</name><value>FAILED</value></property>
-<property><name>ipc.client.tcpnodelay</name><value>false</value></property>
-<property><name>mapred.acls.enabled</name><value>false</value></property>
-</configuration>
\ No newline at end of file
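The file deleted above, like the similar job XMLs below, is a plain Hadoop Configuration dump produced by Configuration.writeXml. A minimal, self-contained sketch of reading one back, which is presumably how the test drivers consume them (the class name LoadJobXml and the chosen path are illustrative; Configuration.addResource, get, and getInt are standard Hadoop API):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

// Sketch (assumption): load a generated job XML back into a Configuration.
public class LoadJobXml {
    public static void main(String[] args) {
        Configuration conf = new Configuration(false); // false = skip Hadoop defaults
        conf.addResource(new Path("src/test/resources/jobs/BridgeAddGraph.xml"));
        // Keys copied verbatim from the deleted file; note the k-mer size was
        // recorded under "BridgeRemoveVertex.kmerSize" even in the BridgeAdd job.
        System.out.println(conf.get("pregelix.vertexClass"));
        System.out.println(conf.getInt("BridgeRemoveVertex.kmerSize", -1));
    }
}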
diff --git a/genomix/genomix-pregelix/src/test/resources/jobs/BridgeRemoveGraph.xml b/genomix/genomix-pregelix/src/test/resources/jobs/BridgeRemoveGraph.xml
deleted file mode 100644
index f9b801a..0000000
--- a/genomix/genomix-pregelix/src/test/resources/jobs/BridgeRemoveGraph.xml
+++ /dev/null
@@ -1,142 +0,0 @@
-<?xml version="1.0" encoding="UTF-8" standalone="no"?><configuration>
-<property><name>mapred.tasktracker.dns.nameserver</name><value>default</value></property>
-<property><name>mapred.queue.default.acl-administer-jobs</name><value>*</value></property>
-<property><name>mapred.skip.map.auto.incr.proc.count</name><value>true</value></property>
-<property><name>mapred.jobtracker.instrumentation</name><value>org.apache.hadoop.mapred.JobTrackerMetricsInst</value></property>
-<property><name>mapred.skip.reduce.auto.incr.proc.count</name><value>true</value></property>
-<property><name>fs.hsftp.impl</name><value>org.apache.hadoop.hdfs.HsftpFileSystem</value></property>
-<property><name>mapred.submit.replication</name><value>10</value></property>
-<property><name>ipc.server.tcpnodelay</name><value>false</value></property>
-<property><name>fs.checkpoint.dir</name><value>${hadoop.tmp.dir}/dfs/namesecondary</value></property>
-<property><name>mapred.output.compression.type</name><value>RECORD</value></property>
-<property><name>mapred.job.shuffle.merge.percent</name><value>0.66</value></property>
-<property><name>mapred.child.java.opts</name><value>-Xmx200m</value></property>
-<property><name>mapred.queue.default.acl-submit-job</name><value>*</value></property>
-<property><name>keep.failed.task.files</name><value>false</value></property>
-<property><name>mapred.jobtracker.job.history.block.size</name><value>3145728</value></property>
-<property><name>mapred.output.value.class</name><value>edu.uci.ics.genomix.pregelix.io.VertexValueWritable</value></property>
-<property><name>io.bytes.per.checksum</name><value>512</value></property>
-<property><name>mapred.task.tracker.report.address</name><value>127.0.0.1:0</value></property>
-<property><name>hadoop.util.hash.type</name><value>murmur</value></property>
-<property><name>fs.hdfs.impl</name><value>org.apache.hadoop.hdfs.DistributedFileSystem</value></property>
-<property><name>fs.ramfs.impl</name><value>org.apache.hadoop.fs.InMemoryFileSystem</value></property>
-<property><name>mapred.jobtracker.restart.recover</name><value>false</value></property>
-<property><name>fs.hftp.impl</name><value>org.apache.hadoop.hdfs.HftpFileSystem</value></property>
-<property><name>fs.checkpoint.period</name><value>3600</value></property>
-<property><name>mapred.child.tmp</name><value>./tmp</value></property>
-<property><name>mapred.local.dir.minspacekill</name><value>0</value></property>
-<property><name>map.sort.class</name><value>org.apache.hadoop.util.QuickSort</value></property>
-<property><name>hadoop.logfile.count</name><value>10</value></property>
-<property><name>ipc.client.connection.maxidletime</name><value>10000</value></property>
-<property><name>io.map.index.skip</name><value>0</value></property>
-<property><name>mapred.tasktracker.expiry.interval</name><value>600000</value></property>
-<property><name>mapred.output.compress</name><value>false</value></property>
-<property><name>io.seqfile.lazydecompress</name><value>true</value></property>
-<property><name>mapred.reduce.parallel.copies</name><value>5</value></property>
-<property><name>fs.checkpoint.size</name><value>67108864</value></property>
-<property><name>mapred.job.reduce.input.buffer.percent</name><value>0.0</value></property>
-<property><name>mapred.job.name</name><value>BridgeRemoveGraph</value></property>
-<property><name>local.cache.size</name><value>10737418240</value></property>
-<property><name>fs.s3n.impl</name><value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value></property>
-<property><name>mapred.userlog.limit.kb</name><value>0</value></property>
-<property><name>fs.file.impl</name><value>org.apache.hadoop.fs.LocalFileSystem</value></property>
-<property><name>mapred.task.tracker.http.address</name><value>0.0.0.0:50060</value></property>
-<property><name>mapred.task.timeout</name><value>600000</value></property>
-<property><name>fs.kfs.impl</name><value>org.apache.hadoop.fs.kfs.KosmosFileSystem</value></property>
-<property><name>mapred.max.tracker.blacklists</name><value>4</value></property>
-<property><name>fs.s3.buffer.dir</name><value>${hadoop.tmp.dir}/s3</value></property>
-<property><name>mapred.job.tracker.persist.jobstatus.dir</name><value>/jobtracker/jobsInfo</value></property>
-<property><name>ipc.client.kill.max</name><value>10</value></property>
-<property><name>mapred.tasktracker.instrumentation</name><value>org.apache.hadoop.mapred.TaskTrackerMetricsInst</value></property>
-<property><name>mapred.reduce.tasks.speculative.execution</name><value>true</value></property>
-<property><name>io.sort.record.percent</name><value>0.05</value></property>
-<property><name>hadoop.security.authorization</name><value>false</value></property>
-<property><name>mapred.max.tracker.failures</name><value>4</value></property>
-<property><name>mapred.jobtracker.taskScheduler</name><value>org.apache.hadoop.mapred.JobQueueTaskScheduler</value></property>
-<property><name>mapred.tasktracker.dns.interface</name><value>default</value></property>
-<property><name>mapred.map.tasks</name><value>2</value></property>
-<property><name>mapred.job.tracker.persist.jobstatus.hours</name><value>0</value></property>
-<property><name>fs.s3.sleepTimeSeconds</name><value>10</value></property>
-<property><name>fs.default.name</name><value>file:///</value></property>
-<property><name>mapred.output.key.class</name><value>edu.uci.ics.genomix.type.PositionWritable</value></property>
-<property><name>tasktracker.http.threads</name><value>40</value></property>
-<property><name>mapred.tasktracker.taskmemorymanager.monitoring-interval</name><value>5000</value></property>
-<property><name>hadoop.rpc.socket.factory.class.default</name><value>org.apache.hadoop.net.StandardSocketFactory</value></property>
-<property><name>mapred.reduce.tasks</name><value>1</value></property>
-<property><name>topology.node.switch.mapping.impl</name><value>org.apache.hadoop.net.ScriptBasedMapping</value></property>
-<property><name>pregelix.vertexClass</name><value>edu.uci.ics.genomix.pregelix.operator.bridgeremove.BridgeRemoveVertex</value></property>
-<property><name>mapred.skip.reduce.max.skip.groups</name><value>0</value></property>
-<property><name>io.file.buffer.size</name><value>4096</value></property>
-<property><name>mapred.jobtracker.maxtasks.per.job</name><value>-1</value></property>
-<property><name>mapred.tasktracker.indexcache.mb</name><value>10</value></property>
-<property><name>mapred.tasktracker.map.tasks.maximum</name><value>2</value></property>
-<property><name>fs.har.impl.disable.cache</name><value>true</value></property>
-<property><name>mapred.task.profile.maps</name><value>0-2</value></property>
-<property><name>hadoop.native.lib</name><value>true</value></property>
-<property><name>fs.s3.block.size</name><value>67108864</value></property>
-<property><name>mapred.job.reuse.jvm.num.tasks</name><value>1</value></property>
-<property><name>mapred.job.tracker.http.address</name><value>0.0.0.0:50030</value></property>
-<property><name>mapred.tasktracker.reduce.tasks.maximum</name><value>2</value></property>
-<property><name>io.compression.codecs</name><value>org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec</value></property>
-<property><name>mapred.job.shuffle.input.buffer.percent</name><value>0.70</value></property>
-<property><name>io.seqfile.compress.blocksize</name><value>1000000</value></property>
-<property><name>mapred.queue.names</name><value>default</value></property>
-<property><name>fs.har.impl</name><value>org.apache.hadoop.fs.HarFileSystem</value></property>
-<property><name>io.mapfile.bloom.error.rate</name><value>0.005</value></property>
-<property><name>mapred.job.tracker</name><value>local</value></property>
-<property><name>io.skip.checksum.errors</name><value>false</value></property>
-<property><name>mapred.reduce.max.attempts</name><value>4</value></property>
-<property><name>fs.s3.maxRetries</name><value>4</value></property>
-<property><name>ipc.server.listen.queue.size</name><value>128</value></property>
-<property><name>fs.trash.interval</name><value>0</value></property>
-<property><name>mapred.local.dir.minspacestart</name><value>0</value></property>
-<property><name>fs.s3.impl</name><value>org.apache.hadoop.fs.s3.S3FileSystem</value></property>
-<property><name>io.seqfile.sorter.recordlimit</name><value>1000000</value></property>
-<property><name>io.mapfile.bloom.size</name><value>1048576</value></property>
-<property><name>io.sort.mb</name><value>100</value></property>
-<property><name>mapred.local.dir</name><value>${hadoop.tmp.dir}/mapred/local</value></property>
-<property><name>io.sort.factor</name><value>10</value></property>
-<property><name>BasicPathMergeVertex.kmerSize</name><value>5</value></property>
-<property><name>mapred.task.profile</name><value>false</value></property>
-<property><name>job.end.retry.interval</name><value>30000</value></property>
-<property><name>mapred.tasktracker.procfsbasedprocesstree.sleeptime-before-sigkill</name><value>5000</value></property>
-<property><name>mapred.jobtracker.completeuserjobs.maximum</name><value>100</value></property>
-<property><name>mapred.task.profile.reduces</name><value>0-2</value></property>
-<property><name>webinterface.private.actions</name><value>false</value></property>
-<property><name>hadoop.tmp.dir</name><value>/tmp/hadoop-${user.name}</value></property>
-<property><name>mapred.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
-<property><name>mapred.skip.attempts.to.start.skipping</name><value>2</value></property>
-<property><name>mapred.temp.dir</name><value>${hadoop.tmp.dir}/mapred/temp</value></property>
-<property><name>mapred.merge.recordsBeforeProgress</name><value>10000</value></property>
-<property><name>mapred.map.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
-<property><name>mapred.compress.map.output</name><value>false</value></property>
-<property><name>io.sort.spill.percent</name><value>0.80</value></property>
-<property><name>fs.checkpoint.edits.dir</name><value>${fs.checkpoint.dir}</value></property>
-<property><name>mapred.userlog.retain.hours</name><value>24</value></property>
-<property><name>mapred.system.dir</name><value>${hadoop.tmp.dir}/mapred/system</value></property>
-<property><name>mapred.line.input.format.linespermap</name><value>1</value></property>
-<property><name>job.end.retry.attempts</name><value>0</value></property>
-<property><name>ipc.client.idlethreshold</name><value>4000</value></property>
-<property><name>pregelix.vertexOutputFormatClass</name><value>edu.uci.ics.genomix.pregelix.format.DataCleanOutputFormat</value></property>
-<property><name>mapred.reduce.copy.backoff</name><value>300</value></property>
-<property><name>mapred.map.tasks.speculative.execution</name><value>true</value></property>
-<property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
-<property><name>hadoop.logfile.size</name><value>10000000</value></property>
-<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.genomix.pregelix.format.DataCleanInputFormat</value></property>
-<property><name>mapred.job.queue.name</name><value>default</value></property>
-<property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
-<property><name>pregelix.incStateLength</name><value>true</value></property>
-<property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
-<property><name>topology.script.number.args</name><value>100</value></property>
-<property><name>mapred.skip.map.max.skip.records</name><value>0</value></property>
-<property><name>fs.ftp.impl</name><value>org.apache.hadoop.fs.ftp.FTPFileSystem</value></property>
-<property><name>mapred.task.cache.levels</name><value>2</value></property>
-<property><name>mapred.job.tracker.handler.count</name><value>10</value></property>
-<property><name>io.serializations</name><value>org.apache.hadoop.io.serializer.WritableSerialization</value></property>
-<property><name>ipc.client.connect.max.retries</name><value>10</value></property>
-<property><name>mapred.min.split.size</name><value>0</value></property>
-<property><name>mapred.map.max.attempts</name><value>4</value></property>
-<property><name>jobclient.output.filter</name><value>FAILED</value></property>
-<property><name>ipc.client.tcpnodelay</name><value>false</value></property>
-<property><name>mapred.acls.enabled</name><value>false</value></property>
-</configuration>
\ No newline at end of file
diff --git a/genomix/genomix-pregelix/src/test/resources/jobs/BubbleAddGraph.xml b/genomix/genomix-pregelix/src/test/resources/jobs/BubbleAddGraph.xml
deleted file mode 100644
index 452834a..0000000
--- a/genomix/genomix-pregelix/src/test/resources/jobs/BubbleAddGraph.xml
+++ /dev/null
@@ -1,142 +0,0 @@
-<?xml version="1.0" encoding="UTF-8" standalone="no"?><configuration>
-<property><name>mapred.tasktracker.dns.nameserver</name><value>default</value></property>
-<property><name>mapred.queue.default.acl-administer-jobs</name><value>*</value></property>
-<property><name>mapred.skip.map.auto.incr.proc.count</name><value>true</value></property>
-<property><name>mapred.jobtracker.instrumentation</name><value>org.apache.hadoop.mapred.JobTrackerMetricsInst</value></property>
-<property><name>mapred.skip.reduce.auto.incr.proc.count</name><value>true</value></property>
-<property><name>fs.hsftp.impl</name><value>org.apache.hadoop.hdfs.HsftpFileSystem</value></property>
-<property><name>mapred.submit.replication</name><value>10</value></property>
-<property><name>ipc.server.tcpnodelay</name><value>false</value></property>
-<property><name>fs.checkpoint.dir</name><value>${hadoop.tmp.dir}/dfs/namesecondary</value></property>
-<property><name>mapred.output.compression.type</name><value>RECORD</value></property>
-<property><name>mapred.job.shuffle.merge.percent</name><value>0.66</value></property>
-<property><name>mapred.child.java.opts</name><value>-Xmx200m</value></property>
-<property><name>mapred.queue.default.acl-submit-job</name><value>*</value></property>
-<property><name>keep.failed.task.files</name><value>false</value></property>
-<property><name>mapred.jobtracker.job.history.block.size</name><value>3145728</value></property>
-<property><name>mapred.output.value.class</name><value>edu.uci.ics.genomix.pregelix.io.VertexValueWritable</value></property>
-<property><name>io.bytes.per.checksum</name><value>512</value></property>
-<property><name>mapred.task.tracker.report.address</name><value>127.0.0.1:0</value></property>
-<property><name>hadoop.util.hash.type</name><value>murmur</value></property>
-<property><name>fs.hdfs.impl</name><value>org.apache.hadoop.hdfs.DistributedFileSystem</value></property>
-<property><name>fs.ramfs.impl</name><value>org.apache.hadoop.fs.InMemoryFileSystem</value></property>
-<property><name>mapred.jobtracker.restart.recover</name><value>false</value></property>
-<property><name>fs.hftp.impl</name><value>org.apache.hadoop.hdfs.HftpFileSystem</value></property>
-<property><name>fs.checkpoint.period</name><value>3600</value></property>
-<property><name>mapred.child.tmp</name><value>./tmp</value></property>
-<property><name>mapred.local.dir.minspacekill</name><value>0</value></property>
-<property><name>map.sort.class</name><value>org.apache.hadoop.util.QuickSort</value></property>
-<property><name>hadoop.logfile.count</name><value>10</value></property>
-<property><name>ipc.client.connection.maxidletime</name><value>10000</value></property>
-<property><name>io.map.index.skip</name><value>0</value></property>
-<property><name>mapred.tasktracker.expiry.interval</name><value>600000</value></property>
-<property><name>mapred.output.compress</name><value>false</value></property>
-<property><name>io.seqfile.lazydecompress</name><value>true</value></property>
-<property><name>mapred.reduce.parallel.copies</name><value>5</value></property>
-<property><name>fs.checkpoint.size</name><value>67108864</value></property>
-<property><name>mapred.job.reduce.input.buffer.percent</name><value>0.0</value></property>
-<property><name>mapred.job.name</name><value>BubbleAddGraph</value></property>
-<property><name>local.cache.size</name><value>10737418240</value></property>
-<property><name>fs.s3n.impl</name><value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value></property>
-<property><name>mapred.userlog.limit.kb</name><value>0</value></property>
-<property><name>fs.file.impl</name><value>org.apache.hadoop.fs.LocalFileSystem</value></property>
-<property><name>mapred.task.tracker.http.address</name><value>0.0.0.0:50060</value></property>
-<property><name>mapred.task.timeout</name><value>600000</value></property>
-<property><name>fs.kfs.impl</name><value>org.apache.hadoop.fs.kfs.KosmosFileSystem</value></property>
-<property><name>mapred.max.tracker.blacklists</name><value>4</value></property>
-<property><name>fs.s3.buffer.dir</name><value>${hadoop.tmp.dir}/s3</value></property>
-<property><name>mapred.job.tracker.persist.jobstatus.dir</name><value>/jobtracker/jobsInfo</value></property>
-<property><name>ipc.client.kill.max</name><value>10</value></property>
-<property><name>mapred.tasktracker.instrumentation</name><value>org.apache.hadoop.mapred.TaskTrackerMetricsInst</value></property>
-<property><name>mapred.reduce.tasks.speculative.execution</name><value>true</value></property>
-<property><name>io.sort.record.percent</name><value>0.05</value></property>
-<property><name>hadoop.security.authorization</name><value>false</value></property>
-<property><name>mapred.max.tracker.failures</name><value>4</value></property>
-<property><name>mapred.jobtracker.taskScheduler</name><value>org.apache.hadoop.mapred.JobQueueTaskScheduler</value></property>
-<property><name>mapred.tasktracker.dns.interface</name><value>default</value></property>
-<property><name>mapred.map.tasks</name><value>2</value></property>
-<property><name>mapred.job.tracker.persist.jobstatus.hours</name><value>0</value></property>
-<property><name>fs.s3.sleepTimeSeconds</name><value>10</value></property>
-<property><name>fs.default.name</name><value>file:///</value></property>
-<property><name>mapred.output.key.class</name><value>edu.uci.ics.genomix.type.PositionWritable</value></property>
-<property><name>tasktracker.http.threads</name><value>40</value></property>
-<property><name>mapred.tasktracker.taskmemorymanager.monitoring-interval</name><value>5000</value></property>
-<property><name>hadoop.rpc.socket.factory.class.default</name><value>org.apache.hadoop.net.StandardSocketFactory</value></property>
-<property><name>mapred.reduce.tasks</name><value>1</value></property>
-<property><name>topology.node.switch.mapping.impl</name><value>org.apache.hadoop.net.ScriptBasedMapping</value></property>
-<property><name>pregelix.vertexClass</name><value>edu.uci.ics.genomix.pregelix.operator.bubblemerge.BubbleAddVertex</value></property>
-<property><name>mapred.skip.reduce.max.skip.groups</name><value>0</value></property>
-<property><name>io.file.buffer.size</name><value>4096</value></property>
-<property><name>mapred.jobtracker.maxtasks.per.job</name><value>-1</value></property>
-<property><name>mapred.tasktracker.indexcache.mb</name><value>10</value></property>
-<property><name>mapred.tasktracker.map.tasks.maximum</name><value>2</value></property>
-<property><name>fs.har.impl.disable.cache</name><value>true</value></property>
-<property><name>mapred.task.profile.maps</name><value>0-2</value></property>
-<property><name>hadoop.native.lib</name><value>true</value></property>
-<property><name>fs.s3.block.size</name><value>67108864</value></property>
-<property><name>mapred.job.reuse.jvm.num.tasks</name><value>1</value></property>
-<property><name>mapred.job.tracker.http.address</name><value>0.0.0.0:50030</value></property>
-<property><name>mapred.tasktracker.reduce.tasks.maximum</name><value>2</value></property>
-<property><name>io.compression.codecs</name><value>org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec</value></property>
-<property><name>mapred.job.shuffle.input.buffer.percent</name><value>0.70</value></property>
-<property><name>io.seqfile.compress.blocksize</name><value>1000000</value></property>
-<property><name>mapred.queue.names</name><value>default</value></property>
-<property><name>fs.har.impl</name><value>org.apache.hadoop.fs.HarFileSystem</value></property>
-<property><name>io.mapfile.bloom.error.rate</name><value>0.005</value></property>
-<property><name>mapred.job.tracker</name><value>local</value></property>
-<property><name>BubbleAddVertex.kmerSize</name><value>3</value></property>
-<property><name>io.skip.checksum.errors</name><value>false</value></property>
-<property><name>mapred.reduce.max.attempts</name><value>4</value></property>
-<property><name>fs.s3.maxRetries</name><value>4</value></property>
-<property><name>ipc.server.listen.queue.size</name><value>128</value></property>
-<property><name>fs.trash.interval</name><value>0</value></property>
-<property><name>mapred.local.dir.minspacestart</name><value>0</value></property>
-<property><name>fs.s3.impl</name><value>org.apache.hadoop.fs.s3.S3FileSystem</value></property>
-<property><name>io.seqfile.sorter.recordlimit</name><value>1000000</value></property>
-<property><name>io.mapfile.bloom.size</name><value>1048576</value></property>
-<property><name>io.sort.mb</name><value>100</value></property>
-<property><name>mapred.local.dir</name><value>${hadoop.tmp.dir}/mapred/local</value></property>
-<property><name>io.sort.factor</name><value>10</value></property>
-<property><name>mapred.task.profile</name><value>false</value></property>
-<property><name>job.end.retry.interval</name><value>30000</value></property>
-<property><name>mapred.tasktracker.procfsbasedprocesstree.sleeptime-before-sigkill</name><value>5000</value></property>
-<property><name>mapred.jobtracker.completeuserjobs.maximum</name><value>100</value></property>
-<property><name>mapred.task.profile.reduces</name><value>0-2</value></property>
-<property><name>webinterface.private.actions</name><value>false</value></property>
-<property><name>hadoop.tmp.dir</name><value>/tmp/hadoop-${user.name}</value></property>
-<property><name>mapred.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
-<property><name>mapred.skip.attempts.to.start.skipping</name><value>2</value></property>
-<property><name>mapred.temp.dir</name><value>${hadoop.tmp.dir}/mapred/temp</value></property>
-<property><name>mapred.merge.recordsBeforeProgress</name><value>10000</value></property>
-<property><name>mapred.map.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
-<property><name>mapred.compress.map.output</name><value>false</value></property>
-<property><name>io.sort.spill.percent</name><value>0.80</value></property>
-<property><name>fs.checkpoint.edits.dir</name><value>${fs.checkpoint.dir}</value></property>
-<property><name>mapred.userlog.retain.hours</name><value>24</value></property>
-<property><name>mapred.system.dir</name><value>${hadoop.tmp.dir}/mapred/system</value></property>
-<property><name>mapred.line.input.format.linespermap</name><value>1</value></property>
-<property><name>job.end.retry.attempts</name><value>0</value></property>
-<property><name>ipc.client.idlethreshold</name><value>4000</value></property>
-<property><name>pregelix.vertexOutputFormatClass</name><value>edu.uci.ics.genomix.pregelix.format.DataCleanOutputFormat</value></property>
-<property><name>mapred.reduce.copy.backoff</name><value>300</value></property>
-<property><name>mapred.map.tasks.speculative.execution</name><value>true</value></property>
-<property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
-<property><name>hadoop.logfile.size</name><value>10000000</value></property>
-<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.genomix.pregelix.format.NaiveAlgorithmForPathMergeInputFormat</value></property>
-<property><name>mapred.job.queue.name</name><value>default</value></property>
-<property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
-<property><name>pregelix.incStateLength</name><value>true</value></property>
-<property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
-<property><name>topology.script.number.args</name><value>100</value></property>
-<property><name>mapred.skip.map.max.skip.records</name><value>0</value></property>
-<property><name>fs.ftp.impl</name><value>org.apache.hadoop.fs.ftp.FTPFileSystem</value></property>
-<property><name>mapred.task.cache.levels</name><value>2</value></property>
-<property><name>mapred.job.tracker.handler.count</name><value>10</value></property>
-<property><name>io.serializations</name><value>org.apache.hadoop.io.serializer.WritableSerialization</value></property>
-<property><name>ipc.client.connect.max.retries</name><value>10</value></property>
-<property><name>mapred.min.split.size</name><value>0</value></property>
-<property><name>mapred.map.max.attempts</name><value>4</value></property>
-<property><name>jobclient.output.filter</name><value>FAILED</value></property>
-<property><name>ipc.client.tcpnodelay</name><value>false</value></property>
-<property><name>mapred.acls.enabled</name><value>false</value></property>
-</configuration>
\ No newline at end of file
diff --git a/genomix/genomix-pregelix/src/test/resources/jobs/BubbleMergeGraph.xml b/genomix/genomix-pregelix/src/test/resources/jobs/BubbleMergeGraph.xml
deleted file mode 100644
index 605a004..0000000
--- a/genomix/genomix-pregelix/src/test/resources/jobs/BubbleMergeGraph.xml
+++ /dev/null
@@ -1,142 +0,0 @@
-<?xml version="1.0" encoding="UTF-8" standalone="no"?><configuration>
-<property><name>mapred.tasktracker.dns.nameserver</name><value>default</value></property>
-<property><name>mapred.queue.default.acl-administer-jobs</name><value>*</value></property>
-<property><name>mapred.skip.map.auto.incr.proc.count</name><value>true</value></property>
-<property><name>mapred.jobtracker.instrumentation</name><value>org.apache.hadoop.mapred.JobTrackerMetricsInst</value></property>
-<property><name>mapred.skip.reduce.auto.incr.proc.count</name><value>true</value></property>
-<property><name>fs.hsftp.impl</name><value>org.apache.hadoop.hdfs.HsftpFileSystem</value></property>
-<property><name>mapred.submit.replication</name><value>10</value></property>
-<property><name>ipc.server.tcpnodelay</name><value>false</value></property>
-<property><name>fs.checkpoint.dir</name><value>${hadoop.tmp.dir}/dfs/namesecondary</value></property>
-<property><name>mapred.output.compression.type</name><value>RECORD</value></property>
-<property><name>mapred.job.shuffle.merge.percent</name><value>0.66</value></property>
-<property><name>mapred.child.java.opts</name><value>-Xmx200m</value></property>
-<property><name>mapred.queue.default.acl-submit-job</name><value>*</value></property>
-<property><name>keep.failed.task.files</name><value>false</value></property>
-<property><name>mapred.jobtracker.job.history.block.size</name><value>3145728</value></property>
-<property><name>mapred.output.value.class</name><value>edu.uci.ics.genomix.pregelix.io.VertexValueWritable</value></property>
-<property><name>io.bytes.per.checksum</name><value>512</value></property>
-<property><name>mapred.task.tracker.report.address</name><value>127.0.0.1:0</value></property>
-<property><name>hadoop.util.hash.type</name><value>murmur</value></property>
-<property><name>fs.hdfs.impl</name><value>org.apache.hadoop.hdfs.DistributedFileSystem</value></property>
-<property><name>fs.ramfs.impl</name><value>org.apache.hadoop.fs.InMemoryFileSystem</value></property>
-<property><name>mapred.jobtracker.restart.recover</name><value>false</value></property>
-<property><name>fs.hftp.impl</name><value>org.apache.hadoop.hdfs.HftpFileSystem</value></property>
-<property><name>fs.checkpoint.period</name><value>3600</value></property>
-<property><name>mapred.child.tmp</name><value>./tmp</value></property>
-<property><name>mapred.local.dir.minspacekill</name><value>0</value></property>
-<property><name>map.sort.class</name><value>org.apache.hadoop.util.QuickSort</value></property>
-<property><name>hadoop.logfile.count</name><value>10</value></property>
-<property><name>ipc.client.connection.maxidletime</name><value>10000</value></property>
-<property><name>io.map.index.skip</name><value>0</value></property>
-<property><name>mapred.tasktracker.expiry.interval</name><value>600000</value></property>
-<property><name>mapred.output.compress</name><value>false</value></property>
-<property><name>io.seqfile.lazydecompress</name><value>true</value></property>
-<property><name>mapred.reduce.parallel.copies</name><value>5</value></property>
-<property><name>fs.checkpoint.size</name><value>67108864</value></property>
-<property><name>mapred.job.reduce.input.buffer.percent</name><value>0.0</value></property>
-<property><name>mapred.job.name</name><value>BubbleMergeGraph</value></property>
-<property><name>local.cache.size</name><value>10737418240</value></property>
-<property><name>fs.s3n.impl</name><value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value></property>
-<property><name>mapred.userlog.limit.kb</name><value>0</value></property>
-<property><name>fs.file.impl</name><value>org.apache.hadoop.fs.LocalFileSystem</value></property>
-<property><name>mapred.task.tracker.http.address</name><value>0.0.0.0:50060</value></property>
-<property><name>mapred.task.timeout</name><value>600000</value></property>
-<property><name>fs.kfs.impl</name><value>org.apache.hadoop.fs.kfs.KosmosFileSystem</value></property>
-<property><name>mapred.max.tracker.blacklists</name><value>4</value></property>
-<property><name>fs.s3.buffer.dir</name><value>${hadoop.tmp.dir}/s3</value></property>
-<property><name>mapred.job.tracker.persist.jobstatus.dir</name><value>/jobtracker/jobsInfo</value></property>
-<property><name>ipc.client.kill.max</name><value>10</value></property>
-<property><name>mapred.tasktracker.instrumentation</name><value>org.apache.hadoop.mapred.TaskTrackerMetricsInst</value></property>
-<property><name>mapred.reduce.tasks.speculative.execution</name><value>true</value></property>
-<property><name>io.sort.record.percent</name><value>0.05</value></property>
-<property><name>hadoop.security.authorization</name><value>false</value></property>
-<property><name>mapred.max.tracker.failures</name><value>4</value></property>
-<property><name>mapred.jobtracker.taskScheduler</name><value>org.apache.hadoop.mapred.JobQueueTaskScheduler</value></property>
-<property><name>mapred.tasktracker.dns.interface</name><value>default</value></property>
-<property><name>mapred.map.tasks</name><value>2</value></property>
-<property><name>mapred.job.tracker.persist.jobstatus.hours</name><value>0</value></property>
-<property><name>fs.s3.sleepTimeSeconds</name><value>10</value></property>
-<property><name>fs.default.name</name><value>file:///</value></property>
-<property><name>mapred.output.key.class</name><value>edu.uci.ics.genomix.type.PositionWritable</value></property>
-<property><name>tasktracker.http.threads</name><value>40</value></property>
-<property><name>mapred.tasktracker.taskmemorymanager.monitoring-interval</name><value>5000</value></property>
-<property><name>hadoop.rpc.socket.factory.class.default</name><value>org.apache.hadoop.net.StandardSocketFactory</value></property>
-<property><name>mapred.reduce.tasks</name><value>1</value></property>
-<property><name>topology.node.switch.mapping.impl</name><value>org.apache.hadoop.net.ScriptBasedMapping</value></property>
-<property><name>pregelix.vertexClass</name><value>edu.uci.ics.genomix.pregelix.operator.bubblemerge.BubbleMergeVertex</value></property>
-<property><name>mapred.skip.reduce.max.skip.groups</name><value>0</value></property>
-<property><name>io.file.buffer.size</name><value>4096</value></property>
-<property><name>mapred.jobtracker.maxtasks.per.job</name><value>-1</value></property>
-<property><name>mapred.tasktracker.indexcache.mb</name><value>10</value></property>
-<property><name>mapred.tasktracker.map.tasks.maximum</name><value>2</value></property>
-<property><name>fs.har.impl.disable.cache</name><value>true</value></property>
-<property><name>mapred.task.profile.maps</name><value>0-2</value></property>
-<property><name>hadoop.native.lib</name><value>true</value></property>
-<property><name>fs.s3.block.size</name><value>67108864</value></property>
-<property><name>mapred.job.reuse.jvm.num.tasks</name><value>1</value></property>
-<property><name>mapred.job.tracker.http.address</name><value>0.0.0.0:50030</value></property>
-<property><name>mapred.tasktracker.reduce.tasks.maximum</name><value>2</value></property>
-<property><name>io.compression.codecs</name><value>org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec</value></property>
-<property><name>mapred.job.shuffle.input.buffer.percent</name><value>0.70</value></property>
-<property><name>io.seqfile.compress.blocksize</name><value>1000000</value></property>
-<property><name>mapred.queue.names</name><value>default</value></property>
-<property><name>fs.har.impl</name><value>org.apache.hadoop.fs.HarFileSystem</value></property>
-<property><name>io.mapfile.bloom.error.rate</name><value>0.005</value></property>
-<property><name>mapred.job.tracker</name><value>local</value></property>
-<property><name>io.skip.checksum.errors</name><value>false</value></property>
-<property><name>mapred.reduce.max.attempts</name><value>4</value></property>
-<property><name>fs.s3.maxRetries</name><value>4</value></property>
-<property><name>ipc.server.listen.queue.size</name><value>128</value></property>
-<property><name>fs.trash.interval</name><value>0</value></property>
-<property><name>mapred.local.dir.minspacestart</name><value>0</value></property>
-<property><name>fs.s3.impl</name><value>org.apache.hadoop.fs.s3.S3FileSystem</value></property>
-<property><name>io.seqfile.sorter.recordlimit</name><value>1000000</value></property>
-<property><name>io.mapfile.bloom.size</name><value>1048576</value></property>
-<property><name>io.sort.mb</name><value>100</value></property>
-<property><name>mapred.local.dir</name><value>${hadoop.tmp.dir}/mapred/local</value></property>
-<property><name>io.sort.factor</name><value>10</value></property>
-<property><name>mapred.task.profile</name><value>false</value></property>
-<property><name>job.end.retry.interval</name><value>30000</value></property>
-<property><name>mapred.tasktracker.procfsbasedprocesstree.sleeptime-before-sigkill</name><value>5000</value></property>
-<property><name>BubbleMergeVertex.kmerSize</name><value>5</value></property>
-<property><name>mapred.jobtracker.completeuserjobs.maximum</name><value>100</value></property>
-<property><name>mapred.task.profile.reduces</name><value>0-2</value></property>
-<property><name>webinterface.private.actions</name><value>false</value></property>
-<property><name>hadoop.tmp.dir</name><value>/tmp/hadoop-${user.name}</value></property>
-<property><name>mapred.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
-<property><name>mapred.skip.attempts.to.start.skipping</name><value>2</value></property>
-<property><name>mapred.temp.dir</name><value>${hadoop.tmp.dir}/mapred/temp</value></property>
-<property><name>mapred.merge.recordsBeforeProgress</name><value>10000</value></property>
-<property><name>mapred.map.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
-<property><name>mapred.compress.map.output</name><value>false</value></property>
-<property><name>io.sort.spill.percent</name><value>0.80</value></property>
-<property><name>fs.checkpoint.edits.dir</name><value>${fs.checkpoint.dir}</value></property>
-<property><name>mapred.userlog.retain.hours</name><value>24</value></property>
-<property><name>mapred.system.dir</name><value>${hadoop.tmp.dir}/mapred/system</value></property>
-<property><name>mapred.line.input.format.linespermap</name><value>1</value></property>
-<property><name>job.end.retry.attempts</name><value>0</value></property>
-<property><name>ipc.client.idlethreshold</name><value>4000</value></property>
-<property><name>pregelix.vertexOutputFormatClass</name><value>edu.uci.ics.genomix.pregelix.format.DataCleanOutputFormat</value></property>
-<property><name>mapred.reduce.copy.backoff</name><value>300</value></property>
-<property><name>mapred.map.tasks.speculative.execution</name><value>true</value></property>
-<property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
-<property><name>hadoop.logfile.size</name><value>10000000</value></property>
-<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.genomix.pregelix.format.NaiveAlgorithmForPathMergeInputFormat</value></property>
-<property><name>mapred.job.queue.name</name><value>default</value></property>
-<property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
-<property><name>pregelix.incStateLength</name><value>true</value></property>
-<property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
-<property><name>topology.script.number.args</name><value>100</value></property>
-<property><name>mapred.skip.map.max.skip.records</name><value>0</value></property>
-<property><name>fs.ftp.impl</name><value>org.apache.hadoop.fs.ftp.FTPFileSystem</value></property>
-<property><name>mapred.task.cache.levels</name><value>2</value></property>
-<property><name>mapred.job.tracker.handler.count</name><value>10</value></property>
-<property><name>io.serializations</name><value>org.apache.hadoop.io.serializer.WritableSerialization</value></property>
-<property><name>ipc.client.connect.max.retries</name><value>10</value></property>
-<property><name>mapred.min.split.size</name><value>0</value></property>
-<property><name>mapred.map.max.attempts</name><value>4</value></property>
-<property><name>jobclient.output.filter</name><value>FAILED</value></property>
-<property><name>ipc.client.tcpnodelay</name><value>false</value></property>
-<property><name>mapred.acls.enabled</name><value>false</value></property>
-</configuration>
\ No newline at end of file
diff --git a/genomix/genomix-pregelix/src/test/resources/jobs/P4ForMergeGraph.xml b/genomix/genomix-pregelix/src/test/resources/jobs/P4ForMergeGraph.xml
index 5ef1509..597e5c3 100644
--- a/genomix/genomix-pregelix/src/test/resources/jobs/P4ForMergeGraph.xml
+++ b/genomix/genomix-pregelix/src/test/resources/jobs/P4ForMergeGraph.xml
@@ -58,7 +58,7 @@
<property><name>mapred.job.tracker.persist.jobstatus.hours</name><value>0</value></property>
<property><name>fs.s3.sleepTimeSeconds</name><value>10</value></property>
<property><name>fs.default.name</name><value>file:///</value></property>
-<property><name>mapred.output.key.class</name><value>edu.uci.ics.genomix.type.PositionWritable</value></property>
+<property><name>mapred.output.key.class</name><value>edu.uci.ics.genomix.type.KmerBytesWritable</value></property>
<property><name>tasktracker.http.threads</name><value>40</value></property>
<property><name>mapred.tasktracker.taskmemorymanager.monitoring-interval</name><value>5000</value></property>
<property><name>hadoop.rpc.socket.factory.class.default</name><value>org.apache.hadoop.net.StandardSocketFactory</value></property>
@@ -96,7 +96,7 @@
<property><name>io.sort.mb</name><value>100</value></property>
<property><name>mapred.local.dir</name><value>${hadoop.tmp.dir}/mapred/local</value></property>
<property><name>io.sort.factor</name><value>10</value></property>
-<property><name>BasicPathMergeVertex.kmerSize</name><value>5</value></property>
+<property><name>BasicPathMergeVertex.kmerSize</name><value>3</value></property>
<property><name>mapred.task.profile</name><value>false</value></property>
<property><name>job.end.retry.interval</name><value>30000</value></property>
<property><name>mapred.tasktracker.procfsbasedprocesstree.sleeptime-before-sigkill</name><value>5000</value></property>
@@ -117,12 +117,12 @@
<property><name>mapred.line.input.format.linespermap</name><value>1</value></property>
<property><name>job.end.retry.attempts</name><value>0</value></property>
<property><name>ipc.client.idlethreshold</name><value>4000</value></property>
-<property><name>pregelix.vertexOutputFormatClass</name><value>edu.uci.ics.genomix.pregelix.format.DataCleanOutputFormat</value></property>
+<property><name>pregelix.vertexOutputFormatClass</name><value>edu.uci.ics.genomix.pregelix.format.GraphCleanOutputFormat</value></property>
<property><name>mapred.reduce.copy.backoff</name><value>300</value></property>
<property><name>mapred.map.tasks.speculative.execution</name><value>true</value></property>
<property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
<property><name>hadoop.logfile.size</name><value>10000000</value></property>
-<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.genomix.pregelix.format.DataCleanInputFormat</value></property>
+<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.genomix.pregelix.format.InitialGraphCleanInputFormat</value></property>
<property><name>mapred.job.queue.name</name><value>default</value></property>
<property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
<property><name>pregelix.incStateLength</name><value>true</value></property>
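The hunks above retarget P4ForMergeGraph.xml: the output key class moves from PositionWritable to KmerBytesWritable, kmerSize drops from 5 to 3, and the DataClean input/output formats are replaced by the GraphClean variants. As a minimal sketch of how those properties map to code (assuming plain Hadoop Configuration rather than the project's actual job builder; the class name and output path below are illustrative only):

import java.io.FileOutputStream;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;

public class P4JobConfSketch {
    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        // New output key class after the switch away from PositionWritable.
        conf.set("mapred.output.key.class",
                 "edu.uci.ics.genomix.type.KmerBytesWritable");
        // kmerSize lowered from 5 to 3 in this diff.
        conf.setInt("BasicPathMergeVertex.kmerSize", 3);
        // Renamed graph-clean I/O formats.
        conf.set("pregelix.vertexInputFormatClass",
                 "edu.uci.ics.genomix.pregelix.format.InitialGraphCleanInputFormat");
        conf.set("pregelix.vertexOutputFormatClass",
                 "edu.uci.ics.genomix.pregelix.format.GraphCleanOutputFormat");
        // Serializing the configuration emits XML in the same
        // <property><name/><value/></property> shape as the job files here;
        // because new Configuration() loads Hadoop's defaults, the dump also
        // contains all the stock mapred.*/io.*/fs.* keys seen above.
        FileOutputStream out = new FileOutputStream("P4ForMergeGraph.xml");
        try {
            conf.writeXml(out);
        } finally {
            out.close();
        }
    }
}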
diff --git a/genomix/genomix-pregelix/src/test/resources/jobs/TipAddGraph.xml b/genomix/genomix-pregelix/src/test/resources/jobs/TipAddGraph.xml
deleted file mode 100644
index 00c857e..0000000
--- a/genomix/genomix-pregelix/src/test/resources/jobs/TipAddGraph.xml
+++ /dev/null
@@ -1,142 +0,0 @@
-<?xml version="1.0" encoding="UTF-8" standalone="no"?><configuration>
-<property><name>mapred.tasktracker.dns.nameserver</name><value>default</value></property>
-<property><name>mapred.queue.default.acl-administer-jobs</name><value>*</value></property>
-<property><name>mapred.skip.map.auto.incr.proc.count</name><value>true</value></property>
-<property><name>mapred.jobtracker.instrumentation</name><value>org.apache.hadoop.mapred.JobTrackerMetricsInst</value></property>
-<property><name>mapred.skip.reduce.auto.incr.proc.count</name><value>true</value></property>
-<property><name>fs.hsftp.impl</name><value>org.apache.hadoop.hdfs.HsftpFileSystem</value></property>
-<property><name>mapred.submit.replication</name><value>10</value></property>
-<property><name>ipc.server.tcpnodelay</name><value>false</value></property>
-<property><name>fs.checkpoint.dir</name><value>${hadoop.tmp.dir}/dfs/namesecondary</value></property>
-<property><name>mapred.output.compression.type</name><value>RECORD</value></property>
-<property><name>mapred.job.shuffle.merge.percent</name><value>0.66</value></property>
-<property><name>mapred.child.java.opts</name><value>-Xmx200m</value></property>
-<property><name>mapred.queue.default.acl-submit-job</name><value>*</value></property>
-<property><name>keep.failed.task.files</name><value>false</value></property>
-<property><name>mapred.jobtracker.job.history.block.size</name><value>3145728</value></property>
-<property><name>mapred.output.value.class</name><value>edu.uci.ics.genomix.pregelix.io.VertexValueWritable</value></property>
-<property><name>io.bytes.per.checksum</name><value>512</value></property>
-<property><name>mapred.task.tracker.report.address</name><value>127.0.0.1:0</value></property>
-<property><name>hadoop.util.hash.type</name><value>murmur</value></property>
-<property><name>fs.hdfs.impl</name><value>org.apache.hadoop.hdfs.DistributedFileSystem</value></property>
-<property><name>fs.ramfs.impl</name><value>org.apache.hadoop.fs.InMemoryFileSystem</value></property>
-<property><name>mapred.jobtracker.restart.recover</name><value>false</value></property>
-<property><name>fs.hftp.impl</name><value>org.apache.hadoop.hdfs.HftpFileSystem</value></property>
-<property><name>fs.checkpoint.period</name><value>3600</value></property>
-<property><name>TipAddVertex.kmerSize</name><value>3</value></property>
-<property><name>mapred.child.tmp</name><value>./tmp</value></property>
-<property><name>mapred.local.dir.minspacekill</name><value>0</value></property>
-<property><name>map.sort.class</name><value>org.apache.hadoop.util.QuickSort</value></property>
-<property><name>hadoop.logfile.count</name><value>10</value></property>
-<property><name>ipc.client.connection.maxidletime</name><value>10000</value></property>
-<property><name>io.map.index.skip</name><value>0</value></property>
-<property><name>mapred.tasktracker.expiry.interval</name><value>600000</value></property>
-<property><name>mapred.output.compress</name><value>false</value></property>
-<property><name>io.seqfile.lazydecompress</name><value>true</value></property>
-<property><name>mapred.reduce.parallel.copies</name><value>5</value></property>
-<property><name>fs.checkpoint.size</name><value>67108864</value></property>
-<property><name>mapred.job.reduce.input.buffer.percent</name><value>0.0</value></property>
-<property><name>mapred.job.name</name><value>TipAddGraph</value></property>
-<property><name>local.cache.size</name><value>10737418240</value></property>
-<property><name>fs.s3n.impl</name><value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value></property>
-<property><name>mapred.userlog.limit.kb</name><value>0</value></property>
-<property><name>fs.file.impl</name><value>org.apache.hadoop.fs.LocalFileSystem</value></property>
-<property><name>mapred.task.tracker.http.address</name><value>0.0.0.0:50060</value></property>
-<property><name>mapred.task.timeout</name><value>600000</value></property>
-<property><name>fs.kfs.impl</name><value>org.apache.hadoop.fs.kfs.KosmosFileSystem</value></property>
-<property><name>mapred.max.tracker.blacklists</name><value>4</value></property>
-<property><name>fs.s3.buffer.dir</name><value>${hadoop.tmp.dir}/s3</value></property>
-<property><name>mapred.job.tracker.persist.jobstatus.dir</name><value>/jobtracker/jobsInfo</value></property>
-<property><name>ipc.client.kill.max</name><value>10</value></property>
-<property><name>mapred.tasktracker.instrumentation</name><value>org.apache.hadoop.mapred.TaskTrackerMetricsInst</value></property>
-<property><name>mapred.reduce.tasks.speculative.execution</name><value>true</value></property>
-<property><name>io.sort.record.percent</name><value>0.05</value></property>
-<property><name>hadoop.security.authorization</name><value>false</value></property>
-<property><name>mapred.max.tracker.failures</name><value>4</value></property>
-<property><name>mapred.jobtracker.taskScheduler</name><value>org.apache.hadoop.mapred.JobQueueTaskScheduler</value></property>
-<property><name>mapred.tasktracker.dns.interface</name><value>default</value></property>
-<property><name>mapred.map.tasks</name><value>2</value></property>
-<property><name>mapred.job.tracker.persist.jobstatus.hours</name><value>0</value></property>
-<property><name>fs.s3.sleepTimeSeconds</name><value>10</value></property>
-<property><name>fs.default.name</name><value>file:///</value></property>
-<property><name>mapred.output.key.class</name><value>edu.uci.ics.genomix.type.PositionWritable</value></property>
-<property><name>tasktracker.http.threads</name><value>40</value></property>
-<property><name>mapred.tasktracker.taskmemorymanager.monitoring-interval</name><value>5000</value></property>
-<property><name>hadoop.rpc.socket.factory.class.default</name><value>org.apache.hadoop.net.StandardSocketFactory</value></property>
-<property><name>mapred.reduce.tasks</name><value>1</value></property>
-<property><name>topology.node.switch.mapping.impl</name><value>org.apache.hadoop.net.ScriptBasedMapping</value></property>
-<property><name>pregelix.vertexClass</name><value>edu.uci.ics.genomix.pregelix.operator.tipremove.TipAddVertex</value></property>
-<property><name>mapred.skip.reduce.max.skip.groups</name><value>0</value></property>
-<property><name>io.file.buffer.size</name><value>4096</value></property>
-<property><name>mapred.jobtracker.maxtasks.per.job</name><value>-1</value></property>
-<property><name>mapred.tasktracker.indexcache.mb</name><value>10</value></property>
-<property><name>mapred.tasktracker.map.tasks.maximum</name><value>2</value></property>
-<property><name>fs.har.impl.disable.cache</name><value>true</value></property>
-<property><name>mapred.task.profile.maps</name><value>0-2</value></property>
-<property><name>hadoop.native.lib</name><value>true</value></property>
-<property><name>fs.s3.block.size</name><value>67108864</value></property>
-<property><name>mapred.job.reuse.jvm.num.tasks</name><value>1</value></property>
-<property><name>mapred.job.tracker.http.address</name><value>0.0.0.0:50030</value></property>
-<property><name>mapred.tasktracker.reduce.tasks.maximum</name><value>2</value></property>
-<property><name>io.compression.codecs</name><value>org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec</value></property>
-<property><name>mapred.job.shuffle.input.buffer.percent</name><value>0.70</value></property>
-<property><name>io.seqfile.compress.blocksize</name><value>1000000</value></property>
-<property><name>mapred.queue.names</name><value>default</value></property>
-<property><name>fs.har.impl</name><value>org.apache.hadoop.fs.HarFileSystem</value></property>
-<property><name>io.mapfile.bloom.error.rate</name><value>0.005</value></property>
-<property><name>mapred.job.tracker</name><value>local</value></property>
-<property><name>io.skip.checksum.errors</name><value>false</value></property>
-<property><name>mapred.reduce.max.attempts</name><value>4</value></property>
-<property><name>fs.s3.maxRetries</name><value>4</value></property>
-<property><name>ipc.server.listen.queue.size</name><value>128</value></property>
-<property><name>fs.trash.interval</name><value>0</value></property>
-<property><name>mapred.local.dir.minspacestart</name><value>0</value></property>
-<property><name>fs.s3.impl</name><value>org.apache.hadoop.fs.s3.S3FileSystem</value></property>
-<property><name>io.seqfile.sorter.recordlimit</name><value>1000000</value></property>
-<property><name>io.mapfile.bloom.size</name><value>1048576</value></property>
-<property><name>io.sort.mb</name><value>100</value></property>
-<property><name>mapred.local.dir</name><value>${hadoop.tmp.dir}/mapred/local</value></property>
-<property><name>io.sort.factor</name><value>10</value></property>
-<property><name>mapred.task.profile</name><value>false</value></property>
-<property><name>job.end.retry.interval</name><value>30000</value></property>
-<property><name>mapred.tasktracker.procfsbasedprocesstree.sleeptime-before-sigkill</name><value>5000</value></property>
-<property><name>mapred.jobtracker.completeuserjobs.maximum</name><value>100</value></property>
-<property><name>mapred.task.profile.reduces</name><value>0-2</value></property>
-<property><name>webinterface.private.actions</name><value>false</value></property>
-<property><name>hadoop.tmp.dir</name><value>/tmp/hadoop-${user.name}</value></property>
-<property><name>mapred.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
-<property><name>mapred.skip.attempts.to.start.skipping</name><value>2</value></property>
-<property><name>mapred.temp.dir</name><value>${hadoop.tmp.dir}/mapred/temp</value></property>
-<property><name>mapred.merge.recordsBeforeProgress</name><value>10000</value></property>
-<property><name>mapred.map.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
-<property><name>mapred.compress.map.output</name><value>false</value></property>
-<property><name>io.sort.spill.percent</name><value>0.80</value></property>
-<property><name>fs.checkpoint.edits.dir</name><value>${fs.checkpoint.dir}</value></property>
-<property><name>mapred.userlog.retain.hours</name><value>24</value></property>
-<property><name>mapred.system.dir</name><value>${hadoop.tmp.dir}/mapred/system</value></property>
-<property><name>mapred.line.input.format.linespermap</name><value>1</value></property>
-<property><name>job.end.retry.attempts</name><value>0</value></property>
-<property><name>ipc.client.idlethreshold</name><value>4000</value></property>
-<property><name>pregelix.vertexOutputFormatClass</name><value>edu.uci.ics.genomix.pregelix.format.DataCleanOutputFormat</value></property>
-<property><name>mapred.reduce.copy.backoff</name><value>300</value></property>
-<property><name>mapred.map.tasks.speculative.execution</name><value>true</value></property>
-<property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
-<property><name>hadoop.logfile.size</name><value>10000000</value></property>
-<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.genomix.pregelix.format.NaiveAlgorithmForPathMergeInputFormat</value></property>
-<property><name>mapred.job.queue.name</name><value>default</value></property>
-<property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
-<property><name>pregelix.incStateLength</name><value>true</value></property>
-<property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
-<property><name>topology.script.number.args</name><value>100</value></property>
-<property><name>mapred.skip.map.max.skip.records</name><value>0</value></property>
-<property><name>fs.ftp.impl</name><value>org.apache.hadoop.fs.ftp.FTPFileSystem</value></property>
-<property><name>mapred.task.cache.levels</name><value>2</value></property>
-<property><name>mapred.job.tracker.handler.count</name><value>10</value></property>
-<property><name>io.serializations</name><value>org.apache.hadoop.io.serializer.WritableSerialization</value></property>
-<property><name>ipc.client.connect.max.retries</name><value>10</value></property>
-<property><name>mapred.min.split.size</name><value>0</value></property>
-<property><name>mapred.map.max.attempts</name><value>4</value></property>
-<property><name>jobclient.output.filter</name><value>FAILED</value></property>
-<property><name>ipc.client.tcpnodelay</name><value>false</value></property>
-<property><name>mapred.acls.enabled</name><value>false</value></property>
-</configuration>
\ No newline at end of file
diff --git a/genomix/genomix-pregelix/src/test/resources/jobs/TipRemoveGraph.xml b/genomix/genomix-pregelix/src/test/resources/jobs/TipRemoveGraph.xml
deleted file mode 100644
index 588cc9f..0000000
--- a/genomix/genomix-pregelix/src/test/resources/jobs/TipRemoveGraph.xml
+++ /dev/null
@@ -1,142 +0,0 @@
-<?xml version="1.0" encoding="UTF-8" standalone="no"?><configuration>
-<property><name>mapred.tasktracker.dns.nameserver</name><value>default</value></property>
-<property><name>mapred.queue.default.acl-administer-jobs</name><value>*</value></property>
-<property><name>mapred.skip.map.auto.incr.proc.count</name><value>true</value></property>
-<property><name>mapred.jobtracker.instrumentation</name><value>org.apache.hadoop.mapred.JobTrackerMetricsInst</value></property>
-<property><name>mapred.skip.reduce.auto.incr.proc.count</name><value>true</value></property>
-<property><name>fs.hsftp.impl</name><value>org.apache.hadoop.hdfs.HsftpFileSystem</value></property>
-<property><name>mapred.submit.replication</name><value>10</value></property>
-<property><name>ipc.server.tcpnodelay</name><value>false</value></property>
-<property><name>fs.checkpoint.dir</name><value>${hadoop.tmp.dir}/dfs/namesecondary</value></property>
-<property><name>mapred.output.compression.type</name><value>RECORD</value></property>
-<property><name>mapred.job.shuffle.merge.percent</name><value>0.66</value></property>
-<property><name>mapred.child.java.opts</name><value>-Xmx200m</value></property>
-<property><name>mapred.queue.default.acl-submit-job</name><value>*</value></property>
-<property><name>keep.failed.task.files</name><value>false</value></property>
-<property><name>mapred.jobtracker.job.history.block.size</name><value>3145728</value></property>
-<property><name>mapred.output.value.class</name><value>edu.uci.ics.genomix.pregelix.io.VertexValueWritable</value></property>
-<property><name>io.bytes.per.checksum</name><value>512</value></property>
-<property><name>mapred.task.tracker.report.address</name><value>127.0.0.1:0</value></property>
-<property><name>hadoop.util.hash.type</name><value>murmur</value></property>
-<property><name>fs.hdfs.impl</name><value>org.apache.hadoop.hdfs.DistributedFileSystem</value></property>
-<property><name>fs.ramfs.impl</name><value>org.apache.hadoop.fs.InMemoryFileSystem</value></property>
-<property><name>mapred.jobtracker.restart.recover</name><value>false</value></property>
-<property><name>fs.hftp.impl</name><value>org.apache.hadoop.hdfs.HftpFileSystem</value></property>
-<property><name>fs.checkpoint.period</name><value>3600</value></property>
-<property><name>mapred.child.tmp</name><value>./tmp</value></property>
-<property><name>mapred.local.dir.minspacekill</name><value>0</value></property>
-<property><name>map.sort.class</name><value>org.apache.hadoop.util.QuickSort</value></property>
-<property><name>hadoop.logfile.count</name><value>10</value></property>
-<property><name>ipc.client.connection.maxidletime</name><value>10000</value></property>
-<property><name>io.map.index.skip</name><value>0</value></property>
-<property><name>mapred.tasktracker.expiry.interval</name><value>600000</value></property>
-<property><name>mapred.output.compress</name><value>false</value></property>
-<property><name>io.seqfile.lazydecompress</name><value>true</value></property>
-<property><name>mapred.reduce.parallel.copies</name><value>5</value></property>
-<property><name>fs.checkpoint.size</name><value>67108864</value></property>
-<property><name>mapred.job.reduce.input.buffer.percent</name><value>0.0</value></property>
-<property><name>mapred.job.name</name><value>TipRemoveGraph</value></property>
-<property><name>local.cache.size</name><value>10737418240</value></property>
-<property><name>fs.s3n.impl</name><value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value></property>
-<property><name>mapred.userlog.limit.kb</name><value>0</value></property>
-<property><name>fs.file.impl</name><value>org.apache.hadoop.fs.LocalFileSystem</value></property>
-<property><name>mapred.task.tracker.http.address</name><value>0.0.0.0:50060</value></property>
-<property><name>mapred.task.timeout</name><value>600000</value></property>
-<property><name>fs.kfs.impl</name><value>org.apache.hadoop.fs.kfs.KosmosFileSystem</value></property>
-<property><name>mapred.max.tracker.blacklists</name><value>4</value></property>
-<property><name>fs.s3.buffer.dir</name><value>${hadoop.tmp.dir}/s3</value></property>
-<property><name>mapred.job.tracker.persist.jobstatus.dir</name><value>/jobtracker/jobsInfo</value></property>
-<property><name>ipc.client.kill.max</name><value>10</value></property>
-<property><name>mapred.tasktracker.instrumentation</name><value>org.apache.hadoop.mapred.TaskTrackerMetricsInst</value></property>
-<property><name>mapred.reduce.tasks.speculative.execution</name><value>true</value></property>
-<property><name>io.sort.record.percent</name><value>0.05</value></property>
-<property><name>hadoop.security.authorization</name><value>false</value></property>
-<property><name>mapred.max.tracker.failures</name><value>4</value></property>
-<property><name>mapred.jobtracker.taskScheduler</name><value>org.apache.hadoop.mapred.JobQueueTaskScheduler</value></property>
-<property><name>mapred.tasktracker.dns.interface</name><value>default</value></property>
-<property><name>mapred.map.tasks</name><value>2</value></property>
-<property><name>mapred.job.tracker.persist.jobstatus.hours</name><value>0</value></property>
-<property><name>fs.s3.sleepTimeSeconds</name><value>10</value></property>
-<property><name>fs.default.name</name><value>file:///</value></property>
-<property><name>mapred.output.key.class</name><value>edu.uci.ics.genomix.type.PositionWritable</value></property>
-<property><name>tasktracker.http.threads</name><value>40</value></property>
-<property><name>mapred.tasktracker.taskmemorymanager.monitoring-interval</name><value>5000</value></property>
-<property><name>hadoop.rpc.socket.factory.class.default</name><value>org.apache.hadoop.net.StandardSocketFactory</value></property>
-<property><name>mapred.reduce.tasks</name><value>1</value></property>
-<property><name>topology.node.switch.mapping.impl</name><value>org.apache.hadoop.net.ScriptBasedMapping</value></property>
-<property><name>pregelix.vertexClass</name><value>edu.uci.ics.genomix.pregelix.operator.tipremove.TipRemoveVertex</value></property>
-<property><name>mapred.skip.reduce.max.skip.groups</name><value>0</value></property>
-<property><name>io.file.buffer.size</name><value>4096</value></property>
-<property><name>mapred.jobtracker.maxtasks.per.job</name><value>-1</value></property>
-<property><name>mapred.tasktracker.indexcache.mb</name><value>10</value></property>
-<property><name>mapred.tasktracker.map.tasks.maximum</name><value>2</value></property>
-<property><name>fs.har.impl.disable.cache</name><value>true</value></property>
-<property><name>mapred.task.profile.maps</name><value>0-2</value></property>
-<property><name>hadoop.native.lib</name><value>true</value></property>
-<property><name>fs.s3.block.size</name><value>67108864</value></property>
-<property><name>mapred.job.reuse.jvm.num.tasks</name><value>1</value></property>
-<property><name>mapred.job.tracker.http.address</name><value>0.0.0.0:50030</value></property>
-<property><name>mapred.tasktracker.reduce.tasks.maximum</name><value>2</value></property>
-<property><name>io.compression.codecs</name><value>org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec</value></property>
-<property><name>mapred.job.shuffle.input.buffer.percent</name><value>0.70</value></property>
-<property><name>io.seqfile.compress.blocksize</name><value>1000000</value></property>
-<property><name>mapred.queue.names</name><value>default</value></property>
-<property><name>fs.har.impl</name><value>org.apache.hadoop.fs.HarFileSystem</value></property>
-<property><name>io.mapfile.bloom.error.rate</name><value>0.005</value></property>
-<property><name>mapred.job.tracker</name><value>local</value></property>
-<property><name>io.skip.checksum.errors</name><value>false</value></property>
-<property><name>mapred.reduce.max.attempts</name><value>4</value></property>
-<property><name>fs.s3.maxRetries</name><value>4</value></property>
-<property><name>ipc.server.listen.queue.size</name><value>128</value></property>
-<property><name>fs.trash.interval</name><value>0</value></property>
-<property><name>mapred.local.dir.minspacestart</name><value>0</value></property>
-<property><name>fs.s3.impl</name><value>org.apache.hadoop.fs.s3.S3FileSystem</value></property>
-<property><name>io.seqfile.sorter.recordlimit</name><value>1000000</value></property>
-<property><name>io.mapfile.bloom.size</name><value>1048576</value></property>
-<property><name>io.sort.mb</name><value>100</value></property>
-<property><name>mapred.local.dir</name><value>${hadoop.tmp.dir}/mapred/local</value></property>
-<property><name>io.sort.factor</name><value>10</value></property>
-<property><name>BasicPathMergeVertex.kmerSize</name><value>5</value></property>
-<property><name>mapred.task.profile</name><value>false</value></property>
-<property><name>job.end.retry.interval</name><value>30000</value></property>
-<property><name>mapred.tasktracker.procfsbasedprocesstree.sleeptime-before-sigkill</name><value>5000</value></property>
-<property><name>mapred.jobtracker.completeuserjobs.maximum</name><value>100</value></property>
-<property><name>mapred.task.profile.reduces</name><value>0-2</value></property>
-<property><name>webinterface.private.actions</name><value>false</value></property>
-<property><name>hadoop.tmp.dir</name><value>/tmp/hadoop-${user.name}</value></property>
-<property><name>mapred.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
-<property><name>mapred.skip.attempts.to.start.skipping</name><value>2</value></property>
-<property><name>mapred.temp.dir</name><value>${hadoop.tmp.dir}/mapred/temp</value></property>
-<property><name>mapred.merge.recordsBeforeProgress</name><value>10000</value></property>
-<property><name>mapred.map.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
-<property><name>mapred.compress.map.output</name><value>false</value></property>
-<property><name>io.sort.spill.percent</name><value>0.80</value></property>
-<property><name>fs.checkpoint.edits.dir</name><value>${fs.checkpoint.dir}</value></property>
-<property><name>mapred.userlog.retain.hours</name><value>24</value></property>
-<property><name>mapred.system.dir</name><value>${hadoop.tmp.dir}/mapred/system</value></property>
-<property><name>mapred.line.input.format.linespermap</name><value>1</value></property>
-<property><name>job.end.retry.attempts</name><value>0</value></property>
-<property><name>ipc.client.idlethreshold</name><value>4000</value></property>
-<property><name>pregelix.vertexOutputFormatClass</name><value>edu.uci.ics.genomix.pregelix.format.DataCleanOutputFormat</value></property>
-<property><name>mapred.reduce.copy.backoff</name><value>300</value></property>
-<property><name>mapred.map.tasks.speculative.execution</name><value>true</value></property>
-<property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
-<property><name>hadoop.logfile.size</name><value>10000000</value></property>
-<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.genomix.pregelix.format.NaiveAlgorithmForPathMergeInputFormat</value></property>
-<property><name>mapred.job.queue.name</name><value>default</value></property>
-<property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
-<property><name>pregelix.incStateLength</name><value>true</value></property>
-<property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
-<property><name>topology.script.number.args</name><value>100</value></property>
-<property><name>mapred.skip.map.max.skip.records</name><value>0</value></property>
-<property><name>fs.ftp.impl</name><value>org.apache.hadoop.fs.ftp.FTPFileSystem</value></property>
-<property><name>mapred.task.cache.levels</name><value>2</value></property>
-<property><name>mapred.job.tracker.handler.count</name><value>10</value></property>
-<property><name>io.serializations</name><value>org.apache.hadoop.io.serializer.WritableSerialization</value></property>
-<property><name>ipc.client.connect.max.retries</name><value>10</value></property>
-<property><name>mapred.min.split.size</name><value>0</value></property>
-<property><name>mapred.map.max.attempts</name><value>4</value></property>
-<property><name>jobclient.output.filter</name><value>FAILED</value></property>
-<property><name>ipc.client.tcpnodelay</name><value>false</value></property>
-<property><name>mapred.acls.enabled</name><value>false</value></property>
-</configuration>
\ No newline at end of file
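With TipAddGraph.xml and TipRemoveGraph.xml deleted, the surviving job XMLs remain full Configuration dumps, which is why each carries the complete set of Hadoop defaults alongside the pregelix.* keys. A hedged sketch of loading one back for a sanity check, e.g. in a unit test; the resource path is an assumption based on the file locations in the diff headers:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

public class JobXmlCheck {
    public static void main(String[] args) {
        // Pass false to skip Hadoop's built-in defaults so only the
        // job file's own properties are visible.
        Configuration conf = new Configuration(false);
        conf.addResource(new Path(
            "genomix/genomix-pregelix/src/test/resources/jobs/P4ForMergeGraph.xml"));
        System.out.println(conf.get("pregelix.vertexClass"));
        // Expect 3 after the kmerSize change in this diff.
        System.out.println(conf.getInt("BasicPathMergeVertex.kmerSize", -1));
    }
}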