Merge branch 'nanzhang/hyracks_genomix' into genomix/fullstack_genomix
Conflicts:
genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerListWritable.java
genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/KmerBytesWritableTest.java
genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/AdjacencyListWritable.java
genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MergeBubbleMessageWritable.java
genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MessageWritable.java
genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/VertexValueWritable.java
genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bridgeremove/BridgeAddVertex.java
genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleAddVertex.java
genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicGraphCleanVertex.java
genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/MapReduceVertex.java
genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P5ForPathMergeVertex.java
genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/splitrepeat/SplitRepeatVertex.java
genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/tipremove/TipRemoveVertex.java
genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/ConvertNodeToIdValue.java
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerBytesWritableFactory.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerBytesWritableFactory.java
index 16df821..c287c1b 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerBytesWritableFactory.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerBytesWritableFactory.java
@@ -31,8 +31,7 @@
* @param start
*/
public VKmerBytesWritable getKmerByRead(int k, byte[] array, int start) {
- kmer.reset(k);
- kmer.setByRead(array, start);
+ kmer.setByRead(k, array, start); // k is now handed to setByRead directly; the separate reset(k) call is dropped
return kmer;
}
@@ -44,8 +43,7 @@
* @param start
*/
public VKmerBytesWritable getKmerByReadReverse(int k, byte[] array, int start) {
- kmer.reset(k);
- kmer.setByReadReverse(array, start);
+ kmer.setByReadReverse(k, array, start); // k is now handed to setByReadReverse directly; the separate reset(k) call is dropped
return kmer;
}
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerListWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerListWritable.java
deleted file mode 100644
index 5b10fdc..0000000
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerListWritable.java
+++ /dev/null
@@ -1,265 +0,0 @@
-package edu.uci.ics.genomix.type;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.hadoop.io.BinaryComparable;
-import org.apache.hadoop.io.Writable;
-
-import edu.uci.ics.genomix.data.Marshal;
-
-
-public class KmerListWritable extends BinaryComparable
- implements Writable, Iterable<KmerBytesWritable>, Serializable{
- private static final long serialVersionUID = 1L;
- protected static final byte[] EMPTY_BYTES = { 0, 0, 0, 0 };
- protected static final int HEADER_SIZE = 4;
-
- protected byte[] storage;
- protected int offset;
- protected int valueCount;
- protected int storageMaxSize; // since we may be a reference inside a larger datablock, we must track our maximum size
-
- private KmerBytesWritable posIter = new KmerBytesWritable();
-
- public KmerListWritable() {
- storage = EMPTY_BYTES;
- valueCount = 0;
- offset = 0;
- storageMaxSize = storage.length;
- }
-
- public KmerListWritable(byte[] data, int offset) {
- setNewReference(data, offset);
- }
-
- public KmerListWritable(List<KmerBytesWritable> kmers) {
- this();
- setSize(kmers.size() * KmerBytesWritable.getBytesPerKmer() + HEADER_SIZE); // reserve space for all elements
- for (KmerBytesWritable kmer : kmers) {
- append(kmer);
- }
- }
-
- public void setNewReference(byte[] data, int offset) {
- valueCount = Marshal.getInt(data, offset);
- if (valueCount * KmerBytesWritable.getBytesPerKmer() > data.length - offset) {
- throw new IllegalArgumentException("Specified data buffer (len=" + (data.length - offset)
- + ") is not large enough to store requested number of elements (" + valueCount + ")!");
- }
- this.storage = data;
- this.offset = offset;
- this.storageMaxSize = valueCount * KmerBytesWritable.getBytesPerKmer() + HEADER_SIZE;
- }
-
- public void append(KmerBytesWritable kmer) {
- setSize((1 + valueCount) * KmerBytesWritable.getBytesPerKmer() + HEADER_SIZE);
- System.arraycopy(kmer.getBytes(), kmer.offset, storage,
- offset + HEADER_SIZE + valueCount * KmerBytesWritable.getBytesPerKmer(),
- KmerBytesWritable.getBytesPerKmer());
- valueCount += 1;
- Marshal.putInt(valueCount, storage, offset);
- }
-
- /*
- * Append the otherList to the end of myList
- */
- public void appendList(KmerListWritable otherList) {
- if (otherList.valueCount > 0) {
- setSize((valueCount + otherList.valueCount) * KmerBytesWritable.getBytesPerKmer() + HEADER_SIZE);
- // copy contents of otherList into the end of my storage
- System.arraycopy(otherList.storage, otherList.offset + HEADER_SIZE, storage, offset + HEADER_SIZE
- + valueCount * KmerBytesWritable.getBytesPerKmer(),
- otherList.valueCount * KmerBytesWritable.getBytesPerKmer());
- valueCount += otherList.valueCount;
- Marshal.putInt(valueCount, storage, offset);
- }
- }
-
- /**
- * Save the union of my list and otherList. Uses a temporary HashSet for
- * uniquefication
- */
- public void unionUpdate(KmerListWritable otherList) {
- int newSize = valueCount + otherList.valueCount;
- HashSet<KmerBytesWritable> uniqueElements = new HashSet<KmerBytesWritable>(newSize);
- for (KmerBytesWritable kmer : this) {
- uniqueElements.add(kmer);
- }
- for (KmerBytesWritable kmer : otherList) {
- uniqueElements.add(kmer);
- }
- valueCount = 0;
- setSize(newSize * KmerBytesWritable.getBytesPerKmer() + HEADER_SIZE);
- for (KmerBytesWritable kmer : uniqueElements) { // this point is not efficient
- append(kmer);
- }
- Marshal.putInt(valueCount, storage, offset);
- }
-
- protected void setSize(int size) {
- if (size > getCapacity()) {
- setCapacity((size * 3 / 2));
- }
- }
-
- protected int getCapacity() {
- return storageMaxSize - offset;
- }
-
- protected void setCapacity(int new_cap) {
- if (new_cap > getCapacity()) {
- byte[] new_data = new byte[new_cap];
- if (valueCount > 0) {
- System.arraycopy(storage, offset, new_data, 0, valueCount * KmerBytesWritable.getBytesPerKmer() + HEADER_SIZE);
- }
- storage = new_data;
- offset = 0;
- storageMaxSize = storage.length;
- }
- }
-
- public void reset() {
- valueCount = 0;
- Marshal.putInt(valueCount, storage, offset);
- }
-
- public KmerBytesWritable getPosition(int i) {
- if (i >= valueCount) {
- throw new ArrayIndexOutOfBoundsException("No such positions");
- }
- posIter.setAsReference(storage, offset + HEADER_SIZE + i * KmerBytesWritable.getBytesPerKmer());
- return posIter;
- }
-
- public void setCopy(KmerListWritable otherList) {
- setCopy(otherList.storage, otherList.offset);
- }
-
- /**
- * read a KmerListWritable from newData, which should include the header
- */
- public void setCopy(byte[] newData, int offset) {
- int newValueCount = Marshal.getInt(newData, offset);
- setSize(newValueCount * KmerBytesWritable.getBytesPerKmer() + HEADER_SIZE);
- if (newValueCount > 0) {
- System.arraycopy(newData, offset + HEADER_SIZE, storage, this.offset + HEADER_SIZE, newValueCount
- * KmerBytesWritable.getBytesPerKmer());
- }
- valueCount = newValueCount;
- Marshal.putInt(valueCount, storage, this.offset);
- }
-
- @Override
- public Iterator<KmerBytesWritable> iterator() {
- Iterator<KmerBytesWritable> it = new Iterator<KmerBytesWritable>() {
-
- private int currentIndex = 0;
-
- @Override
- public boolean hasNext() {
- return currentIndex < valueCount;
- }
-
- @Override
- public KmerBytesWritable next() {
- return getPosition(currentIndex++);
- }
-
- @Override
- public void remove() {
- if (currentIndex < valueCount)
- System.arraycopy(storage, offset + currentIndex * KmerBytesWritable.getBytesPerKmer(), storage,
- offset + (currentIndex - 1) * KmerBytesWritable.getBytesPerKmer(),
- (valueCount - currentIndex) * KmerBytesWritable.getBytesPerKmer());
- valueCount--;
- currentIndex--;
- Marshal.putInt(valueCount, storage, offset);
- }
- };
- return it;
- }
-
- /*
- * remove the first instance of `toRemove`. Uses a linear scan. Throws an
- * exception if not in this list.
- */
- public void remove(KmerBytesWritable toRemove, boolean ignoreMissing) {
- Iterator<KmerBytesWritable> posIterator = this.iterator();
- while (posIterator.hasNext()) {
- if (toRemove.equals(posIterator.next())) {
- posIterator.remove();
- return; // break as soon as the element is found
- }
- }
- // element was not found
- if (!ignoreMissing) {
- throw new ArrayIndexOutOfBoundsException("the KmerBytesWritable `" + toRemove.toString()
- + "` was not found in this list.");
- }
- }
-
- public void remove(KmerBytesWritable toRemove) {
- remove(toRemove, false);
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- valueCount = in.readInt();
- setSize(valueCount * KmerBytesWritable.getBytesPerKmer() + HEADER_SIZE);
- in.readFully(storage, offset + HEADER_SIZE, valueCount * KmerBytesWritable.getBytesPerKmer() - HEADER_SIZE);
- Marshal.putInt(valueCount, storage, offset);
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- out.write(storage, offset, valueCount * KmerBytesWritable.getBytesPerKmer() + HEADER_SIZE);
- }
-
- public int getCountOfPosition() {
- return valueCount;
- }
-
- public byte[] getByteArray() {
- return storage;
- }
-
- public int getStartOffset() {
- return offset;
- }
-
- public int getLength() {
- return valueCount * KmerBytesWritable.getBytesPerKmer() + HEADER_SIZE;
- }
- @Override
- public String toString() {
- StringBuilder sbuilder = new StringBuilder();
- sbuilder.append('[');
- for (int i = 0; i < valueCount; i++) {
- sbuilder.append(getPosition(i).toString());
- sbuilder.append(',');
- }
- if (valueCount > 0) {
- sbuilder.setCharAt(sbuilder.length() - 1, ']');
- } else {
- sbuilder.append(']');
- }
- return sbuilder.toString();
- }
-
- @Override
- public int hashCode() {
- return Marshal.hashBytes(getByteArray(), getStartOffset(), getLength());
- }
-
- @Override
- public byte[] getBytes() {
-
- return null;
- }
-}
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
index 5eeea48..807dc7f 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
@@ -8,7 +8,6 @@
import java.io.Serializable;
import java.util.Comparator;
-import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.WritableComparable;
import edu.uci.ics.genomix.data.Marshal;
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionListWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionListWritable.java
index 881cbd6..e449760 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionListWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionListWritable.java
@@ -243,9 +243,10 @@
@Override
public void readFields(DataInput in) throws IOException {
- this.valueCount = in.readInt();
- setSize(valueCount * PositionWritable.LENGTH + HEADER_SIZE);
- in.readFully(storage, offset + HEADER_SIZE, valueCount * PositionWritable.LENGTH);
+ int newValueCount = in.readInt(); // incoming element count, kept in a local until the payload has been read
+ setSize(newValueCount * PositionWritable.LENGTH + HEADER_SIZE); // requested size covers the header plus every incoming element
+ in.readFully(storage, offset + HEADER_SIZE, newValueCount * PositionWritable.LENGTH); // payload is written right after the header
+ valueCount = newValueCount; // the field is updated only once readFully has returned
Marshal.putInt(valueCount, storage, offset);
}
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/VKmerBytesWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/VKmerBytesWritable.java
index 7e516fd..71efb5f 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/VKmerBytesWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/VKmerBytesWritable.java
@@ -363,6 +363,7 @@
}
in.readFully(bytes, kmerStartOffset, bytesUsed);
}
+ saveHeader(lettersInKmer);
}
/**
@@ -441,7 +442,7 @@
* @param stringBytes
* @param start
*/
- public void setByRead(byte[] stringBytes, int start) {
+ private void setByRead(byte[] stringBytes, int start) {
byte l = 0;
int bytecount = 0;
int bcount = this.bytesUsed - 1;
@@ -474,7 +475,7 @@
* @param start
* position
*/
- public void setByReadReverse(byte[] array, int start) {
+ private void setByReadReverse(byte[] array, int start) {
byte l = 0;
int bytecount = 0;
int bcount = bytesUsed - 1;
@@ -505,6 +506,9 @@
* : the next kmer
*/
public void mergeWithFFKmer(int initialKmerSize, VKmerBytesWritable kmer) {
+ if (lettersInKmer < initialKmerSize - 1 || kmer.lettersInKmer < initialKmerSize - 1) {
+ throw new IllegalArgumentException("Not enough letters in the kmers to perform a merge! Tried K=" + initialKmerSize + ", merge '" + this + "' with '" + kmer + "'.");
+ }
int preKmerLength = lettersInKmer;
int preSize = bytesUsed;
lettersInKmer += kmer.lettersInKmer - initialKmerSize + 1;
@@ -538,6 +542,9 @@
* : the next kmer
*/
public void mergeWithFRKmer(int initialKmerSize, VKmerBytesWritable kmer) {
+ if (lettersInKmer < initialKmerSize - 1 || kmer.lettersInKmer < initialKmerSize - 1) {
+ throw new IllegalArgumentException("Not enough letters in the kmers to perform a merge! Tried K=" + initialKmerSize + ", merge '" + this + "' with '" + kmer + "'.");
+ }
int preSize = bytesUsed;
int preKmerLength = lettersInKmer;
lettersInKmer += kmer.lettersInKmer - initialKmerSize + 1;
@@ -605,6 +612,9 @@
* : the previous kmer
*/
public void mergeWithRRKmer(int initialKmerSize, VKmerBytesWritable preKmer) {
+ if (lettersInKmer < initialKmerSize - 1 || preKmer.lettersInKmer < initialKmerSize - 1) {
+ throw new IllegalArgumentException("Not enough letters in the kmers to perform a merge! Tried K=" + initialKmerSize + ", merge '" + this + "' with '" + preKmer + "'.");
+ }
int preKmerLength = lettersInKmer;
int preSize = bytesUsed;
lettersInKmer += preKmer.lettersInKmer - initialKmerSize + 1;
@@ -665,5 +675,37 @@
}
return new KmerBytesWritable(bytes, kmerStartOffset);
}
+
+ public static int editDistance(VKmerBytesWritable kmer1, VKmerBytesWritable kmer2) { // edit (Levenshtein) distance between the letters of kmer1 and kmer2, unit cost per insert/delete/substitute
+ int rows = kmer1.getKmerLetterLength() + 1, columns = kmer2.getKmerLetterLength() + 1, r=0, c=0, match=0; // one extra row and column stand for the empty prefix
+ int[][] distMat = new int[rows][columns]; // distMat[r][c]: distance between the first r letters of kmer1 and the first c letters of kmer2
+
+ // initialize top row and left column
+ for (r = 0; r < rows; r++) {
+ distMat[r][0] = r; // r letters against an empty prefix cost r edits
+ }
+ for (c = 0; c < columns; c++) {
+ distMat[0][c] = c; // empty prefix against c letters costs c edits
+ }
+
+ // fill out the matrix as the min of left+1, up+1, and diag+nomatch
+ for (r = 1; r < rows; r++) {
+ for (c = 1; c < columns; c++) {
+ match = kmer1.getGeneCodeAtPosition(r-1) == kmer2.getGeneCodeAtPosition(c-1) ? 0 : 1; // 0 when the two letters agree, 1 when they differ
+ distMat[r][c] = min(distMat[r-1][c] + 1,
+ distMat[r][c-1] + 1,
+ distMat[r-1][c-1] + match);
+ }
+ }
+ return distMat[rows - 1][columns - 1]; // bottom-right cell covers both kmers in full
+ }
+
+ private static int min(int a, int b, int c) { // smallest of the three arguments
+ return a <= b ? (a <= c ? a : c) : (b <= c ? b : c); // pick the smaller of a and b first, then compare that one with c
+ }
+
+ public int editDistance(VKmerBytesWritable other) { // instance form of the static two-argument editDistance
+ return editDistance(this, other); // this kmer is passed as the first argument
+ }
}
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/VKmerListWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/VKmerListWritable.java
index f269b5a..80353c1 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/VKmerListWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/VKmerListWritable.java
@@ -234,7 +234,7 @@
* remove the first instance of `toRemove`. Uses a linear scan. Throws an
* exception if not in this list.
*/
- public void remove(KmerBytesWritable toRemove, boolean ignoreMissing) {
+ public void remove(VKmerBytesWritable toRemove, boolean ignoreMissing) {
Iterator<VKmerBytesWritable> posIterator = this.iterator();
while (posIterator.hasNext()) {
if (toRemove.equals(posIterator.next())) {
@@ -249,20 +249,19 @@
}
}
- public void remove(KmerBytesWritable toRemove) {
+ public void remove(VKmerBytesWritable toRemove) { // parameter type switched to VKmerBytesWritable; forwards to the two-argument remove with ignoreMissing = false
remove(toRemove, false);
}
@Override
public void readFields(DataInput in) throws IOException {
reset();
- valueCount = in.readInt();
- Marshal.putInt(valueCount, storage, offset);
+ int newValueCount = in.readInt(); // element count announced by the stream, kept in a local instead of the field
int curOffset = offset + HEADER_SIZE;
int elemBytes = 0;
int elemLetters = 0;
int curLength = getLength();
- for (int i = 0; i < valueCount; i++) {
+ for (int i = 0; i < newValueCount; i++) { // loop is bounded by the local count, not by the valueCount field
elemLetters = in.readInt();
elemBytes = KmerUtil.getByteNumFromK(elemLetters) + VKmerBytesWritable.HEADER_SIZE;
setSize(curLength + elemBytes); // make sure we have room for the new element
@@ -271,7 +270,10 @@
- VKmerBytesWritable.HEADER_SIZE); // write kmer
curOffset += elemBytes;
curLength += elemBytes;
+ valueCount++; // the field grows by one for each element read in this loop
}
+ valueCount = newValueCount; // NOTE(review): looks redundant with the per-element valueCount++ above if reset() zeroes valueCount - confirm
+ Marshal.putInt(valueCount, storage, offset); // the count header is now written after all elements have been read
}
@Override
diff --git a/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/KmerBytesWritableFactoryTest.java b/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/KmerBytesWritableFactoryTest.java
index 7808719..36115d3 100644
--- a/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/KmerBytesWritableFactoryTest.java
+++ b/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/KmerBytesWritableFactoryTest.java
@@ -29,8 +29,8 @@
@Test
public void TestGetLastKmer() {
- VKmerBytesWritable kmer = new VKmerBytesWritable(9);
- kmer.setByRead(array, 0);
+ VKmerBytesWritable kmer = new VKmerBytesWritable();
+ kmer.setByRead(9, array, 0);
Assert.assertEquals("AGCTGACCG", kmer.toString());
VKmerBytesWritable lastKmer;
for (int i = 8; i > 0; i--) {
@@ -50,8 +50,8 @@
@Test
public void TestGetFirstKmer() {
- VKmerBytesWritable kmer = new VKmerBytesWritable(9);
- kmer.setByRead(array, 0);
+ VKmerBytesWritable kmer = new VKmerBytesWritable();
+ kmer.setByRead(9, array, 0);
Assert.assertEquals("AGCTGACCG", kmer.toString());
VKmerBytesWritable firstKmer;
for (int i = 8; i > 0; i--) {
@@ -71,8 +71,8 @@
@Test
public void TestGetSubKmer() {
- VKmerBytesWritable kmer = new VKmerBytesWritable(9);
- kmer.setByRead(array, 0);
+ VKmerBytesWritable kmer = new VKmerBytesWritable();
+ kmer.setByRead(9, array, 0);
Assert.assertEquals("AGCTGACCG", kmer.toString());
VKmerBytesWritable subKmer;
for (int istart = 0; istart < kmer.getKmerLetterLength() - 1; istart++) {
@@ -85,8 +85,8 @@
@Test
public void TestMergeNext() {
- VKmerBytesWritable kmer = new VKmerBytesWritable(9);
- kmer.setByRead(array, 0);
+ VKmerBytesWritable kmer = new VKmerBytesWritable();
+ kmer.setByRead(9, array, 0);
Assert.assertEquals("AGCTGACCG", kmer.toString());
String text = "AGCTGACCG";
@@ -106,8 +106,8 @@
@Test
public void TestMergePre() {
- VKmerBytesWritable kmer = new VKmerBytesWritable(9);
- kmer.setByRead(array, 0);
+ VKmerBytesWritable kmer = new VKmerBytesWritable();
+ kmer.setByRead(9, array, 0);
Assert.assertEquals("AGCTGACCG", kmer.toString());
String text = "AGCTGACCG";
for (byte x = GeneCode.A; x <= GeneCode.T; x++) {
@@ -126,11 +126,11 @@
@Test
public void TestMergeTwoKmer() {
- VKmerBytesWritable kmer1 = new VKmerBytesWritable(9);
- kmer1.setByRead(array, 0);
+ VKmerBytesWritable kmer1 = new VKmerBytesWritable();
+ kmer1.setByRead(9, array, 0);
String text1 = "AGCTGACCG";
- VKmerBytesWritable kmer2 = new VKmerBytesWritable(9);
- kmer2.setByRead(array, 1);
+ VKmerBytesWritable kmer2 = new VKmerBytesWritable();
+ kmer2.setByRead(9, array, 1);
String text2 = "GCTGACCGT";
Assert.assertEquals(text1, kmer1.toString());
Assert.assertEquals(text2, kmer2.toString());
@@ -138,8 +138,8 @@
VKmerBytesWritable merged = kmerFactory.mergeTwoKmer(kmer1, kmer2);
Assert.assertEquals(text1 + text2, merged.toString());
- VKmerBytesWritable kmer3 = new VKmerBytesWritable(3);
- kmer3.setByRead(array, 1);
+ VKmerBytesWritable kmer3 = new VKmerBytesWritable();
+ kmer3.setByRead(3, array, 1);
String text3 = "GCT";
Assert.assertEquals(text3, kmer3.toString());
@@ -148,17 +148,17 @@
merged = kmerFactory.mergeTwoKmer(kmer3, kmer1);
Assert.assertEquals(text3 + text1, merged.toString());
- VKmerBytesWritable kmer4 = new VKmerBytesWritable(8);
- kmer4.setByRead(array, 0);
+ VKmerBytesWritable kmer4 = new VKmerBytesWritable();
+ kmer4.setByRead(8, array, 0);
String text4 = "AGCTGACC";
Assert.assertEquals(text4, kmer4.toString());
merged = kmerFactory.mergeTwoKmer(kmer4, kmer3);
Assert.assertEquals(text4 + text3, merged.toString());
- VKmerBytesWritable kmer5 = new VKmerBytesWritable(7);
- kmer5.setByRead(array, 0);
+ VKmerBytesWritable kmer5 = new VKmerBytesWritable();
+ kmer5.setByRead(7, array, 0);
String text5 = "AGCTGAC";
- VKmerBytesWritable kmer6 = new VKmerBytesWritable(9);
+ VKmerBytesWritable kmer6 = new VKmerBytesWritable();
kmer6.setByRead(9, array, 1);
String text6 = "GCTGACCGT";
merged = kmerFactory.mergeTwoKmer(kmer5, kmer6);
@@ -192,17 +192,16 @@
@Test
public void TestReverseKmer() {
- VKmerBytesWritable kmer = new VKmerBytesWritable(7);
- kmer.setByRead(array, 0);
+ VKmerBytesWritable kmer = new VKmerBytesWritable(); // no-arg constructor; k is given to setByRead instead
+ kmer.setByRead(7, array, 0); // k = 7 is passed together with the read bytes
Assert.assertEquals(kmer.toString(), "AGCTGAC");
VKmerBytesWritable reversed = kmerFactory.reverse(kmer);
Assert.assertEquals(reversed.toString(), "CAGTCGA");
- kmer.reset(8);
- kmer.setByRead(("AATAGAAC").getBytes(), 0);
+ kmer.setByRead(8, ("AATAGAAC").getBytes(), 0); // same kmer object reused for an 8-letter read, without a separate reset(8)
Assert.assertEquals(kmer.toString(), "AATAGAAC");
reversed.reset(8);
reversed = kmerFactory.reverse(kmer);
- Assert.assertEquals(reversed.toString(), "GTTCTATT");
+ Assert.assertEquals(reversed.toString(), "CAAGATAA"); // expected value is now "AATAGAAC" read backwards (plain reversal); the removed value was its reverse complement
}
}
diff --git a/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/KmerBytesWritableTest.java b/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/KmerBytesWritableTest.java
index efeff71..4f7b90e 100644
--- a/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/KmerBytesWritableTest.java
+++ b/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/KmerBytesWritableTest.java
@@ -15,18 +15,12 @@
package edu.uci.ics.genomix.data.test;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
import junit.framework.Assert;
import org.junit.Test;
import edu.uci.ics.genomix.type.GeneCode;
import edu.uci.ics.genomix.type.KmerBytesWritable;
-import edu.uci.ics.genomix.type.KmerListWritable;
public class KmerBytesWritableTest {
static byte[] array = { 'A', 'A', 'T', 'A', 'G', 'A', 'A', 'G' };
@@ -124,313 +118,5 @@
}
Assert.assertEquals(kmer.toString(), kmerAppend.toString());
}
- }
-
-
- @Test
- public void TestMergeFFKmer() {
- byte[] array = { 'A', 'G', 'C', 'T', 'G', 'A', 'C', 'C', 'G', 'T' };
- String text = "AGCTGACCGT";
- KmerBytesWritable kmer1 = new KmerBytesWritable(8);
- kmer1.setByRead(array, 0);
- String text1 = "AGCTGACC";
- KmerBytesWritable kmer2 = new KmerBytesWritable(8);
- kmer2.setByRead(array, 1);
- String text2 = "GCTGACCG";
- Assert.assertEquals(text2, kmer2.toString());
- KmerBytesWritable merge = new KmerBytesWritable(kmer1);
- int kmerSize = 8;
- merge.mergeWithFFKmer(kmerSize, kmer2);
- Assert.assertEquals(text1 + text2.substring(kmerSize - 1), merge.toString());
-
- for (int i = 1; i < 8; i++) {
- merge.set(kmer1);
- merge.mergeWithFFKmer(i, kmer2);
- Assert.assertEquals(text1 + text2.substring(i - 1), merge.toString());
- }
-
- for (int ik = 1; ik <= 10; ik++) {
- for (int jk = 1; jk <= 10; jk++) {
- kmer1 = new KmerBytesWritable(ik);
- kmer2 = new KmerBytesWritable(jk);
- kmer1.setByRead(array, 0);
- kmer2.setByRead(array, 0);
- text1 = text.substring(0, ik);
- text2 = text.substring(0, jk);
- Assert.assertEquals(text1, kmer1.toString());
- Assert.assertEquals(text2, kmer2.toString());
- for (int x = 1; x < jk; x++) {
- merge.set(kmer1);
- merge.mergeWithFFKmer(x, kmer2);
- Assert.assertEquals(text1 + text2.substring(x - 1), merge.toString());
- }
- }
- }
- }
-
- @Test
- public void TestMergeFRKmer() {
- int kmerSize = 3;
- String result = "AAGCTAACAACC";
- byte[] resultArray = result.getBytes();
-
- String text1 = "AAGCTAA";
- KmerBytesWritable kmer1 = new KmerBytesWritable(text1.length());
- kmer1.setByRead(resultArray, 0);
- Assert.assertEquals(text1, kmer1.toString());
-
- // kmer2 is the rc of the end of the read
- String text2 = "GGTTGTT";
- KmerBytesWritable kmer2 = new KmerBytesWritable(text2.length());
- kmer2.setByReadReverse(resultArray, result.length() - text2.length());
- Assert.assertEquals(text2, kmer2.toString());
-
- KmerBytesWritable merge = new KmerBytesWritable(kmer1);
- merge.mergeWithFRKmer(kmerSize, kmer2);
- Assert.assertEquals(result, merge.toString());
-
- int i = 1;
- merge.set(kmer1);
- merge.mergeWithFRKmer(i, kmer2);
- Assert.assertEquals("AAGCTAAAACAACC", merge.toString());
-
- i = 2;
- merge.set(kmer1);
- merge.mergeWithFRKmer(i, kmer2);
- Assert.assertEquals("AAGCTAAACAACC", merge.toString());
-
- i = 3;
- merge.set(kmer1);
- merge.mergeWithFRKmer(i, kmer2);
- Assert.assertEquals("AAGCTAACAACC", merge.toString());
- }
-
-
- @Test
- public void TestMergeRFKmer() {
- int kmerSize = 3;
- String result = "GGCACAACAACCC";
- byte[] resultArray = result.getBytes();
-
- String text1 = "AACAACCC";
- KmerBytesWritable kmer1 = new KmerBytesWritable(text1.length());
- kmer1.setByRead(resultArray, 5);
- Assert.assertEquals(text1, kmer1.toString());
-
- // kmer2 is the rc of the end of the read
- String text2 = "TTGTGCC";
- KmerBytesWritable kmer2 = new KmerBytesWritable(text2.length());
- kmer2.setByReadReverse(resultArray, 0);
- Assert.assertEquals(text2, kmer2.toString());
-
- KmerBytesWritable merge = new KmerBytesWritable(kmer1);
- merge.mergeWithRFKmer(kmerSize, kmer2);
- Assert.assertEquals(result, merge.toString());
-
- int i = 1;
- merge.set(kmer1);
- merge.mergeWithRFKmer(i, kmer2);
- Assert.assertEquals("GGCACAAAACAACCC", merge.toString());
-
- i = 2;
- merge.set(kmer1);
- merge.mergeWithRFKmer(i, kmer2);
- Assert.assertEquals("GGCACAAACAACCC", merge.toString());
-
- i = 3;
- merge.set(kmer1);
- merge.mergeWithRFKmer(i, kmer2);
- Assert.assertEquals("GGCACAACAACCC", merge.toString());
-
- String test1;
- String test2;
- test1 = "CTA";
- test2 = "AGA";
- KmerBytesWritable k1 = new KmerBytesWritable(3);
- KmerBytesWritable k2 = new KmerBytesWritable(3);
- k1.setByRead(test1.getBytes(), 0);
- k2.setByRead(test2.getBytes(), 0);
- k1.mergeWithRFKmer(3, k2);
- Assert.assertEquals("TCTA", k1.toString());
-
- test1 = "CTA";
- test2 = "ATA"; //TAT
- k1 = new KmerBytesWritable(3);
- k2 = new KmerBytesWritable(3);
- k1.setByRead(test1.getBytes(), 0);
- k2.setByRead(test2.getBytes(), 0);
- k1.mergeWithFRKmer(3, k2);
- Assert.assertEquals("CTAT", k1.toString());
-
- test1 = "ATA";
- test2 = "CTA"; //TAT
- k1 = new KmerBytesWritable(3);
- k2 = new KmerBytesWritable(3);
- k1.setByRead(test1.getBytes(), 0);
- k2.setByRead(test2.getBytes(), 0);
- k1.mergeWithFRKmer(3, k2);
- Assert.assertEquals("ATAG", k1.toString());
-
- test1 = "TCTAT";
- test2 = "GAAC";
- k1 = new KmerBytesWritable(5);
- k2 = new KmerBytesWritable(4);
- k1.setByRead(test1.getBytes(), 0);
- k2.setByRead(test2.getBytes(), 0);
- k1.mergeWithRFKmer(3, k2);
- Assert.assertEquals("GTTCTAT", k1.toString());
- }
-
-
-
- @Test
- public void TestMergeRRKmer() {
- byte[] array = { 'A', 'G', 'C', 'T', 'G', 'A', 'C', 'C', 'G', 'T' };
- String text = "AGCTGACCGT";
- KmerBytesWritable kmer1 = new KmerBytesWritable(8);
- kmer1.setByRead(array, 0);
- String text1 = "AGCTGACC";
- KmerBytesWritable kmer2 = new KmerBytesWritable(8);
- kmer2.setByRead(array, 1);
- String text2 = "GCTGACCG";
- Assert.assertEquals(text2, kmer2.toString());
- KmerBytesWritable merge = new KmerBytesWritable(kmer2);
- int kmerSize = 8;
- merge.mergeWithRRKmer(kmerSize, kmer1);
- Assert.assertEquals(text1 + text2.substring(kmerSize - 1), merge.toString());
-
- for (int i = 1; i < 8; i++) {
- merge.set(kmer2);
- merge.mergeWithRRKmer(i, kmer1);
- Assert.assertEquals(text1.substring(0, text1.length() - i + 1) + text2, merge.toString());
- }
-
- for (int ik = 1; ik <= 10; ik++) {
- for (int jk = 1; jk <= 10; jk++) {
- kmer1 = new KmerBytesWritable(ik);
- kmer2 = new KmerBytesWritable(jk);
- kmer1.setByRead(array, 0);
- kmer2.setByRead(array, 0);
- text1 = text.substring(0, ik);
- text2 = text.substring(0, jk);
- Assert.assertEquals(text1, kmer1.toString());
- Assert.assertEquals(text2, kmer2.toString());
- for (int x = 1; x < ik; x++) {
- merge.set(kmer2);
- merge.mergeWithRRKmer(x, kmer1);
- Assert.assertEquals(text1.substring(0, text1.length() - x + 1) + text2, merge.toString());
- }
- }
- }
- }
-
- @Test
- public void TestFinalMerge() {
- String selfString;
- String match;
- String msgString;
- int index;
- KmerBytesWritable kmer = new KmerBytesWritable();
- int kmerSize = 3;
-
- String F1 = "AATAG";
- String F2 = "TAGAA";
- String R1 = "CTATT";
- String R2 = "TTCTA";
-
- //FF test
- selfString = F1;
- match = selfString.substring(selfString.length() - kmerSize + 1,selfString.length());
- msgString = F2;
- index = msgString.indexOf(match);
- kmer.reset(msgString.length() - index);
- kmer.setByRead(msgString.substring(index).getBytes(), 0);
- System.out.println(kmer.toString());
-
- //FR test
- selfString = F1;
- match = selfString.substring(selfString.length() - kmerSize + 1,selfString.length());
- msgString = GeneCode.reverseComplement(R2);
- index = msgString.indexOf(match);
- kmer.reset(msgString.length() - index);
- kmer.setByRead(msgString.substring(index).getBytes(), 0);
- System.out.println(kmer.toString());
-
- //RF test
- selfString = R1;
- match = selfString.substring(0,kmerSize - 1);
- msgString = GeneCode.reverseComplement(F2);
- index = msgString.lastIndexOf(match) + kmerSize - 2;
- kmer.reset(index + 1);
- kmer.setByReadReverse(msgString.substring(0, index + 1).getBytes(), 0);
- System.out.println(kmer.toString());
-
- //RR test
- selfString = R1;
- match = selfString.substring(0,kmerSize - 1);
- msgString = R2;
- index = msgString.lastIndexOf(match) + kmerSize - 2;
- kmer.reset(index + 1);
- kmer.setByRead(msgString.substring(0, index + 1).getBytes(), 0);
- System.out.println(kmer.toString());
-
- String[][] connectedTable = new String[][]{
- {"FF", "RF"},
- {"FF", "RR"},
- {"FR", "RF"},
- {"FR", "RR"}
- };
- System.out.println(connectedTable[0][1]);
-
- Set<Long> s1 = new HashSet<Long>();
- Set<Long> s2 = new HashSet<Long>();
- s1.add((long) 1);
- s1.add((long) 2);
- s2.add((long) 2);
- s2.add((long) 3);
- Set<Long> intersection = new HashSet<Long>();
- intersection.addAll(s1);
- intersection.retainAll(s2);
- System.out.println(intersection.toString());
- Set<Long> difference = new HashSet<Long>();
- difference.addAll(s1);
- difference.removeAll(s2);
- System.out.println(difference.toString());
-
- Map<KmerBytesWritable, Set<Long>> map = new HashMap<KmerBytesWritable, Set<Long>>();
- KmerBytesWritable k1 = new KmerBytesWritable(3);
- Set<Long> set1 = new HashSet<Long>();
- k1.setByRead(("CTA").getBytes(), 0);
- set1.add((long)1);
- map.put(k1, set1);
- KmerBytesWritable k2 = new KmerBytesWritable(3);
- k2.setByRead(("GTA").getBytes(), 0);
- Set<Long> set2 = new HashSet<Long>();
- set2.add((long) 2);
- map.put(k2, set2);
- KmerBytesWritable k3 = new KmerBytesWritable(3);
- k3.setByRead(("ATG").getBytes(), 0);
- Set<Long> set3 = new HashSet<Long>();
- set3.add((long) 2);
- map.put(k3, set3);
- KmerBytesWritable k4 = new KmerBytesWritable(3);
- k4.setByRead(("AAT").getBytes(), 0);
- Set<Long> set4 = new HashSet<Long>();
- set4.add((long) 1);
- map.put(k4, set4);
- KmerListWritable kmerList = new KmerListWritable(3);
- kmerList.append(k1);
- kmerList.append(k2);
- System.out.println("CTA = " + map.get(k1).toString());
- System.out.println("GTA = " + map.get(k2).toString());
- System.out.println("ATG = " + map.get(k3).toString());
- System.out.println("AAT = " + map.get(k4).toString());
- System.out.println(k1.compareTo(k2));
- System.out.println(k2.compareTo(k1));
-
- System.out.println("CTA = " + kmerList.getPosition(0).toString());
- System.out.println("GTA = " + kmerList.getPosition(1).toString());
- System.out.println("CTA = " + map.get(kmerList.getPosition(0)).toString());
- System.out.println("GTA = " + map.get(kmerList.getPosition(1)).toString());
- }
+ }
}
diff --git a/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/KmerListWritableTest.java b/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/KmerListWritableTest.java
index 9ca4488..fc97664 100644
--- a/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/KmerListWritableTest.java
+++ b/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/KmerListWritableTest.java
@@ -23,7 +23,7 @@
kmer = new VKmerBytesWritable(i);
String randomString = generaterRandomString(i);
byte[] array = randomString.getBytes();
- kmer.setByRead(array, 0);
+ kmer.setByRead(i, array, 0);
kmerList.reset();
kmerList.append(kmer);
Assert.assertEquals(randomString, kmerList.getPosition(0).toString());
@@ -36,7 +36,7 @@
kmer = new VKmerBytesWritable(5);
String randomString = generaterRandomString(5);
byte[] array = randomString.getBytes();
- kmer.setByRead(array, 0);
+ kmer.setByRead(5, array, 0);
kmerList.append(kmer);
Assert.assertEquals(kmerList.getPosition(i).toString(), randomString);
Assert.assertEquals(i + 1, kmerList.getCountOfPosition());
@@ -62,7 +62,7 @@
kmer = new VKmerBytesWritable(5);
String randomString = generaterRandomString(5);
byte[] array = randomString.getBytes();
- kmer.setByRead(array, 0);
+ kmer.setByRead(5, array, 0);
kmerList.append(kmer);
Assert.assertEquals(randomString, kmerList.getPosition(i).toString());
Assert.assertEquals(i + 1, kmerList.getCountOfPosition());
@@ -78,7 +78,7 @@
iterator = copyList.iterator();
byte[] array = kmerList.getPosition(j).toString().getBytes();
VKmerBytesWritable deletePos = new VKmerBytesWritable(5);
- deletePos.setByRead(array, 0);
+ deletePos.setByRead(5, array, 0);
boolean removed = false;
while(iterator.hasNext()){
tmpKmer = iterator.next();
@@ -109,9 +109,9 @@
VKmerListWritable edgeList = new VKmerListWritable();
VKmerBytesWritable k = new VKmerBytesWritable(3);
- k.setByRead(("AAA").getBytes(), 0);
+ k.setByRead(3, ("AAA").getBytes(), 0);
edgeList.append(k);
- k.setByRead(("CCC").getBytes(), 0);
+ k.setByRead(3, ("CCC").getBytes(), 0);
edgeList.append(k);
for(VKmerBytesWritable edge : edgeList){
System.out.println(edge.toString());
diff --git a/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/VKmerBytesWritableTest.java b/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/VKmerBytesWritableTest.java
index 5dd4f82..7460776 100644
--- a/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/VKmerBytesWritableTest.java
+++ b/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/VKmerBytesWritableTest.java
@@ -15,6 +15,11 @@
package edu.uci.ics.genomix.data.test;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
import junit.framework.Assert;
import org.junit.Test;
@@ -22,6 +27,8 @@
import edu.uci.ics.genomix.type.GeneCode;
import edu.uci.ics.genomix.type.KmerBytesWritable;
import edu.uci.ics.genomix.type.VKmerBytesWritable;
+import edu.uci.ics.genomix.type.VKmerListWritable;
+
public class VKmerBytesWritableTest {
static byte[] array = { 'A', 'A', 'T', 'A', 'G', 'A', 'A', 'G' };
@@ -30,17 +37,17 @@
@Test
public void TestCompressKmer() {
VKmerBytesWritable kmer = new VKmerBytesWritable(k);
- kmer.setByRead(array, 0);
+ kmer.setByRead(k, array, 0);
Assert.assertEquals(kmer.toString(), "AATAGAA");
- kmer.setByRead(array, 1);
+ kmer.setByRead(k, array, 1);
Assert.assertEquals(kmer.toString(), "ATAGAAG");
}
@Test
public void TestMoveKmer() {
VKmerBytesWritable kmer = new VKmerBytesWritable(k);
- kmer.setByRead(array, 0);
+ kmer.setByRead(k, array, 0);
Assert.assertEquals(kmer.toString(), "AATAGAA");
for (int i = k; i < array.length - 1; i++) {
@@ -55,18 +62,18 @@
@Test
public void TestCompressKmerReverse() {
- VKmerBytesWritable kmer = new VKmerBytesWritable(k);
- kmer.setByRead(array, 0);
+ VKmerBytesWritable kmer = new VKmerBytesWritable();
+ kmer.setByRead(k, array, 0);
Assert.assertEquals(kmer.toString(), "AATAGAA");
- kmer.setByReadReverse(array, 1);
+ kmer.setByReadReverse(k, array, 1);
Assert.assertEquals(kmer.toString(), "CTTCTAT");
}
@Test
public void TestMoveKmerReverse() {
- VKmerBytesWritable kmer = new VKmerBytesWritable(k);
- kmer.setByRead(array, 0);
+ VKmerBytesWritable kmer = new VKmerBytesWritable();
+ kmer.setByRead(k, array, 0);
Assert.assertEquals(kmer.toString(), "AATAGAA");
for (int i = k; i < array.length - 1; i++) {
@@ -81,10 +88,10 @@
@Test
public void TestGetGene() {
- VKmerBytesWritable kmer = new VKmerBytesWritable(9);
+ VKmerBytesWritable kmer = new VKmerBytesWritable();
String text = "AGCTGACCG";
byte[] array = { 'A', 'G', 'C', 'T', 'G', 'A', 'C', 'C', 'G' };
- kmer.setByRead(array, 0);
+ kmer.setByRead(9, array, 0);
for (int i = 0; i < 9; i++) {
Assert.assertEquals(text.charAt(i), (char) (GeneCode.getSymbolFromCode(kmer.getGeneCodeAtPosition(i))));
@@ -96,9 +103,9 @@
byte[] array = { 'A', 'G', 'C', 'T', 'G', 'A', 'C', 'C', 'G', 'T' };
String string = "AGCTGACCGT";
for (int k = 3; k <= 10; k++) {
- VKmerBytesWritable kmer = new VKmerBytesWritable(k);
+ VKmerBytesWritable kmer = new VKmerBytesWritable();
VKmerBytesWritable kmerAppend = new VKmerBytesWritable(k);
- kmer.setByRead(array, 0);
+ kmer.setByRead(k, array, 0);
Assert.assertEquals(string.substring(0, k), kmer.toString());
for (int b = 0; b < k; b++) {
byte byteActual = KmerBytesWritable.getOneByteFromKmerAtPosition(b, kmer.getBytes(),
@@ -119,13 +126,13 @@
public void TestMergeFFKmer() {
byte[] array = { 'A', 'G', 'C', 'T', 'G', 'A', 'C', 'C', 'G', 'T' };
String text = "AGCTGACCGT";
- VKmerBytesWritable kmer1 = new VKmerBytesWritable(8);
- kmer1.setByRead(array, 0);
+ VKmerBytesWritable kmer1 = new VKmerBytesWritable();
+ kmer1.setByRead(8, array, 0);
String text1 = "AGCTGACC";
Assert.assertEquals(text1, kmer1.toString());
- VKmerBytesWritable kmer2 = new VKmerBytesWritable(8);
- kmer2.setByRead(array, 1);
+ VKmerBytesWritable kmer2 = new VKmerBytesWritable();
+ kmer2.setByRead(8, array, 1);
String text2 = "GCTGACCG";
Assert.assertEquals(text2, kmer2.toString());
@@ -144,13 +151,13 @@
for (int jk = 1; jk <= 10; jk++) {
kmer1 = new VKmerBytesWritable(ik);
kmer2 = new VKmerBytesWritable(jk);
- kmer1.setByRead(array, 0);
- kmer2.setByRead(array, 0);
+ kmer1.setByRead(ik, array, 0);
+ kmer2.setByRead(jk, array, 0);
text1 = text.substring(0, ik);
text2 = text.substring(0, jk);
Assert.assertEquals(text1, kmer1.toString());
Assert.assertEquals(text2, kmer2.toString());
- for (int x = 1; x < jk; x++) {
+ for (int x = 1; x < (jk < ik ? jk : ik); x++) {
merge.setAsCopy(kmer1);
merge.mergeWithFFKmer(x, kmer2);
Assert.assertEquals(text1 + text2.substring(x - 1), merge.toString());
@@ -166,17 +173,18 @@
byte[] resultArray = result.getBytes();
String text1 = "AAGCTAA";
- VKmerBytesWritable kmer1 = new VKmerBytesWritable(text1.length());
- kmer1.setByRead(resultArray, 0);
+ VKmerBytesWritable kmer1 = new VKmerBytesWritable();
+ kmer1.setByRead(text1.length(), resultArray, 0);
Assert.assertEquals(text1, kmer1.toString());
// kmer2 is the rc of the end of the read
String text2 = "GGTTGTT";
- VKmerBytesWritable kmer2 = new VKmerBytesWritable(text2.length());
- kmer2.setByReadReverse(resultArray, result.length() - text2.length());
+ VKmerBytesWritable kmer2 = new VKmerBytesWritable();
+ kmer2.setByReadReverse(text2.length(), resultArray, result.length() - text2.length());
Assert.assertEquals(text2, kmer2.toString());
- VKmerBytesWritable merge = new VKmerBytesWritable(kmer1);
+ VKmerBytesWritable merge = new VKmerBytesWritable();
+ merge.setAsCopy(kmer1);
merge.mergeWithFRKmer(kmerSize, kmer2);
Assert.assertEquals(result, merge.toString());
@@ -203,17 +211,18 @@
byte[] resultArray = result.getBytes();
String text1 = "AACAACCC";
- VKmerBytesWritable kmer1 = new VKmerBytesWritable(text1.length());
- kmer1.setByRead(resultArray, 5);
+ VKmerBytesWritable kmer1 = new VKmerBytesWritable();
+ kmer1.setByRead(text1.length(), resultArray, 5);
Assert.assertEquals(text1, kmer1.toString());
// kmer2 is the rc of the end of the read
String text2 = "TTGTGCC";
- VKmerBytesWritable kmer2 = new VKmerBytesWritable(text2.length());
- kmer2.setByReadReverse(resultArray, 0);
+ VKmerBytesWritable kmer2 = new VKmerBytesWritable();
+ kmer2.setByReadReverse(text2.length(), resultArray, 0);
Assert.assertEquals(text2, kmer2.toString());
- VKmerBytesWritable merge = new VKmerBytesWritable(kmer1);
+ VKmerBytesWritable merge = new VKmerBytesWritable();
+ merge.setAsCopy(kmer1);
merge.mergeWithRFKmer(kmerSize, kmer2);
Assert.assertEquals(result, merge.toString());
@@ -244,25 +253,65 @@
String test3 = "CTA";
String test4 = "AGA"; // rc = TCT
- VKmerBytesWritable k3 = new VKmerBytesWritable(3);
- VKmerBytesWritable k4 = new VKmerBytesWritable(3);
- k3.setByRead(test3.getBytes(), 0);
- k4.setByRead(test4.getBytes(), 0);
+ VKmerBytesWritable k3 = new VKmerBytesWritable();
+ VKmerBytesWritable k4 = new VKmerBytesWritable();
+ k3.setByRead(3, test3.getBytes(), 0);
+ k4.setByRead(3, test4.getBytes(), 0);
k3.mergeWithRFKmer(3, k4);
Assert.assertEquals("TCTA", k3.toString());
// Assert.assertEquals("CTAT", k3); // this is an incorrect test case--
// the merge always flips the passed-in kmer
+
+ String test1;
+ String test2;
+ test1 = "CTA";
+ test2 = "AGA";
+ VKmerBytesWritable k1 = new VKmerBytesWritable();
+ VKmerBytesWritable k2 = new VKmerBytesWritable();
+ k1.setByRead(3, test1.getBytes(), 0);
+ k2.setByRead(3, test2.getBytes(), 0);
+ k1.mergeWithRFKmer(3, k2);
+ Assert.assertEquals("TCTA", k1.toString());
+
+
+
+ test1 = "CTA";
+ test2 = "ATA"; //TAT
+ k1 = new VKmerBytesWritable();
+ k2 = new VKmerBytesWritable();
+ k1.setByRead(3, test1.getBytes(), 0);
+ k2.setByRead(3, test2.getBytes(), 0);
+ k1.mergeWithFRKmer(3, k2);
+ Assert.assertEquals("CTAT", k1.toString());
+
+ test1 = "ATA";
+ test2 = "CTA"; // rc = TAG
+ k1 = new VKmerBytesWritable();
+ k2 = new VKmerBytesWritable();
+ k1.setByRead(3, test1.getBytes(), 0);
+ k2.setByRead(3, test2.getBytes(), 0);
+ k1.mergeWithFRKmer(3, k2);
+ Assert.assertEquals("ATAG", k1.toString());
+
+ test1 = "TCTAT";
+ test2 = "GAAC";
+ k1 = new VKmerBytesWritable();
+ k2 = new VKmerBytesWritable();
+ k1.setByRead(5, test1.getBytes(), 0);
+ k2.setByRead(4, test2.getBytes(), 0);
+ k1.mergeWithRFKmer(3, k2);
+ Assert.assertEquals("GTTCTAT", k1.toString());
}
@Test
public void TestMergeRRKmer() {
byte[] array = { 'A', 'G', 'C', 'T', 'G', 'A', 'C', 'C', 'G', 'T' };
String text = "AGCTGACCGT";
- VKmerBytesWritable kmer1 = new VKmerBytesWritable(8);
- kmer1.setByRead(array, 0);
+ VKmerBytesWritable kmer1 = new VKmerBytesWritable();
+ kmer1.setByRead(8, array, 0);
String text1 = "AGCTGACC";
- VKmerBytesWritable kmer2 = new VKmerBytesWritable(8);
- kmer2.setByRead(array, 1);
+ VKmerBytesWritable kmer2 = new VKmerBytesWritable();
+ kmer2.setByRead(8, array, 1);
String text2 = "GCTGACCG";
Assert.assertEquals(text2, kmer2.toString());
VKmerBytesWritable merge = new VKmerBytesWritable(kmer2);
@@ -278,15 +327,15 @@
for (int ik = 1; ik <= 10; ik++) {
for (int jk = 1; jk <= 10; jk++) {
- kmer1 = new VKmerBytesWritable(ik);
- kmer2 = new VKmerBytesWritable(jk);
- kmer1.setByRead(array, 0);
- kmer2.setByRead(array, 0);
+ kmer1 = new VKmerBytesWritable();
+ kmer2 = new VKmerBytesWritable();
+ kmer1.setByRead(ik, array, 0);
+ kmer2.setByRead(jk, array, 0);
text1 = text.substring(0, ik);
text2 = text.substring(0, jk);
Assert.assertEquals(text1, kmer1.toString());
Assert.assertEquals(text2, kmer2.toString());
- for (int x = 1; x < ik; x++) {
+ for (int x = 1; x < (ik < jk ? ik : jk); x++) {
merge.setAsCopy(kmer2);
merge.mergeWithRRKmer(x, kmer1);
Assert.assertEquals(text1.substring(0, text1.length() - x + 1) + text2, merge.toString());
@@ -300,12 +349,12 @@
String test1 = "TAGAT";
String test2 = "TCTAG"; // rc = CTAGA
String test3 = "GCTAG";
- VKmerBytesWritable k1 = new VKmerBytesWritable(5);
- VKmerBytesWritable k2 = new VKmerBytesWritable(5);
- VKmerBytesWritable k3 = new VKmerBytesWritable(5);
- k1.setByRead(test1.getBytes(), 0);
- k2.setByRead(test2.getBytes(), 0);
- k3.setByRead(test3.getBytes(), 0);
+ VKmerBytesWritable k1 = new VKmerBytesWritable();
+ VKmerBytesWritable k2 = new VKmerBytesWritable();
+ VKmerBytesWritable k3 = new VKmerBytesWritable();
+ k1.setByRead(5, test1.getBytes(), 0);
+ k2.setByRead(5, test2.getBytes(), 0);
+ k3.setByRead(5, test3.getBytes(), 0);
k1.mergeWithRFKmer(5, k2);
Assert.assertEquals("CTAGAT", k1.toString());
k1.mergeWithRRKmer(5, k3);
@@ -317,12 +366,12 @@
String test1 = "TAGAT";
String test2 = "TCTAG"; // rc = CTAGA
String test3 = "CTAGC"; // rc = GCTAG
- VKmerBytesWritable k1 = new VKmerBytesWritable(5);
- VKmerBytesWritable k2 = new VKmerBytesWritable(5);
- VKmerBytesWritable k3 = new VKmerBytesWritable(5);
- k1.setByRead(test1.getBytes(), 0);
- k2.setByRead(test2.getBytes(), 0);
- k3.setByRead(test3.getBytes(), 0);
+ VKmerBytesWritable k1 = new VKmerBytesWritable();
+ VKmerBytesWritable k2 = new VKmerBytesWritable();
+ VKmerBytesWritable k3 = new VKmerBytesWritable();
+ k1.setByRead(5, test1.getBytes(), 0);
+ k2.setByRead(5, test2.getBytes(), 0);
+ k3.setByRead(5, test3.getBytes(), 0);
k1.mergeWithRFKmer(5, k2);
Assert.assertEquals("CTAGAT", k1.toString());
k1.mergeWithRFKmer(5, k3);
@@ -334,12 +383,12 @@
String test1 = "TAGAT"; // rc = ATCTA
String test2 = "TCTAG"; // rc = CTAGA
String test3 = "GCTAG"; // rc = CTAGC
- VKmerBytesWritable k1 = new VKmerBytesWritable(5);
- VKmerBytesWritable k2 = new VKmerBytesWritable(5);
- VKmerBytesWritable k3 = new VKmerBytesWritable(5);
- k1.setByRead(test1.getBytes(), 0);
- k2.setByRead(test2.getBytes(), 0);
- k3.setByRead(test3.getBytes(), 0);
+ VKmerBytesWritable k1 = new VKmerBytesWritable();
+ VKmerBytesWritable k2 = new VKmerBytesWritable();
+ VKmerBytesWritable k3 = new VKmerBytesWritable();
+ k1.setByRead(5, test1.getBytes(), 0);
+ k2.setByRead(5, test2.getBytes(), 0);
+ k3.setByRead(5, test3.getBytes(), 0);
k2.mergeWithRFKmer(5, k1);
Assert.assertEquals("ATCTAG", k2.toString());
k2.mergeWithFRKmer(5, k3);
@@ -351,12 +400,12 @@
String test1 = "TAGAT"; // rc = ATCTA
String test2 = "TCTAG"; // rc = CTAGA
String test3 = "CTAGC"; // rc = GCTAG
- VKmerBytesWritable k1 = new VKmerBytesWritable(5);
- VKmerBytesWritable k2 = new VKmerBytesWritable(5);
- VKmerBytesWritable k3 = new VKmerBytesWritable(5);
- k1.setByRead(test1.getBytes(), 0);
- k2.setByRead(test2.getBytes(), 0);
- k3.setByRead(test3.getBytes(), 0);
+ VKmerBytesWritable k1 = new VKmerBytesWritable();
+ VKmerBytesWritable k2 = new VKmerBytesWritable();
+ VKmerBytesWritable k3 = new VKmerBytesWritable();
+ k1.setByRead(5, test1.getBytes(), 0);
+ k2.setByRead(5, test2.getBytes(), 0);
+ k3.setByRead(5, test3.getBytes(), 0);
k2.mergeWithRFKmer(5, k1);
Assert.assertEquals("ATCTAG", k2.toString());
k2.mergeWithFFKmer(5, k3);
@@ -368,12 +417,12 @@
String test1 = "TAGAT"; // rc = ATCTA
String test2 = "TCTAG"; // rc = CTAGA
String test3 = "CTAGC"; // rc = GCTAG
- VKmerBytesWritable k1 = new VKmerBytesWritable(5);
- VKmerBytesWritable k2 = new VKmerBytesWritable(5);
- VKmerBytesWritable k3 = new VKmerBytesWritable(5);
- k1.setByRead(test1.getBytes(), 0);
- k2.setByRead(test2.getBytes(), 0);
- k3.setByRead(test3.getBytes(), 0);
+ VKmerBytesWritable k1 = new VKmerBytesWritable();
+ VKmerBytesWritable k2 = new VKmerBytesWritable();
+ VKmerBytesWritable k3 = new VKmerBytesWritable();
+ k1.setByRead(5, test1.getBytes(), 0);
+ k2.setByRead(5, test2.getBytes(), 0);
+ k3.setByRead(5, test3.getBytes(), 0);
k2.mergeWithRFKmer(5, k1);
Assert.assertEquals("ATCTAG", k2.toString());
k2.mergeWithFFKmer(5, k3);
@@ -385,15 +434,144 @@
String test1 = "TAGAT";
String test2 = "TCTAG"; // rc = CTAGA
String test3 = "CTAGC"; // rc = GCTAG
- VKmerBytesWritable k1 = new VKmerBytesWritable(5);
- VKmerBytesWritable k2 = new VKmerBytesWritable(5);
- VKmerBytesWritable k3 = new VKmerBytesWritable(5);
- k1.setByRead(test1.getBytes(), 0);
- k2.setByRead(test2.getBytes(), 0);
- k3.setByRead(test3.getBytes(), 0);
+ VKmerBytesWritable k1 = new VKmerBytesWritable();
+ VKmerBytesWritable k2 = new VKmerBytesWritable();
+ VKmerBytesWritable k3 = new VKmerBytesWritable();
+ k1.setByRead(5, test1.getBytes(), 0);
+ k2.setByRead(5, test2.getBytes(), 0);
+ k3.setByRead(5, test3.getBytes(), 0);
k1.mergeWithRFKmer(5, k2);
Assert.assertEquals("CTAGAT", k1.toString());
k1.mergeWithRFKmer(5, k3);
Assert.assertEquals("GCTAGAT", k1.toString());
}
+
+ @Test
+ public void TestFinalMerge() {
+ String selfString;
+ String match;
+ String msgString;
+ int index;
+ VKmerBytesWritable kmer = new VKmerBytesWritable();
+ int kmerSize = 3;
+
+ String F1 = "AATAG";
+ String F2 = "TAGAA";
+ String R1 = "CTATT";
+ String R2 = "TTCTA";
+
+ //FF test
+ selfString = F1;
+ match = selfString.substring(selfString.length() - kmerSize + 1,selfString.length());
+ msgString = F2;
+ index = msgString.indexOf(match);
+ // does this test belong in VKmer so it can have variable-length kmers?
+// kmer.reset(msgString.length() - index);
+ kmer.setByRead(kmerSize, msgString.substring(index).getBytes(), 0);
+ System.out.println(kmer.toString());
+
+ //FR test
+ selfString = F1;
+ match = selfString.substring(selfString.length() - kmerSize + 1,selfString.length());
+ msgString = GeneCode.reverseComplement(R2);
+ index = msgString.indexOf(match);
+ kmer.reset(msgString.length() - index);
+ kmer.setByRead(kmerSize, msgString.substring(index).getBytes(), 0);
+ System.out.println(kmer.toString());
+
+ //RF test
+ selfString = R1;
+ match = selfString.substring(0,kmerSize - 1);
+ msgString = GeneCode.reverseComplement(F2);
+ index = msgString.lastIndexOf(match) + kmerSize - 2;
+ kmer.reset(index + 1);
+ kmer.setByReadReverse(kmerSize, msgString.substring(0, index + 1).getBytes(), 0);
+ System.out.println(kmer.toString());
+
+ //RR test
+ selfString = R1;
+ match = selfString.substring(0,kmerSize - 1);
+ msgString = R2;
+ index = msgString.lastIndexOf(match) + kmerSize - 2;
+ kmer.reset(index + 1);
+ kmer.setByRead(kmerSize, msgString.substring(0, index + 1).getBytes(), 0);
+ System.out.println(kmer.toString());
+
+ String[][] connectedTable = new String[][]{
+ {"FF", "RF"},
+ {"FF", "RR"},
+ {"FR", "RF"},
+ {"FR", "RR"}
+ };
+ System.out.println(connectedTable[0][1]);
+
+ Set<Long> s1 = new HashSet<Long>();
+ Set<Long> s2 = new HashSet<Long>();
+ s1.add((long) 1);
+ s1.add((long) 2);
+ s2.add((long) 2);
+ s2.add((long) 3);
+ Set<Long> intersection = new HashSet<Long>();
+ intersection.addAll(s1);
+ intersection.retainAll(s2);
+ System.out.println(intersection.toString());
+ Set<Long> difference = new HashSet<Long>();
+ difference.addAll(s1);
+ difference.removeAll(s2);
+ System.out.println(difference.toString());
+
+ Map<VKmerBytesWritable, Set<Long>> map = new HashMap<VKmerBytesWritable, Set<Long>>();
+ VKmerBytesWritable k1 = new VKmerBytesWritable();
+ Set<Long> set1 = new HashSet<Long>();
+ k1.setByRead(3, ("CTA").getBytes(), 0);
+ set1.add((long)1);
+ map.put(k1, set1);
+ VKmerBytesWritable k2 = new VKmerBytesWritable();
+ k2.setByRead(3, ("GTA").getBytes(), 0);
+ Set<Long> set2 = new HashSet<Long>();
+ set2.add((long) 2);
+ map.put(k2, set2);
+ VKmerBytesWritable k3 = new VKmerBytesWritable();
+ k3.setByRead(3, ("ATG").getBytes(), 0);
+ Set<Long> set3 = new HashSet<Long>();
+ set3.add((long) 2);
+ map.put(k3, set3);
+ VKmerBytesWritable k4 = new VKmerBytesWritable();
+ k4.setByRead(3, ("AAT").getBytes(), 0);
+ Set<Long> set4 = new HashSet<Long>();
+ set4.add((long) 1);
+ map.put(k4, set4);
+ VKmerListWritable kmerList = new VKmerListWritable();
+ kmerList.append(k1);
+ kmerList.append(k2);
+ System.out.println("CTA = " + map.get(k1).toString());
+ System.out.println("GTA = " + map.get(k2).toString());
+ System.out.println("ATG = " + map.get(k3).toString());
+ System.out.println("AAT = " + map.get(k4).toString());
+ System.out.println(k1.compareTo(k2));
+ System.out.println(k2.compareTo(k1));
+
+ System.out.println("CTA = " + kmerList.getPosition(0).toString());
+ System.out.println("GTA = " + kmerList.getPosition(1).toString());
+ System.out.println("CTA = " + map.get(kmerList.getPosition(0)).toString());
+ System.out.println("GTA = " + map.get(kmerList.getPosition(1)).toString());
+ }
+
+ @Test
+ public void TestEditDistance() {
+ VKmerBytesWritable kmer1 = new VKmerBytesWritable("ACGT");
+ VKmerBytesWritable kmer2 = new VKmerBytesWritable("AAAACGT");
+
+ Assert.assertEquals(kmer1.editDistance(kmer2), 3);
+ Assert.assertEquals(kmer1.editDistance(kmer2), kmer2.editDistance(kmer1));
+
+ kmer1.setAsCopy("");
+ Assert.assertEquals(kmer1.editDistance(kmer2), kmer2.getKmerLetterLength());
+ Assert.assertEquals(kmer1.editDistance(kmer2), kmer2.editDistance(kmer1));
+
+ kmer2.setAsCopy("");
+ Assert.assertEquals(kmer1.editDistance(kmer2), kmer2.getKmerLetterLength());
+ Assert.assertEquals(kmer1.editDistance(kmer2), kmer2.editDistance(kmer1));
+ }
+
}
diff --git a/genomix/genomix-hadoop/data/webmap/8 b/genomix/genomix-hadoop/data/webmap/8
deleted file mode 100644
index 3959d4d..0000000
--- a/genomix/genomix-hadoop/data/webmap/8
+++ /dev/null
@@ -1 +0,0 @@
-1 AATAGAACTT
diff --git a/genomix/genomix-hadoop/data/webmap/RemoveBridge.txt b/genomix/genomix-hadoop/data/webmap/RemoveBridge.txt
new file mode 100644
index 0000000..472a7dc
--- /dev/null
+++ b/genomix/genomix-hadoop/data/webmap/RemoveBridge.txt
@@ -0,0 +1,2 @@
+1 AATAG
+2 CACGC
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/contrailgraphbuilding/GenomixDriver.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/contrailgraphbuilding/GenomixDriver.java
index 3723ed9..2553d16 100644
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/contrailgraphbuilding/GenomixDriver.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/contrailgraphbuilding/GenomixDriver.java
@@ -14,8 +14,8 @@
import org.kohsuke.args4j.CmdLineParser;
import org.kohsuke.args4j.Option;
-import edu.uci.ics.genomix.type.KmerBytesWritable;
import edu.uci.ics.genomix.type.NodeWritable;
+import edu.uci.ics.genomix.type.VKmerBytesWritable;
@SuppressWarnings("deprecation")
@@ -51,7 +51,7 @@
conf.setMapperClass(GenomixMapper.class);
conf.setReducerClass(GenomixReducer.class);
- conf.setMapOutputKeyClass(KmerBytesWritable.class);
+ conf.setMapOutputKeyClass(VKmerBytesWritable.class);
conf.setMapOutputValueClass(NodeWritable.class);
//InputFormat and OutputFormat for Reducer
@@ -62,7 +62,7 @@
conf.setOutputFormat(TextOutputFormat.class);
//Output Key/Value Class
- conf.setOutputKeyClass(KmerBytesWritable.class);
+ conf.setOutputKeyClass(VKmerBytesWritable.class);
conf.setOutputValueClass(NodeWritable.class);
FileInputFormat.setInputPaths(conf, new Path(inputPath));
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/contrailgraphbuilding/GenomixMapper.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/contrailgraphbuilding/GenomixMapper.java
index 0c7cc20..6951e8b 100644
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/contrailgraphbuilding/GenomixMapper.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/contrailgraphbuilding/GenomixMapper.java
@@ -12,16 +12,15 @@
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
-import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.VKmerBytesWritable;
import edu.uci.ics.genomix.type.VKmerListWritable;
import edu.uci.ics.genomix.type.NodeWritable;
import edu.uci.ics.genomix.type.PositionListWritable;
import edu.uci.ics.genomix.type.PositionWritable;
-import edu.uci.ics.genomix.type.VKmerBytesWritable;
@SuppressWarnings("deprecation")
public class GenomixMapper extends MapReduceBase implements
- Mapper<LongWritable, Text, KmerBytesWritable, NodeWritable>{
+ Mapper<LongWritable, Text, VKmerBytesWritable, NodeWritable>{
public static enum KmerDir{
FORWARD,
@@ -29,12 +28,12 @@
}
public static int KMER_SIZE;
- private KmerBytesWritable preForwardKmer;
- private KmerBytesWritable preReverseKmer;
- private KmerBytesWritable curForwardKmer;
- private KmerBytesWritable curReverseKmer;
- private KmerBytesWritable nextForwardKmer;
- private KmerBytesWritable nextReverseKmer;
+ private VKmerBytesWritable preForwardKmer;
+ private VKmerBytesWritable preReverseKmer;
+ private VKmerBytesWritable curForwardKmer;
+ private VKmerBytesWritable curReverseKmer;
+ private VKmerBytesWritable nextForwardKmer;
+ private VKmerBytesWritable nextReverseKmer;
private PositionWritable nodeId;
private PositionListWritable nodeIdList;
private VKmerListWritable edgeListForPreKmer;
@@ -50,13 +49,12 @@
@Override
public void configure(JobConf job) {
KMER_SIZE = Integer.parseInt(job.get("sizeKmer"));
- KmerBytesWritable.setGlobalKmerLength(KMER_SIZE);
- preForwardKmer = new KmerBytesWritable();
- preReverseKmer = new KmerBytesWritable();
- curForwardKmer = new KmerBytesWritable();
- curReverseKmer = new KmerBytesWritable();
- nextForwardKmer = new KmerBytesWritable();
- nextReverseKmer = new KmerBytesWritable();
+ preForwardKmer = new VKmerBytesWritable();
+ preReverseKmer = new VKmerBytesWritable();
+ curForwardKmer = new VKmerBytesWritable();
+ curReverseKmer = new VKmerBytesWritable();
+ nextForwardKmer = new VKmerBytesWritable();
+ nextReverseKmer = new VKmerBytesWritable();
nodeId = new PositionWritable();
nodeIdList = new PositionListWritable();
edgeListForPreKmer = new VKmerListWritable();
@@ -68,9 +66,8 @@
}
@Override
- public void map(LongWritable key, Text value, OutputCollector<KmerBytesWritable, NodeWritable> output,
+ public void map(LongWritable key, Text value, OutputCollector<VKmerBytesWritable, NodeWritable> output,
Reporter reporter) throws IOException {
- /** first kmer */
String[] rawLine = value.toString().split("\\t"); // Read the Real Gene Line
if (rawLine.length != 2) {
throw new IOException("invalid data");
@@ -86,11 +83,10 @@
if (KMER_SIZE >= array.length) {
throw new IOException("short read");
}
-
/** first kmer **/
outputNode.reset();
- curForwardKmer.setByRead(array, 0);
- curReverseKmer.setByReadReverse(array, 0);
+ curForwardKmer.setByRead(KMER_SIZE, array, 0);
+ curReverseKmer.setByReadReverse(KMER_SIZE, array, 0);
curKmerDir = curForwardKmer.compareTo(curReverseKmer) <= 0 ? KmerDir.FORWARD : KmerDir.REVERSE;
setNextKmer(array[KMER_SIZE]);
//set value.nodeId
@@ -205,7 +201,7 @@
public void setPreKmer(byte preChar){
preForwardKmer.setAsCopy(curForwardKmer);
preForwardKmer.shiftKmerWithPreChar(preChar);
- preReverseKmer.setByReadReverse(preForwardKmer.toString().getBytes(), preForwardKmer.getOffset());
+ preReverseKmer.setByReadReverse(KMER_SIZE, preForwardKmer.toString().getBytes(), preForwardKmer.getBlockOffset());
preKmerDir = preForwardKmer.compareTo(preReverseKmer) <= 0 ? KmerDir.FORWARD : KmerDir.REVERSE;
}
@@ -213,7 +209,7 @@
public void setNextKmer(byte nextChar){
nextForwardKmer.setAsCopy(curForwardKmer);
nextForwardKmer.shiftKmerWithNextChar(nextChar);
- nextReverseKmer.setByReadReverse(nextForwardKmer.toString().getBytes(), nextForwardKmer.getOffset());
+ nextReverseKmer.setByReadReverse(KMER_SIZE, nextForwardKmer.toString().getBytes(), nextForwardKmer.getBlockOffset());
nextKmerDir = nextForwardKmer.compareTo(nextReverseKmer) <= 0 ? KmerDir.FORWARD : KmerDir.REVERSE;
}
@@ -231,7 +227,7 @@
curReverseKmer.setAsCopy(nextReverseKmer);
}
- public void setMapperOutput(OutputCollector<KmerBytesWritable, NodeWritable> output) throws IOException{
+ public void setMapperOutput(OutputCollector<VKmerBytesWritable, NodeWritable> output) throws IOException{
switch(curKmerDir){
case FORWARD:
output.collect(curForwardKmer, outputNode);
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/contrailgraphbuilding/GenomixReducer.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/contrailgraphbuilding/GenomixReducer.java
index a0eb7c8..6404f0d 100644
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/contrailgraphbuilding/GenomixReducer.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/contrailgraphbuilding/GenomixReducer.java
@@ -9,12 +9,12 @@
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
-import edu.uci.ics.genomix.type.KmerBytesWritable;
import edu.uci.ics.genomix.type.NodeWritable;
+import edu.uci.ics.genomix.type.VKmerBytesWritable;
@SuppressWarnings("deprecation")
public class GenomixReducer extends MapReduceBase implements
- Reducer<KmerBytesWritable, NodeWritable, KmerBytesWritable, NodeWritable>{
+ Reducer<VKmerBytesWritable, NodeWritable, VKmerBytesWritable, NodeWritable>{
public static int KMER_SIZE;
private NodeWritable outputNode;
@@ -23,24 +23,23 @@
@Override
public void configure(JobConf job) {
KMER_SIZE = GenomixMapper.KMER_SIZE;
- KmerBytesWritable.setGlobalKmerLength(KMER_SIZE);
outputNode = new NodeWritable();
tmpNode = new NodeWritable();
}
@Override
- public void reduce(KmerBytesWritable key, Iterator<NodeWritable> values,
- OutputCollector<KmerBytesWritable, NodeWritable> output,
+ public void reduce(VKmerBytesWritable key, Iterator<NodeWritable> values,
+ OutputCollector<VKmerBytesWritable, NodeWritable> output,
Reporter reporter) throws IOException {
outputNode.reset();
while (values.hasNext()) {
tmpNode.set(values.next());
outputNode.getNodeIdList().appendList(tmpNode.getNodeIdList());
- outputNode.getFFList().appendList(tmpNode.getFFList()); //appendList need to check if insert node exists
- outputNode.getFRList().appendList(tmpNode.getFRList());
- outputNode.getRFList().appendList(tmpNode.getRFList());
- outputNode.getRRList().appendList(tmpNode.getRRList());
+ outputNode.getFFList().unionUpdate(tmpNode.getFFList()); // unionUpdate replaces appendList here; presumably it skips kmers already in the list (TODO confirm)
+ outputNode.getFRList().unionUpdate(tmpNode.getFRList());
+ outputNode.getRFList().unionUpdate(tmpNode.getRFList());
+ outputNode.getRRList().unionUpdate(tmpNode.getRRList());
}
output.collect(key,outputNode);
}
diff --git a/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/contrailgraphbuilding/GraphBuildingTest.java b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/contrailgraphbuilding/GraphBuildingTest.java
index cc7e0ac..498a87d 100644
--- a/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/contrailgraphbuilding/GraphBuildingTest.java
+++ b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/contrailgraphbuilding/GraphBuildingTest.java
@@ -22,13 +22,13 @@
private JobConf conf = new JobConf();
private static final String ACTUAL_RESULT_DIR = "actual";
private static final String HADOOP_CONF_PATH = ACTUAL_RESULT_DIR + File.separator + "conf.xml";
- private static final String DATA_PATH = "data/webmap/RemoveBridge.txt";
+ private static final String DATA_PATH = "data/webmap/pathmerge_TestSet/5";
private static final String HDFS_PATH = "/webmap";
private static final String RESULT_PATH = "/result";
// private static final int COUNT_REDUCER = 2;
private static final int SIZE_KMER = 3;
- private static final int READ_LENGTH = 5;
+ private static final int READ_LENGTH = 7;
private MiniDFSCluster dfsCluster;
private MiniMRCluster mrCluster;
diff --git a/genomix/genomix-pregelix/data/PathMergeTestSet/5/part-00000 b/genomix/genomix-pregelix/data/PathMergeTestSet/5/part-00000
index 1887e36..d369b61 100755
--- a/genomix/genomix-pregelix/data/PathMergeTestSet/5/part-00000
+++ b/genomix/genomix-pregelix/data/PathMergeTestSet/5/part-00000
Binary files differ
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/api/io/binary/BinaryDataCleanVertexInputFormat.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/api/io/binary/BinaryDataCleanVertexInputFormat.java
index 396d5ec..e4f0cde 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/api/io/binary/BinaryDataCleanVertexInputFormat.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/api/io/binary/BinaryDataCleanVertexInputFormat.java
@@ -14,7 +14,7 @@
import edu.uci.ics.pregelix.api.io.VertexInputFormat;
import edu.uci.ics.pregelix.api.io.VertexReader;
import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
-import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.VKmerBytesWritable;
public class BinaryDataCleanVertexInputFormat<I extends WritableComparable<?>, V extends Writable, E extends Writable, M extends Writable>
extends VertexInputFormat<I, V, E, M> {
@@ -38,7 +38,7 @@
public static abstract class BinaryDataCleanVertexReader<I extends WritableComparable<?>, V extends Writable, E extends Writable, M extends Writable>
implements VertexReader<I, V, E, M> {
/** Internal line record reader */
- private final RecordReader<KmerBytesWritable, VertexValueWritable> lineRecordReader;
+ private final RecordReader<VKmerBytesWritable, VertexValueWritable> lineRecordReader;
/** Context passed to initialize */
private TaskAttemptContext context;
@@ -48,7 +48,7 @@
* @param recordReader
* Line record reader from SequenceFileInputFormat
*/
- public BinaryDataCleanVertexReader(RecordReader<KmerBytesWritable, VertexValueWritable> recordReader) {
+ public BinaryDataCleanVertexReader(RecordReader<VKmerBytesWritable, VertexValueWritable> recordReader) {
this.lineRecordReader = recordReader;
}
@@ -74,7 +74,7 @@
*
* @return Record reader to be used for reading.
*/
- protected RecordReader<KmerBytesWritable, VertexValueWritable> getRecordReader() {
+ protected RecordReader<VKmerBytesWritable, VertexValueWritable> getRecordReader() {
return lineRecordReader;
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/api/io/binary/BinaryDataCleanVertexOutputFormat.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/api/io/binary/BinaryDataCleanVertexOutputFormat.java
index c07d076..30510a3 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/api/io/binary/BinaryDataCleanVertexOutputFormat.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/api/io/binary/BinaryDataCleanVertexOutputFormat.java
@@ -11,7 +11,7 @@
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
-import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.VKmerBytesWritable;
import edu.uci.ics.pregelix.api.io.VertexOutputFormat;
import edu.uci.ics.pregelix.api.io.VertexWriter;
@@ -49,7 +49,7 @@
/** Context passed to initialize */
private TaskAttemptContext context;
/** Internal line record writer */
- private final RecordWriter<KmerBytesWritable, VertexValueWritable> lineRecordWriter;
+ private final RecordWriter<VKmerBytesWritable, VertexValueWritable> lineRecordWriter;
/**
* Initialize with the LineRecordWriter.
@@ -57,7 +57,7 @@
* @param lineRecordWriter
* Line record writer from SequenceFileOutputFormat
*/
- public BinaryVertexWriter(RecordWriter<KmerBytesWritable, VertexValueWritable> lineRecordWriter) {
+ public BinaryVertexWriter(RecordWriter<VKmerBytesWritable, VertexValueWritable> lineRecordWriter) {
this.lineRecordWriter = lineRecordWriter;
}
@@ -76,7 +76,7 @@
*
* @return Record writer to be used for writing.
*/
- public RecordWriter<KmerBytesWritable, VertexValueWritable> getRecordWriter() {
+ public RecordWriter<VKmerBytesWritable, VertexValueWritable> getRecordWriter() {
return lineRecordWriter;
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/api/io/binary/InitialGraphCleanVertexInputFormat.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/api/io/binary/InitialGraphCleanVertexInputFormat.java
index d6be23b..4221865 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/api/io/binary/InitialGraphCleanVertexInputFormat.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/api/io/binary/InitialGraphCleanVertexInputFormat.java
@@ -14,7 +14,7 @@
import edu.uci.ics.pregelix.api.io.VertexInputFormat;
import edu.uci.ics.pregelix.api.io.VertexReader;
import edu.uci.ics.genomix.type.NodeWritable;
-import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.VKmerBytesWritable;
public class InitialGraphCleanVertexInputFormat<I extends WritableComparable<?>, V extends Writable, E extends Writable, M extends Writable>
extends VertexInputFormat<I, V, E, M> {
@@ -38,7 +38,7 @@
public static abstract class BinaryVertexReader<I extends WritableComparable<?>, V extends Writable, E extends Writable, M extends Writable>
implements VertexReader<I, V, E, M> {
/** Internal line record reader */
- private final RecordReader<KmerBytesWritable, NodeWritable> lineRecordReader;
+ private final RecordReader<VKmerBytesWritable, NodeWritable> lineRecordReader;
/** Context passed to initialize */
private TaskAttemptContext context;
@@ -48,7 +48,7 @@
* @param recordReader
* Line record reader from SequenceFileInputFormat
*/
- public BinaryVertexReader(RecordReader<KmerBytesWritable, NodeWritable> recordReader) {
+ public BinaryVertexReader(RecordReader<VKmerBytesWritable, NodeWritable> recordReader) {
this.lineRecordReader = recordReader;
}
@@ -74,7 +74,7 @@
*
* @return Record reader to be used for reading.
*/
- protected RecordReader<KmerBytesWritable, NodeWritable> getRecordReader() {
+ protected RecordReader<VKmerBytesWritable, NodeWritable> getRecordReader() {
return lineRecordReader;
}
@@ -98,7 +98,6 @@
@Override
public VertexReader<I, V, E, M> createVertexReader(InputSplit split, TaskAttemptContext context) throws IOException {
- // TODO Auto-generated method stub
return null;
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/GraphCleanInputFormat.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/GraphCleanInputFormat.java
index 859ddba..e0c8fa5 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/GraphCleanInputFormat.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/GraphCleanInputFormat.java
@@ -14,16 +14,16 @@
import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
import edu.uci.ics.genomix.pregelix.api.io.binary.BinaryDataCleanVertexInputFormat;
import edu.uci.ics.genomix.pregelix.api.io.binary.BinaryDataCleanVertexInputFormat.BinaryDataCleanVertexReader;
-import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.VKmerBytesWritable;
public class GraphCleanInputFormat extends
- BinaryDataCleanVertexInputFormat<KmerBytesWritable, VertexValueWritable, NullWritable, MessageWritable> {
+ BinaryDataCleanVertexInputFormat<VKmerBytesWritable, VertexValueWritable, NullWritable, MessageWritable> {
/**
* Format INPUT
*/
@SuppressWarnings("unchecked")
@Override
- public VertexReader<KmerBytesWritable, VertexValueWritable, NullWritable, MessageWritable> createVertexReader(
+ public VertexReader<VKmerBytesWritable, VertexValueWritable, NullWritable, MessageWritable> createVertexReader(
InputSplit split, TaskAttemptContext context) throws IOException {
return new BinaryDataCleanLoadGraphReader(binaryInputFormat.createRecordReader(split, context));
}
@@ -31,12 +31,12 @@
@SuppressWarnings("rawtypes")
class BinaryDataCleanLoadGraphReader extends
- BinaryDataCleanVertexReader<KmerBytesWritable, VertexValueWritable, NullWritable, MessageWritable> {
+ BinaryDataCleanVertexReader<VKmerBytesWritable, VertexValueWritable, NullWritable, MessageWritable> {
private Vertex vertex;
- private KmerBytesWritable vertexId = new KmerBytesWritable();
+ private VKmerBytesWritable vertexId = new VKmerBytesWritable();
private VertexValueWritable vertexValue = new VertexValueWritable();
- public BinaryDataCleanLoadGraphReader(RecordReader<KmerBytesWritable, VertexValueWritable> recordReader) {
+ public BinaryDataCleanLoadGraphReader(RecordReader<VKmerBytesWritable, VertexValueWritable> recordReader) {
super(recordReader);
}
@@ -47,7 +47,7 @@
@SuppressWarnings("unchecked")
@Override
- public Vertex<KmerBytesWritable, VertexValueWritable, NullWritable, MessageWritable> getCurrentVertex()
+ public Vertex<VKmerBytesWritable, VertexValueWritable, NullWritable, MessageWritable> getCurrentVertex()
throws IOException, InterruptedException {
if (vertex == null)
vertex = (Vertex) BspUtils.createVertex(getContext().getConfiguration());
@@ -66,7 +66,6 @@
* set the vertex value
*/
vertexValue.set(getRecordReader().getCurrentValue());
- vertexValue.setKmerlength(getRecordReader().getCurrentValue().getKmerlength());
vertex.setVertexValue(vertexValue);
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/GraphCleanOutputFormat.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/GraphCleanOutputFormat.java
index 32f71be..77960ba 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/GraphCleanOutputFormat.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/GraphCleanOutputFormat.java
@@ -8,18 +8,18 @@
import edu.uci.ics.genomix.pregelix.api.io.binary.BinaryDataCleanVertexOutputFormat;
import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
-import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.VKmerBytesWritable;
import edu.uci.ics.pregelix.api.graph.Vertex;
import edu.uci.ics.pregelix.api.io.VertexWriter;
public class GraphCleanOutputFormat extends
- BinaryDataCleanVertexOutputFormat<KmerBytesWritable, VertexValueWritable, NullWritable> {
+ BinaryDataCleanVertexOutputFormat<VKmerBytesWritable, VertexValueWritable, NullWritable> {
@Override
- public VertexWriter<KmerBytesWritable, VertexValueWritable, NullWritable> createVertexWriter(
+ public VertexWriter<VKmerBytesWritable, VertexValueWritable, NullWritable> createVertexWriter(
TaskAttemptContext context) throws IOException, InterruptedException {
@SuppressWarnings("unchecked")
- RecordWriter<KmerBytesWritable, VertexValueWritable> recordWriter = binaryOutputFormat.getRecordWriter(context);
+ RecordWriter<VKmerBytesWritable, VertexValueWritable> recordWriter = binaryOutputFormat.getRecordWriter(context);
return new BinaryLoadGraphVertexWriter(recordWriter);
}
@@ -27,13 +27,13 @@
* Simple VertexWriter that supports {@link BinaryLoadGraphVertex}
*/
public static class BinaryLoadGraphVertexWriter extends
- BinaryVertexWriter<KmerBytesWritable, VertexValueWritable, NullWritable> {
- public BinaryLoadGraphVertexWriter(RecordWriter<KmerBytesWritable, VertexValueWritable> lineRecordWriter) {
+ BinaryVertexWriter<VKmerBytesWritable, VertexValueWritable, NullWritable> {
+ public BinaryLoadGraphVertexWriter(RecordWriter<VKmerBytesWritable, VertexValueWritable> lineRecordWriter) {
super(lineRecordWriter);
}
@Override
- public void writeVertex(Vertex<KmerBytesWritable, VertexValueWritable, NullWritable, ?> vertex)
+ public void writeVertex(Vertex<VKmerBytesWritable, VertexValueWritable, NullWritable, ?> vertex)
throws IOException, InterruptedException {
getRecordWriter().write(vertex.getVertexId(), vertex.getVertexValue());
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/InitialGraphCleanInputFormat.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/InitialGraphCleanInputFormat.java
index 4dfff11..5662da6 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/InitialGraphCleanInputFormat.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/InitialGraphCleanInputFormat.java
@@ -17,16 +17,15 @@
import edu.uci.ics.genomix.pregelix.io.VertexValueWritable.State;
import edu.uci.ics.genomix.pregelix.api.io.binary.InitialGraphCleanVertexInputFormat;
import edu.uci.ics.genomix.pregelix.api.io.binary.InitialGraphCleanVertexInputFormat.BinaryVertexReader;
-import edu.uci.ics.genomix.type.KmerBytesWritable;
public class InitialGraphCleanInputFormat extends
- InitialGraphCleanVertexInputFormat<KmerBytesWritable, VertexValueWritable, NullWritable, MessageWritable> {
+ InitialGraphCleanVertexInputFormat<VKmerBytesWritable, VertexValueWritable, NullWritable, MessageWritable> {
/**
* Format INPUT
*/
@SuppressWarnings("unchecked")
@Override
- public VertexReader<KmerBytesWritable, VertexValueWritable, NullWritable, MessageWritable> createVertexReader(
+ public VertexReader<VKmerBytesWritable, VertexValueWritable, NullWritable, MessageWritable> createVertexReader(
InputSplit split, TaskAttemptContext context) throws IOException {
return new BinaryLoadGraphReader(binaryInputFormat.createRecordReader(split, context));
}
@@ -34,14 +33,14 @@
@SuppressWarnings("rawtypes")
class BinaryLoadGraphReader extends
- BinaryVertexReader<KmerBytesWritable, VertexValueWritable, NullWritable, MessageWritable> {
+ BinaryVertexReader<VKmerBytesWritable, VertexValueWritable, NullWritable, MessageWritable> {
private Vertex vertex;
- private KmerBytesWritable vertexId = new KmerBytesWritable();
+ private VKmerBytesWritable vertexId = new VKmerBytesWritable();
private NodeWritable node = new NodeWritable();
private VertexValueWritable vertexValue = new VertexValueWritable();
- public BinaryLoadGraphReader(RecordReader<KmerBytesWritable, NodeWritable> recordReader) {
+ public BinaryLoadGraphReader(RecordReader<VKmerBytesWritable, NodeWritable> recordReader) {
super(recordReader);
}
@@ -52,7 +51,7 @@
@SuppressWarnings("unchecked")
@Override
- public Vertex<KmerBytesWritable, VertexValueWritable, NullWritable, MessageWritable> getCurrentVertex()
+ public Vertex<VKmerBytesWritable, VertexValueWritable, NullWritable, MessageWritable> getCurrentVertex()
throws IOException, InterruptedException {
if (vertex == null)
vertex = (Vertex) BspUtils.createVertex(getContext().getConfiguration());
@@ -71,14 +70,13 @@
* set the vertex value
*/
node.set(getRecordReader().getCurrentValue());
- vertexValue.setKmerlength(node.getKmerLength());
vertexValue.setNodeIdList(node.getNodeIdList());
vertexValue.setFFList(node.getFFList());
vertexValue.setFRList(node.getFRList());
vertexValue.setRFList(node.getRFList());
vertexValue.setRRList(node.getRRList());
// TODO make this more efficient (don't use toString)
- vertexValue.setKmer(new VKmerBytesWritable(getRecordReader().getCurrentKey().toString()));
+ vertexValue.setActualKmer(new VKmerBytesWritable(getRecordReader().getCurrentKey().toString()));
vertexValue.setState(State.IS_NON);
vertex.setVertexValue(vertexValue);
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/P2PathMergeOutputFormat.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/P2PathMergeOutputFormat.java
index 3f8216c..b4c0aee 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/P2PathMergeOutputFormat.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/P2PathMergeOutputFormat.java
@@ -10,18 +10,18 @@
import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
import edu.uci.ics.genomix.pregelix.io.VertexValueWritable.State;
import edu.uci.ics.genomix.pregelix.operator.pathmerge.P2ForPathMergeVertex;
-import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.VKmerBytesWritable;
import edu.uci.ics.pregelix.api.graph.Vertex;
import edu.uci.ics.pregelix.api.io.VertexWriter;
public class P2PathMergeOutputFormat extends
- BinaryDataCleanVertexOutputFormat<KmerBytesWritable, VertexValueWritable, NullWritable> {
+ BinaryDataCleanVertexOutputFormat<VKmerBytesWritable, VertexValueWritable, NullWritable> {
@Override
- public VertexWriter<KmerBytesWritable, VertexValueWritable, NullWritable> createVertexWriter(
+ public VertexWriter<VKmerBytesWritable, VertexValueWritable, NullWritable> createVertexWriter(
TaskAttemptContext context) throws IOException, InterruptedException {
@SuppressWarnings("unchecked")
- RecordWriter<KmerBytesWritable, VertexValueWritable> recordWriter = binaryOutputFormat.getRecordWriter(context);
+ RecordWriter<VKmerBytesWritable, VertexValueWritable> recordWriter = binaryOutputFormat.getRecordWriter(context);
return new BinaryLoadGraphVertexWriter(recordWriter);
}
@@ -29,13 +29,13 @@
* Simple VertexWriter that supports {@link BinaryLoadGraphVertex}
*/
public static class BinaryLoadGraphVertexWriter extends
- BinaryVertexWriter<KmerBytesWritable, VertexValueWritable, NullWritable> {
- public BinaryLoadGraphVertexWriter(RecordWriter<KmerBytesWritable, VertexValueWritable> lineRecordWriter) {
+ BinaryVertexWriter<VKmerBytesWritable, VertexValueWritable, NullWritable> {
+ public BinaryLoadGraphVertexWriter(RecordWriter<VKmerBytesWritable, VertexValueWritable> lineRecordWriter) {
super(lineRecordWriter);
}
@Override
- public void writeVertex(Vertex<KmerBytesWritable, VertexValueWritable, NullWritable, ?> vertex)
+ public void writeVertex(Vertex<VKmerBytesWritable, VertexValueWritable, NullWritable, ?> vertex)
throws IOException, InterruptedException {
byte selfFlag = (byte)(vertex.getVertexValue().getState() & State.VERTEX_MASK);
if(selfFlag == State.IS_FINAL)
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/graph/GenerateGraphViz.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/graph/GenerateGraphViz.java
new file mode 100644
index 0000000..bb20797
--- /dev/null
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/graph/GenerateGraphViz.java
@@ -0,0 +1,90 @@
+package edu.uci.ics.genomix.pregelix.graph;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.util.Iterator;
+
+import org.apache.commons.io.filefilter.WildcardFileFilter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+
+import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
+import edu.uci.ics.genomix.type.VKmerBytesWritable;
+
+public class GenerateGraphViz { // command-line helper that turns SequenceFile graph output into a GraphViz drawing
+
+    /**
+     * Build a DOT graph in memory from every "part*" SequenceFile in srcDir (VKmerBytesWritable keys, VertexValueWritable values),
+     * print the DOT source to stdout, then hand gv.getGraph(dotSource, "ps") to gv.writeGraphToFile targeting destDir/result.ps.
+     */
+    public static void convertGraphCleanOutputToGraphViz(String srcDir, String destDir) throws Exception {
+        GraphViz gv = new GraphViz();
+        gv.addln(gv.start_graph());
+
+        Configuration conf = new Configuration();
+        FileSystem fileSys = FileSystem.getLocal(conf); // input files are opened through the local file system
+        File srcPath = new File(srcDir);
+
+        String outputNode = "";
+        String outputEdge = "";
+        for (File f : srcPath.listFiles((FilenameFilter) (new WildcardFileFilter("part*")))) { // NOTE(review): File.listFiles returns null when srcDir is not a directory, which would throw here
+            SequenceFile.Reader reader = new SequenceFile.Reader(fileSys, new Path(f.getAbsolutePath()), conf);
+            VKmerBytesWritable key = new VKmerBytesWritable(); // key/value instances are reused for every record of this file
+            VertexValueWritable value = new VertexValueWritable();
+
+            while (reader.next(key, value)) {
+                outputNode = "";
+                outputEdge = "";
+                if (key == null) { // NOTE(review): key is assigned once above and never reassigned, so this branch looks unreachable
+                    break;
+                }
+                outputNode += key.toString();
+                /** build the DOT edge lines for this vertex (one per neighbor in its FF/FR/RF/RR lists) **/
+                outputEdge = convertEdgeToGraph(outputNode, value);
+                gv.addln(outputEdge);
+            }
+            reader.close(); // NOTE(review): skipped if reader.next throws; there is no try/finally around the loop
+        }
+
+        gv.addln(gv.end_graph());
+        System.out.println(gv.getDotSource());
+
+        String type = "ps"; // output type string passed to gv.getGraph and used as the file extension
+        File folder = new File(destDir);
+        folder.mkdirs(); // create destDir (and parents) if absent; the boolean result is ignored
+        File out = new File(destDir + "/result." + type); // Linux: path is built with a hard-coded "/" separator
+        gv.writeGraphToFile(gv.getGraph(gv.getDotSource(), type), out);
+    }
+
+    public static String convertEdgeToGraph(String outputNode, VertexValueWritable value){ // returns one "outputNode -> neighbor[...]" line per entry of value's FF, FR, RF and RR lists, in that order
+        String outputEdge = "";
+        Iterator<VKmerBytesWritable> kmerIterator;
+        kmerIterator = value.getFFList().iterator(); // FF and FR edges are emitted with color "black"
+        while(kmerIterator.hasNext()){
+            VKmerBytesWritable edge = kmerIterator.next(); 
+            outputEdge += outputNode + " -> " + edge.toString() + "[color = \"black\" label =\"FF\"]\n";
+        }
+        kmerIterator = value.getFRList().iterator();
+        while(kmerIterator.hasNext()){
+            VKmerBytesWritable edge = kmerIterator.next();
+            outputEdge += outputNode + " -> " + edge.toString() + "[color = \"black\" label =\"FR\"]\n";
+        }
+        kmerIterator = value.getRFList().iterator(); // RF and RR edges are emitted with color "red"
+        while(kmerIterator.hasNext()){
+            VKmerBytesWritable edge = kmerIterator.next();
+            outputEdge += outputNode + " -> " + edge.toString() + "[color = \"red\" label =\"RF\"]\n";
+        }
+        kmerIterator = value.getRRList().iterator();
+        while(kmerIterator.hasNext()){
+            VKmerBytesWritable edge = kmerIterator.next();
+            outputEdge += outputNode + " -> " + edge.toString() + "[color = \"red\" label =\"RR\"]\n";
+        }
+        return outputEdge; // empty string when all four lists are empty
+    }
+
+    public static void main(String[] args) throws Exception { // args is ignored; source and destination directories are hard-coded below
+        GenerateGraphViz.convertGraphCleanOutputToGraphViz("data/actual/bubbleadd/BubbleAddGraph/bin/5", "graphtest");
+    }
+}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/graph/Graph.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/graph/Graph.java
deleted file mode 100644
index 3c24344..0000000
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/graph/Graph.java
+++ /dev/null
@@ -1,60 +0,0 @@
-package edu.uci.ics.genomix.pregelix.graph;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileReader;
-
-public class Graph {
-
- private BufferedReader br;
-
- /**
- * Construct a DOT graph in memory, convert it
- * to image and store the image in the file system.
- *
- * @throws Exception
- */
- private void start(String fileName) throws Exception {
- File filePathTo = new File("graph/" + fileName);
- br = new BufferedReader(new FileReader(filePathTo));
- String line = "";
- String[] split;
-
- String precursor = "";
- String[] adjMap;
- char[] succeeds;
- String succeed = "";
- String output;
-
- GraphViz gv = new GraphViz();
- gv.addln(gv.start_graph());
- while ((line = br.readLine()) != null) {
- split = line.split("\t");
- precursor = split[0];
- adjMap = split[1].split("\\|");
- if (adjMap.length > 1) {
- succeeds = adjMap[1].toCharArray();
- for (int i = 0; i < succeeds.length; i++) {
- succeed = precursor.substring(1) + succeeds[i];
- output = precursor + " -> " + succeed;
- gv.addln(output);
- }
- }
- }
- gv.addln(gv.end_graph());
- System.out.println(gv.getDotSource());
-
- String type = "ps";
- File out = new File("graph/" + fileName + "_out." + type); // Linux
- gv.writeGraphToFile(gv.getGraph(gv.getDotSource(), type), out);
- }
-
- public static void main(String[] args) throws Exception {
- Graph g = new Graph();
- g.start("BridgePath_7");
- g.start("CyclePath_7");
- g.start("SimplePath_7");
- g.start("SinglePath_7");
- g.start("TreePath_7");
- }
-}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/AdjacencyListWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/AdjacencyListWritable.java
index d5fe843..44fa444 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/AdjacencyListWritable.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/AdjacencyListWritable.java
@@ -6,8 +6,7 @@
import org.apache.hadoop.io.WritableComparable;
-
-import edu.uci.ics.genomix.type.KmerListWritable;
+import edu.uci.ics.genomix.type.VKmerListWritable;
public class AdjacencyListWritable implements WritableComparable<AdjacencyListWritable>{
private VKmerListWritable forwardList;
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MergeBubbleMessageWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MergeBubbleMessageWritable.java
index 95fa865..6b32b51 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MergeBubbleMessageWritable.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MergeBubbleMessageWritable.java
@@ -41,9 +41,8 @@
this.sourceVertexId.set(msg.getSourceVertexId());
}
if (chainVertexId != null) {
-
checkMessage |= CheckMessage.ACUTUALKMER;
- this.chainVertexId.set(msg.getChainVertexId());
+ this.chainVertexId.setAsCopy(msg.getChainVertexId());
}
if (neighberNode != null) {
checkMessage |= CheckMessage.NEIGHBER;
@@ -64,7 +63,8 @@
}
if (chainVertexId != null) {
checkMessage |= CheckMessage.ACUTUALKMER;
- this.chainVertexId.set(chainVertexId);
+ this.chainVertexId.setAsCopy(chainVertexId);
+
}
if (neighberNode != null) {
checkMessage |= CheckMessage.NEIGHBER;
@@ -102,7 +102,8 @@
public void setChainVertexId(KmerBytesWritable chainVertexId) {
if (chainVertexId != null) {
checkMessage |= CheckMessage.ACUTUALKMER;
- this.chainVertexId.set(chainVertexId);
+ this.chainVertexId.setAsCopy(chainVertexId);
+
}
}
@@ -118,7 +119,7 @@
}
public int getLengthOfChain() {
- return chainVertexId.getKmerLength();
+ return KmerBytesWritable.getKmerLength();
}
public PositionWritable getStartVertexId() {
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MessageWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MessageWritable.java
index 92f8464..5cb3169 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MessageWritable.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MessageWritable.java
@@ -3,12 +3,12 @@
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import java.util.Comparator;
import org.apache.hadoop.io.WritableComparable;
import edu.uci.ics.genomix.pregelix.type.CheckMessage;
import edu.uci.ics.genomix.pregelix.type.Message;
-import edu.uci.ics.genomix.type.KmerBytesWritable;
import edu.uci.ics.genomix.type.PositionListWritable;
import edu.uci.ics.genomix.type.VKmerBytesWritable;
@@ -18,23 +18,25 @@
* stores neighber vertexValue when pathVertex sends the message
* file stores the point to the file that stores the chains of connected DNA
*/
- private KmerBytesWritable sourceVertexId;
+ private VKmerBytesWritable sourceVertexId;
private VKmerBytesWritable kmer;
private AdjacencyListWritable neighberNode; //incoming or outgoing
private PositionListWritable nodeIdList = new PositionListWritable();
+ private float averageCoverage;
private byte flag;
private boolean isFlip;
private int kmerlength = 0;
private boolean updateMsg = false;
- private KmerBytesWritable startVertexId;
+ private VKmerBytesWritable startVertexId;
private byte checkMessage;
public MessageWritable() {
- sourceVertexId = new KmerBytesWritable();
+ sourceVertexId = new VKmerBytesWritable();
kmer = new VKmerBytesWritable();
neighberNode = new AdjacencyListWritable();
- startVertexId = new KmerBytesWritable();
+ startVertexId = new VKmerBytesWritable();
+ averageCoverage = 0;
flag = Message.NON;
isFlip = false;
checkMessage = (byte) 0;
@@ -42,11 +44,12 @@
public MessageWritable(int kmerSize) {
kmerlength = kmerSize;
+ sourceVertexId = new VKmerBytesWritable(kmerSize);
+ kmer = new VKmerBytesWritable(0);
- sourceVertexId = new KmerBytesWritable(kmerSize);
- kmer = new KmerBytesWritable(0);
neighberNode = new AdjacencyListWritable(kmerSize);
- startVertexId = new KmerBytesWritable(kmerSize);
+ startVertexId = new VKmerBytesWritable(kmerSize);
+ averageCoverage = 0;
flag = Message.NON;
isFlip = false;
checkMessage = (byte) 0;
@@ -61,7 +64,8 @@
}
if (kmer != null) {
checkMessage |= CheckMessage.ACUTUALKMER;
- this.kmer.set(msg.getActualKmer());
+ this.kmer.setAsCopy(msg.getActualKmer());
+
}
if (neighberNode != null) {
checkMessage |= CheckMessage.NEIGHBER;
@@ -69,14 +73,14 @@
}
if (startVertexId != null) {
checkMessage |= CheckMessage.START;
- this.startVertexId.set(msg.getStartVertexId());
+ this.startVertexId.setAsCopy(msg.getStartVertexId());
}
checkMessage |= CheckMessage.ADJMSG;
this.flag = msg.getFlag();
updateMsg = msg.isUpdateMsg();
}
- public void set(int kmerlength, KmerBytesWritable sourceVertexId, KmerBytesWritable chainVertexId, AdjacencyListWritable neighberNode, byte message) {
+ public void set(int kmerlength, VKmerBytesWritable sourceVertexId, VKmerBytesWritable chainVertexId, AdjacencyListWritable neighberNode, byte message) {
this.kmerlength = kmerlength;
checkMessage = 0;
if (sourceVertexId != null) {
@@ -85,7 +89,8 @@
}
if (chainVertexId != null) {
checkMessage |= CheckMessage.ACUTUALKMER;
- this.kmer.set(chainVertexId);
+ this.kmer.setAsCopy(chainVertexId);
+
}
if (neighberNode != null) {
checkMessage |= CheckMessage.NEIGHBER;
@@ -104,15 +109,16 @@
// kmer.reset();
neighberNode.reset(kmerSize);
startVertexId.reset(kmerSize);
+ averageCoverage = 0;
flag = Message.NON;
isFlip = false;
}
- public KmerBytesWritable getSourceVertexId() {
+ public VKmerBytesWritable getSourceVertexId() {
return sourceVertexId;
}
- public void setSourceVertexId(KmerBytesWritable sourceVertexId) {
+ public void setSourceVertexId(VKmerBytesWritable sourceVertexId) {
if (sourceVertexId != null) {
checkMessage |= CheckMessage.SOURCE;
this.sourceVertexId.setAsCopy(sourceVertexId);
@@ -126,7 +132,8 @@
public void setActualKmer(VKmerBytesWritable actualKmer) {
if (actualKmer != null) {
checkMessage |= CheckMessage.ACUTUALKMER;
- this.kmer.set(actualKmer);
+ this.kmer.setAsCopy(actualKmer);
+
}
}
@@ -134,10 +141,11 @@
return kmer;
}
- public void setCreatedVertexId(KmerBytesWritable actualKmer) {
+ public void setCreatedVertexId(VKmerBytesWritable actualKmer) {
if (actualKmer != null) {
checkMessage |= CheckMessage.ACUTUALKMER;
- this.kmer.set(actualKmer);
+ this.kmer.setAsCopy(actualKmer);
+
}
}
@@ -152,16 +160,24 @@
}
}
- public KmerBytesWritable getStartVertexId() {
+ public VKmerBytesWritable getStartVertexId() {
return startVertexId;
}
- public void setStartVertexId(KmerBytesWritable startVertexId) {
+ public void setStartVertexId(VKmerBytesWritable startVertexId) {
if(startVertexId != null){
checkMessage |= CheckMessage.START;
- this.startVertexId.set(startVertexId);
+ this.startVertexId.setAsCopy(startVertexId);
}
}
+
+    public float getAverageCoverage() { // returns the averageCoverage value currently stored in this message
+        return averageCoverage;
+    }
+
+    public void setAverageCoverage(float averageCoverage) { // stores the value as-is; unlike the id setters it does not touch checkMessage
+        this.averageCoverage = averageCoverage;
+    }
public int getLengthOfChain() {
return kmer.getKmerLetterLength();
@@ -217,6 +233,7 @@
nodeIdList.write(out);
if ((checkMessage & CheckMessage.START) != 0)
startVertexId.write(out);
+ out.writeFloat(averageCoverage);
out.writeBoolean(isFlip);
out.writeByte(flag);
out.writeBoolean(updateMsg);
@@ -237,6 +254,7 @@
nodeIdList.readFields(in);
if ((checkMessage & CheckMessage.START) != 0)
startVertexId.readFields(in);
+ averageCoverage = in.readFloat();
isFlip = in.readBoolean();
flag = in.readByte();
updateMsg = in.readBoolean();
@@ -265,4 +283,11 @@
public int compareTo(MessageWritable tp) {
return sourceVertexId.compareTo(tp.sourceVertexId);
}
+
+    public static final class SortByCoverage implements Comparator<MessageWritable> { // orders messages by ascending averageCoverage
+        @Override
+        public int compare(MessageWritable left, MessageWritable right) { // neither argument is null-checked
+            return Float.compare(left.averageCoverage, right.averageCoverage);
+        }
+    }
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/VertexValueWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/VertexValueWritable.java
index 27fa846..70ef13f 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/VertexValueWritable.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/VertexValueWritable.java
@@ -6,7 +6,6 @@
import edu.uci.ics.genomix.type.PositionListWritable;
import edu.uci.ics.genomix.pregelix.type.MessageFlag;
-import edu.uci.ics.genomix.type.KmerBytesWritable;
import edu.uci.ics.genomix.type.VKmerBytesWritable;
import edu.uci.ics.genomix.type.VKmerListWritable;
@@ -47,9 +46,9 @@
private PositionListWritable nodeIdList;
private AdjacencyListWritable incomingList;
private AdjacencyListWritable outgoingList;
+ private VKmerBytesWritable actualKmer;
+ private float averageCoverage;
private byte state;
- private VKmerBytesWritable kmer;
- private int kmerlength = 0;
private boolean isFakeVertex = false;
public VertexValueWritable() {
@@ -57,38 +56,37 @@
}
public VertexValueWritable(int kmerSize){
- kmerlength = kmerSize;
nodeIdList = new PositionListWritable();
incomingList = new AdjacencyListWritable();
outgoingList = new AdjacencyListWritable();
+ actualKmer = new VKmerBytesWritable();
state = State.IS_NON;
- kmer = new VKmerBytesWritable();
+ averageCoverage = 0;
}
public VertexValueWritable(PositionListWritable nodeIdList, VKmerListWritable forwardForwardList, VKmerListWritable forwardReverseList,
- VKmerListWritable reverseForwardList, VKmerListWritable reverseReverseList,
- byte state, VKmerBytesWritable kmer) {
+ VKmerListWritable reverseForwardList, VKmerListWritable reverseReverseList, VKmerBytesWritable actualKmer,
+ float averageCoverage, byte state) {
set(nodeIdList, forwardForwardList, forwardReverseList,
- reverseForwardList, reverseReverseList,
- state, kmer);
+ reverseForwardList, reverseReverseList, actualKmer,
+ averageCoverage, state);
}
public void set(PositionListWritable nodeIdList, VKmerListWritable forwardForwardList, VKmerListWritable forwardReverseList,
- VKmerListWritable reverseForwardList, VKmerListWritable reverseReverseList,
- byte state, VKmerBytesWritable kmer) {
- this.kmerlength = kmer.getKmerLetterLength();
+ VKmerListWritable reverseForwardList, VKmerListWritable reverseReverseList, VKmerBytesWritable actualKmer,
+ float averageCoverage, byte state) {
this.incomingList.setForwardList(reverseForwardList);
this.incomingList.setReverseList(reverseReverseList);
this.outgoingList.setForwardList(forwardForwardList);
this.outgoingList.setReverseList(forwardReverseList);
+ this.actualKmer.setAsCopy(actualKmer);
+ this.averageCoverage = averageCoverage;
this.state = state;
- this.kmer.setAsCopy(kmer);
}
public void set(VertexValueWritable value) {
- this.kmerlength = value.kmerlength;
- set(value.getNodeIdList(), value.getFFList(),value.getFRList(),value.getRFList(),value.getRRList(),value.getState(),
- value.getKmer());
+ set(value.getNodeIdList(), value.getFFList(),value.getFRList(),value.getRFList(),value.getRRList(),
+ value.getActualKmer(), value.getAverageCoverage(), value.getState());
}
@@ -152,7 +150,22 @@
return state;
}
-
+ public VKmerBytesWritable getActualKmer() {
+ return actualKmer;
+ }
+
+ public void setActualKmer(VKmerBytesWritable kmer) {
+ this.actualKmer.setAsCopy(kmer);
+ }
+
+ public float getAverageCoverage() {
+ return averageCoverage;
+ }
+
+ public void setAverageCoverage(float averageCoverage) {
+ this.averageCoverage = averageCoverage;
+ }
+
public boolean isFakeVertex() {
return isFakeVertex;
}
@@ -166,55 +179,47 @@
}
public int getLengthOfKmer() {
- return kmer.getKmerLetterLength();
+ return actualKmer.getKmerLetterLength();
}
-
- public VKmerBytesWritable getKmer() {
- return kmer;
+
+ public void reset() {
+ this.reset(0);
}
-
- public void setKmer(VKmerBytesWritable kmer) {
- this.kmer.setAsCopy(kmer);
- }
-
- public int getKmerlength() {
- return kmerlength;
- }
-
- public void setKmerlength(int kmerlength) {
- this.kmerlength = kmerlength;
- }
-
+
public void reset(int kmerSize) {
- this.kmerlength = kmerSize;
this.nodeIdList.reset();
this.incomingList.getForwardList().reset();
this.incomingList.getReverseList().reset();
this.outgoingList.getForwardList().reset();
this.outgoingList.getReverseList().reset();
-// this.kmer.reset(0);
+ this.actualKmer.reset(0);
+ averageCoverage = 0;
}
@Override
public void readFields(DataInput in) throws IOException {
- this.kmerlength = in.readInt();
- this.reset(kmerlength);
+ reset();
this.nodeIdList.readFields(in);
- this.incomingList.readFields(in);
- this.outgoingList.readFields(in);
+ this.outgoingList.getForwardList().readFields(in);
+ this.outgoingList.getReverseList().readFields(in);
+ this.incomingList.getForwardList().readFields(in);
+ this.incomingList.getReverseList().readFields(in);
+ this.actualKmer.readFields(in);
+ averageCoverage = in.readFloat();
this.state = in.readByte();
- this.kmer.readFields(in);
this.isFakeVertex = in.readBoolean();
}
@Override
public void write(DataOutput out) throws IOException {
- out.writeInt(this.kmerlength);
this.nodeIdList.write(out);
- this.incomingList.write(out);
- this.outgoingList.write(out);
+ this.outgoingList.getForwardList().write(out);
+ this.outgoingList.getReverseList().write(out);
+ this.incomingList.getForwardList().write(out);
+ this.incomingList.getReverseList().write(out);
+ this.actualKmer.write(out);
+ out.writeFloat(averageCoverage);
out.writeByte(this.state);
- this.kmer.write(out);
out.writeBoolean(this.isFakeVertex);
}
@@ -232,7 +237,7 @@
sbuilder.append(outgoingList.getReverseList().toString()).append('\t');
sbuilder.append(incomingList.getForwardList().toString()).append('\t');
sbuilder.append(incomingList.getReverseList().toString()).append('\t');
- sbuilder.append(kmer.toString()).append('}');
+ sbuilder.append(actualKmer.toString()).append('}');
return sbuilder.toString();
}
@@ -251,7 +256,7 @@
/*
* Delete the corresponding edge
*/
- public void processDelete(byte neighborToDeleteDir, KmerBytesWritable nodeToDelete){
+ public void processDelete(byte neighborToDeleteDir, VKmerBytesWritable nodeToDelete){
switch (neighborToDeleteDir & MessageFlag.DIR_MASK) {
case MessageFlag.DIR_FF:
this.getFFList().remove(nodeToDelete);
@@ -271,8 +276,8 @@
/*
* Process any changes to value. This is for edge updates
*/
- public void processUpdates(byte neighborToDeleteDir, KmerBytesWritable nodeToDelete,
- byte neighborToMergeDir, KmerBytesWritable nodeToAdd){
+ public void processUpdates(byte neighborToDeleteDir, VKmerBytesWritable nodeToDelete,
+ byte neighborToMergeDir, VKmerBytesWritable nodeToAdd){
// TODO
// this.getListFromDir(neighborToDeleteDir).remove(nodeToDelete);
// this.getListFromDir(neighborToMergeDir).append(nodeToDelete);
@@ -310,25 +315,26 @@
/*
* Process any changes to value. This is for merging
*/
- public void processMerges(byte neighborToDeleteDir, KmerBytesWritable nodeToDelete,
- byte neighborToMergeDir, KmerBytesWritable nodeToAdd,
+ public void processMerges(byte neighborToDeleteDir, VKmerBytesWritable nodeToDelete,
+ byte neighborToMergeDir, VKmerBytesWritable nodeToAdd,
+
int kmerSize, VKmerBytesWritable kmer){
switch (neighborToDeleteDir & MessageFlag.DIR_MASK) {
case MessageFlag.DIR_FF:
this.getFFList().remove(nodeToDelete); //set(null);
- this.getKmer().mergeWithFFKmer(kmerSize, kmer);
+ this.getActualKmer().mergeWithFFKmer(kmerSize, kmer);
break;
case MessageFlag.DIR_FR:
this.getFRList().remove(nodeToDelete);
- this.getKmer().mergeWithFRKmer(kmerSize, kmer);
+ this.getActualKmer().mergeWithFRKmer(kmerSize, kmer);
break;
case MessageFlag.DIR_RF:
this.getRFList().remove(nodeToDelete);
- this.getKmer().mergeWithRFKmer(kmerSize, kmer);
+ this.getActualKmer().mergeWithRFKmer(kmerSize, kmer);
break;
case MessageFlag.DIR_RR:
this.getRRList().remove(nodeToDelete);
- this.getKmer().mergeWithRRKmer(kmerSize, kmer);
+ this.getActualKmer().mergeWithRRKmer(kmerSize, kmer);
break;
}
// TODO: remove switch below and replace with general direction merge
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/log/LogAlgorithmLogFormatter.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/log/LogAlgorithmLogFormatter.java
index dd78cde..90eefa1 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/log/LogAlgorithmLogFormatter.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/log/LogAlgorithmLogFormatter.java
@@ -70,7 +70,7 @@
builder.append("Step: " + step + "\r\n");
builder.append("Source Code: " + source + "\r\n");
if (operation == 0) {
- if (destVertexId.getKmerLength() != -1) {
+ if (KmerBytesWritable.getKmerLength() != -1) {
String dest = destVertexId.toString();
builder.append("Send message to " + "\r\n");
builder.append("Destination Code: " + dest + "\r\n");
@@ -88,7 +88,7 @@
if (operation == 2) {
chain = mergeChain.toString();
builder.append("Merge Chain: " + chain + "\r\n");
- builder.append("Merge Chain Length: " + mergeChain.getKmerLength() + "\r\n");
+ builder.append("Merge Chain Length: " + KmerBytesWritable.getKmerLength() + "\r\n");
}
if (operation == 3)
builder.append("Vote to halt!");
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bridgeremove/BridgeAddVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bridgeremove/BridgeAddVertex.java
index 3c7ae8a..13a6223 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bridgeremove/BridgeAddVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bridgeremove/BridgeAddVertex.java
@@ -4,7 +4,9 @@
import org.apache.hadoop.io.NullWritable;
import edu.uci.ics.genomix.type.KmerBytesWritable;
-import edu.uci.ics.genomix.type.KmerListWritable;
+import edu.uci.ics.genomix.type.VKmerBytesWritable;
+import edu.uci.ics.genomix.type.VKmerListWritable;
+
import edu.uci.ics.pregelix.api.graph.Vertex;
import edu.uci.ics.pregelix.api.job.PregelixJob;
import edu.uci.ics.pregelix.api.util.BspUtils;
@@ -13,7 +15,6 @@
import edu.uci.ics.genomix.pregelix.format.GraphCleanOutputFormat;
import edu.uci.ics.genomix.pregelix.io.MessageWritable;
import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
-import edu.uci.ics.genomix.type.VKmerBytesWritable;
/*
* vertexId: BytesWritable
@@ -47,8 +48,8 @@
* Naive Algorithm for path merge graph
*/
public class BridgeAddVertex extends
- Vertex<KmerBytesWritable, VertexValueWritable, NullWritable, MessageWritable> {
- public static final String KMER_SIZE = "BridgeRemoveVertex.kmerSize";
+ Vertex<VKmerBytesWritable, VertexValueWritable, NullWritable, MessageWritable> {
+ public static final String KMER_SIZE = "BridgeRemoveVertex.kmerSize"; // TODO consolidate config options
public static final String LENGTH = "BridgeRemoveVertex.length";
public static int kmerSize = -1;
private int length = -1;
@@ -57,10 +58,12 @@
* initiate kmerSize, maxIteration
*/
public void initVertex() {
- if (kmerSize == -1)
+ if (kmerSize == -1) {
kmerSize = getContext().getConfiguration().getInt(KMER_SIZE, 5);
+ KmerBytesWritable.setGlobalKmerLength(kmerSize);
+ }
if (length == -1)
- length = getContext().getConfiguration().getInt(LENGTH, kmerSize + 5);
+ length = getContext().getConfiguration().getInt(LENGTH, kmerSize + 5); // TODO fail on parse
}
@SuppressWarnings("unchecked")
@@ -69,8 +72,8 @@
initVertex();
if(getSuperstep() == 1){
if(getVertexId().toString().equals("ATA")){
- KmerBytesWritable vertexId = new KmerBytesWritable(kmerSize);
- vertexId.setByRead("GTA".getBytes(), 0);
+ VKmerBytesWritable vertexId = new VKmerBytesWritable(kmerSize);
+ vertexId.setByRead(kmerSize, "GTA".getBytes(), 0);
getVertexValue().getFRList().append(vertexId);
//add bridge vertex
@@ -86,22 +89,23 @@
/**
* set the vertex value
*/
- KmerListWritable kmerFRList = new KmerListWritable(kmerSize);
+ VKmerListWritable kmerFRList = new VKmerListWritable();
+
kmerFRList.append(getVertexId());
vertexValue.setFRList(kmerFRList);
- KmerBytesWritable otherVertexId = new KmerBytesWritable(kmerSize);
- otherVertexId.setByRead("ACG".getBytes(), 0);
- KmerListWritable kmerRFList = new KmerListWritable(kmerSize);
+ VKmerBytesWritable otherVertexId = new VKmerBytesWritable(kmerSize);
+ otherVertexId.setByRead(kmerSize, "ACG".getBytes(), 0);
+ VKmerListWritable kmerRFList = new VKmerListWritable();
kmerRFList.append(otherVertexId);
vertexValue.setRFList(kmerRFList);
- vertexValue.setKmer(vertexId);
+ vertexValue.setActualKmer(vertexId);
vertex.setVertexValue(vertexValue);
addVertex(vertexId, vertex);
}
else if(getVertexId().toString().equals("ACG")){
- KmerBytesWritable brdgeVertexId = new KmerBytesWritable(kmerSize);
- brdgeVertexId.setByRead("GTA".getBytes(), 0);
+ VKmerBytesWritable brdgeVertexId = new VKmerBytesWritable(kmerSize);
+ brdgeVertexId.setByRead(kmerSize, "GTA".getBytes(), 0);
getVertexValue().getRFList().append(brdgeVertexId);
}
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bridgeremove/BridgeRemoveVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bridgeremove/BridgeRemoveVertex.java
index caf7a36..039f5b4 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bridgeremove/BridgeRemoveVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bridgeremove/BridgeRemoveVertex.java
@@ -12,7 +12,7 @@
import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
import edu.uci.ics.genomix.pregelix.operator.pathmerge.BasicGraphCleanVertex;
import edu.uci.ics.genomix.pregelix.util.VertexUtil;
-import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.VKmerBytesWritable;
/*
* vertexId: BytesWritable
@@ -67,7 +67,7 @@
else
outgoingMsg.reset(kmerSize);
if(destVertexId == null)
- destVertexId = new KmerBytesWritable(kmerSize);
+ destVertexId = new VKmerBytesWritable(kmerSize);
receivedMsgList.clear();
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleAddVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleAddVertex.java
index f5aa3a1..aca9d95 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleAddVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleAddVertex.java
@@ -3,8 +3,9 @@
import java.util.Iterator;
import org.apache.hadoop.io.NullWritable;
-import edu.uci.ics.genomix.type.KmerBytesWritable;
-import edu.uci.ics.genomix.type.KmerListWritable;
+import edu.uci.ics.genomix.type.VKmerListWritable;
+import edu.uci.ics.genomix.type.VKmerBytesWritable;
+
import edu.uci.ics.pregelix.api.graph.Vertex;
import edu.uci.ics.pregelix.api.job.PregelixJob;
import edu.uci.ics.pregelix.api.util.BspUtils;
@@ -46,7 +47,7 @@
* Remove tip or single node when l > constant
*/
public class BubbleAddVertex extends
- Vertex<KmerBytesWritable, VertexValueWritable, NullWritable, MessageWritable> {
+ Vertex<VKmerBytesWritable, VertexValueWritable, NullWritable, MessageWritable> {
public static final String KMER_SIZE = "BubbleAddVertex.kmerSize";
public static int kmerSize = -1;
@@ -64,8 +65,8 @@
initVertex();
if(getSuperstep() == 1){
if(getVertexId().toString().equals("ATA")){
- KmerBytesWritable vertexId = new KmerBytesWritable(kmerSize);
- vertexId.setByRead("GTA".getBytes(), 0);
+ VKmerBytesWritable vertexId = new VKmerBytesWritable();
+ vertexId.setByRead(kmerSize, "GTA".getBytes(), 0);
getVertexValue().getFRList().append(vertexId);
//add bridge vertex
@@ -81,22 +82,24 @@
/**
* set the vertex value
*/
- KmerListWritable kmerFRList = new KmerListWritable(kmerSize);
+ VKmerListWritable kmerFRList = new VKmerListWritable();
+
kmerFRList.append(getVertexId());
vertexValue.setFRList(kmerFRList);
- KmerBytesWritable otherVertexId = new KmerBytesWritable(kmerSize);
- otherVertexId.setByRead("AGA".getBytes(), 0);
- KmerListWritable kmerRFList = new KmerListWritable(kmerSize);
+ VKmerBytesWritable otherVertexId = new VKmerBytesWritable();
+ otherVertexId.setByRead(kmerSize, "AGA".getBytes(), 0);
+ VKmerListWritable kmerRFList = new VKmerListWritable();
kmerRFList.append(otherVertexId);
vertexValue.setRFList(kmerRFList);
- vertexValue.setKmer(vertexId);
+ vertexValue.setActualKmer(vertexId);
+
vertex.setVertexValue(vertexValue);
addVertex(vertexId, vertex);
}
else if(getVertexId().toString().equals("AGA")){
- KmerBytesWritable brdgeVertexId = new KmerBytesWritable(kmerSize);
- brdgeVertexId.setByRead("GTA".getBytes(), 0);
+ VKmerBytesWritable brdgeVertexId = new VKmerBytesWritable();
+ brdgeVertexId.setByRead(kmerSize, "GTA".getBytes(), 0);
getVertexValue().getRFList().append(brdgeVertexId);
}
}
@@ -112,7 +115,7 @@
job.setVertexInputFormatClass(GraphCleanInputFormat.class);
job.setVertexOutputFormatClass(GraphCleanOutputFormat.class);
job.setDynamicVertexValueSize(true);
- job.setOutputKeyClass(KmerBytesWritable.class);
+ job.setOutputKeyClass(VKmerBytesWritable.class);
job.setOutputValueClass(VertexValueWritable.class);
Client.run(args, job);
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleMergeVertex.java
index d630b5f..9b8a03d 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleMergeVertex.java
@@ -1,73 +1,30 @@
package edu.uci.ics.genomix.pregelix.operator.bubblemerge;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
-import org.apache.hadoop.io.NullWritable;
-
-import edu.uci.ics.genomix.type.KmerBytesWritable;
-import edu.uci.ics.genomix.type.KmerBytesWritableFactory;
-import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.genomix.type.VKmerBytesWritable;
import edu.uci.ics.pregelix.api.job.PregelixJob;
-import edu.uci.ics.genomix.oldtype.PositionWritable;
import edu.uci.ics.genomix.pregelix.client.Client;
import edu.uci.ics.genomix.pregelix.format.GraphCleanInputFormat;
import edu.uci.ics.genomix.pregelix.format.GraphCleanOutputFormat;
-import edu.uci.ics.genomix.pregelix.io.MergeBubbleMessageWritable;
+import edu.uci.ics.genomix.pregelix.io.MessageWritable;
import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
-import edu.uci.ics.genomix.pregelix.type.AdjMessage;
+import edu.uci.ics.genomix.pregelix.operator.pathmerge.BasicGraphCleanVertex;
+import edu.uci.ics.genomix.pregelix.type.MessageFlag;
import edu.uci.ics.genomix.pregelix.util.VertexUtil;
-/*
- * vertexId: BytesWritable
- * vertexValue: ByteWritable
- * edgeValue: NullWritable
- * message: MessageWritable
- *
- * DNA:
- * A: 00
- * C: 01
- * G: 10
- * T: 11
- *
- * succeed node
- * A 00000001 1
- * G 00000010 2
- * C 00000100 4
- * T 00001000 8
- * precursor node
- * A 00010000 16
- * G 00100000 32
- * C 01000000 64
- * T 10000000 128
- *
- * For example, ONE LINE in input file: 00,01,10 0001,0010,
- * That means that vertexId is ACG, its succeed node is A and its precursor node is C.
- * The succeed node and precursor node will be stored in vertexValue and we don't use edgeValue.
- * The details about message are in edu.uci.ics.pregelix.example.io.MessageWritable.
- */
/**
* Naive Algorithm for path merge graph
*/
public class BubbleMergeVertex extends
- Vertex<KmerBytesWritable, VertexValueWritable, NullWritable, MergeBubbleMessageWritable> {
- public static final String KMER_SIZE = "BubbleMergeVertex.kmerSize";
- public static final String ITERATIONS = "BubbleMergeVertex.iteration";
- public static int kmerSize = -1;
- private int maxIteration = -1;
+ BasicGraphCleanVertex {
- private MergeBubbleMessageWritable incomingMsg = new MergeBubbleMessageWritable();
- private MergeBubbleMessageWritable outgoingMsg = new MergeBubbleMessageWritable();
- private KmerBytesWritableFactory kmerFactory = new KmerBytesWritableFactory(1);
-
- private Iterator<PositionWritable> iterator;
- private PositionWritable pos = new PositionWritable();
- private PositionWritable destVertexId = new PositionWritable();
- private Iterator<PositionWritable> posIterator;
- private Map<PositionWritable, ArrayList<MergeBubbleMessageWritable>> receivedMsgMap = new HashMap<PositionWritable, ArrayList<MergeBubbleMessageWritable>>();
- private ArrayList<MergeBubbleMessageWritable> receivedMsgList = new ArrayList<MergeBubbleMessageWritable>();
+ private Map<VKmerBytesWritable, ArrayList<MessageWritable>> receivedMsgMap = new HashMap<VKmerBytesWritable, ArrayList<MessageWritable>>();
+ private ArrayList<MessageWritable> receivedMsgList = new ArrayList<MessageWritable>();
/**
* initiate kmerSize, maxIteration
@@ -79,193 +36,49 @@
maxIteration = getContext().getConfiguration().getInt(ITERATIONS, 1000000);
outgoingMsg.reset();
}
- /**
- * get destination vertex
- */
- public PositionWritable getNextDestVertexId(VertexValueWritable value) {
- if(value.getFFList().getCountOfPosition() > 0) // #FFList() > 0
- posIterator = value.getFFList().iterator();
- else // #FRList() > 0
- posIterator = value.getFRList().iterator();
- return posIterator.next();
- }
- public PositionWritable getPrevDestVertexId(VertexValueWritable value) {
- if(value.getRFList().getCountOfPosition() > 0) // #FFList() > 0
- posIterator = value.getRFList().iterator();
- else // #FRList() > 0
- posIterator = value.getRRList().iterator();
- return posIterator.next();
- }
-
- /**
- * check if prev/next destination exists
- */
- public boolean hasNextDest(VertexValueWritable value){
- return value.getFFList().getCountOfPosition() > 0 || value.getFRList().getCountOfPosition() > 0;
- }
-
- public boolean hasPrevDest(VertexValueWritable value){
- return value.getRFList().getCountOfPosition() > 0 || value.getRRList().getCountOfPosition() > 0;
- }
-
- /**
- * head send message to all next nodes
- */
- public void sendMsgToAllNextNodes(VertexValueWritable value) {
- posIterator = value.getFFList().iterator(); // FFList
- while(posIterator.hasNext()){
- outgoingMsg.setMessage(AdjMessage.FROMFF);
- destVertexId.set(posIterator.next());
- sendMsg(destVertexId, outgoingMsg);
- }
- posIterator = value.getFRList().iterator(); // FRList
- while(posIterator.hasNext()){
- outgoingMsg.setMessage(AdjMessage.FROMFR);
- destVertexId.set(posIterator.next());
- sendMsg(destVertexId, outgoingMsg);
- }
- }
-
- /**
- * head send message to all next nodes
- */
- public void sendMsgToAllPrevNodes(VertexValueWritable value) {
- posIterator = value.getRFList().iterator(); // FFList
- while(posIterator.hasNext()){
- outgoingMsg.setMessage(AdjMessage.FROMRF);
- destVertexId.set(posIterator.next());
- sendMsg(destVertexId, outgoingMsg);
- }
- posIterator = value.getRRList().iterator(); // FRList
- while(posIterator.hasNext()){
- outgoingMsg.setMessage(AdjMessage.FROMRR);
- destVertexId.set(posIterator.next());
- sendMsg(destVertexId, outgoingMsg);
- }
- }
-
- /**
- * broadcast kill self to all neighbers Pre-condition: vertex is a path vertex
- */
- public void broadcaseKillself(){
- outgoingMsg.setSourceVertexId(getVertexId());
-
- if(getVertexValue().getFFList().getCountOfPosition() > 0){//#FFList() > 0
- outgoingMsg.setMessage(AdjMessage.FROMFF);
- sendMsg(incomingMsg.getSourceVertexId(), outgoingMsg);
- }
- else if(getVertexValue().getFRList().getCountOfPosition() > 0){//#FRList() > 0
- outgoingMsg.setMessage(AdjMessage.FROMFR);
- sendMsg(incomingMsg.getSourceVertexId(), outgoingMsg);
- }
-
-
- if(getVertexValue().getRFList().getCountOfPosition() > 0){//#RFList() > 0
- outgoingMsg.setMessage(AdjMessage.FROMRF);
- sendMsg(incomingMsg.getStartVertexId(), outgoingMsg);
- }
- else if(getVertexValue().getRRList().getCountOfPosition() > 0){//#RRList() > 0
- outgoingMsg.setMessage(AdjMessage.FROMRR);
- sendMsg(incomingMsg.getStartVertexId(), outgoingMsg);
- }
-
- deleteVertex(getVertexId());
- }
-
- /**
- * do some remove operations on adjMap after receiving the info about dead Vertex
- */
- public void responseToDeadVertex(Iterator<MergeBubbleMessageWritable> msgIterator){
- while (msgIterator.hasNext()) {
- incomingMsg = msgIterator.next();
- if(incomingMsg.getMessage() == AdjMessage.FROMFF){
- //remove incomingMsg.getSourceId from RR positionList
- iterator = getVertexValue().getRRList().iterator();
- while(iterator.hasNext()){
- pos = iterator.next();
- if(pos.equals(incomingMsg.getSourceVertexId())){
- iterator.remove();
- break;
- }
+ public void sendBubbleAndMajorVertexMsgToMinorVertex(){
+ byte meToNeighborDir = (byte) (incomingMsg.getFlag() & MessageFlag.DIR_MASK);
+ byte neighborToMeDir = mirrorDirection(meToNeighborDir);
+ switch(neighborToMeDir){
+ case MessageFlag.DIR_RF:
+ case MessageFlag.DIR_RR:
+ if(hasNextDest(getVertexValue())){
+ outgoingMsg.setStartVertexId(incomingMsg.getSourceVertexId());
+ outgoingMsg.setSourceVertexId(getVertexId());
+ outgoingMsg.setActualKmer(getVertexValue().getActualKmer());
+ destVertexId.setAsCopy(getNextDestVertexId(getVertexValue()));
+ sendMsg(destVertexId, outgoingMsg);
}
- } else if(incomingMsg.getMessage() == AdjMessage.FROMFR){
- //remove incomingMsg.getSourceId from RF positionList
- iterator = getVertexValue().getFRList().iterator();
- while(iterator.hasNext()){
- pos = iterator.next();
- if(pos.equals(incomingMsg.getSourceVertexId())){
- iterator.remove();
- break;
- }
+ break;
+ case MessageFlag.DIR_FF:
+ case MessageFlag.DIR_FR:
+ if(hasPrevDest(getVertexValue())){
+ outgoingMsg.setStartVertexId(incomingMsg.getSourceVertexId());
+ outgoingMsg.setSourceVertexId(getVertexId());
+ outgoingMsg.setActualKmer(getVertexValue().getActualKmer());
+ destVertexId.setAsCopy(getPrevDestVertexId(getVertexValue()));
+ sendMsg(destVertexId, outgoingMsg);
}
- } else if(incomingMsg.getMessage() == AdjMessage.FROMRF){
- //remove incomingMsg.getSourceId from FR positionList
- iterator = getVertexValue().getRFList().iterator();
- while(iterator.hasNext()){
- pos = iterator.next();
- if(pos.equals(incomingMsg.getSourceVertexId())){
- iterator.remove();
- break;
- }
- }
- } else{ //incomingMsg.getMessage() == AdjMessage.FROMRR
- //remove incomingMsg.getSourceId from FF positionList
- iterator = getVertexValue().getFFList().iterator();
- while(iterator.hasNext()){
- pos = iterator.next();
- if(pos.equals(incomingMsg.getSourceVertexId())){
- iterator.remove();
- break;
- }
- }
- }
+ break;
}
}
@SuppressWarnings("unchecked")
@Override
- public void compute(Iterator<MergeBubbleMessageWritable> msgIterator) {
+ public void compute(Iterator<MessageWritable> msgIterator) {
initVertex();
if (getSuperstep() == 1) {
if(VertexUtil.isHeadVertexWithIndegree(getVertexValue())
|| VertexUtil.isHeadWithoutIndegree(getVertexValue())){
- outgoingMsg.setSourceVertexId(getVertexId());
- sendMsgToAllNextNodes(getVertexValue());
+ sendSettledMsgToAllNextNodes(getVertexValue());
}
-// if(VertexUtil.isRearVertexWithOutdegree(getVertexValue())
-// || VertexUtil.isRearWithoutOutdegree(getVertexValue())){
-// outgoingMsg.setSourceVertexId(getVertexId());
-// sendMsgToAllPrevNodes(getVertexValue());
-// }
} else if (getSuperstep() == 2){
while (msgIterator.hasNext()) {
incomingMsg = msgIterator.next();
if(VertexUtil.isPathVertex(getVertexValue())){
- switch(incomingMsg.getMessage()){
- case AdjMessage.FROMFF:
- case AdjMessage.FROMRF:
- if(hasNextDest(getVertexValue())){
- outgoingMsg.setMessage(AdjMessage.NON);
- outgoingMsg.setStartVertexId(incomingMsg.getSourceVertexId());
- outgoingMsg.setSourceVertexId(getVertexId());
- outgoingMsg.setChainVertexId(getVertexValue().getKmer());
- destVertexId.set(getNextDestVertexId(getVertexValue()));
- sendMsg(destVertexId, outgoingMsg);
- }
- break;
- case AdjMessage.FROMFR:
- case AdjMessage.FROMRR:
- if(hasPrevDest(getVertexValue())){
- outgoingMsg.setMessage(AdjMessage.NON);
- outgoingMsg.setStartVertexId(incomingMsg.getSourceVertexId());
- outgoingMsg.setSourceVertexId(getVertexId());
- outgoingMsg.setChainVertexId(getVertexValue().getKmer());
- destVertexId.set(getPrevDestVertexId(getVertexValue()));
- sendMsg(destVertexId, outgoingMsg);
- }
- break;
- }
+ /** send bubble and major vertex msg to minor vertex **/
+ sendBubbleAndMajorVertexMsgToMinorVertex();
}
}
} else if (getSuperstep() == 3){
@@ -274,86 +87,43 @@
if(!receivedMsgMap.containsKey(incomingMsg.getStartVertexId())){
receivedMsgList.clear();
receivedMsgList.add(incomingMsg);
- receivedMsgMap.put(incomingMsg.getStartVertexId(), (ArrayList<MergeBubbleMessageWritable>)receivedMsgList.clone());
+ receivedMsgMap.put(incomingMsg.getStartVertexId(), (ArrayList<MessageWritable>)receivedMsgList.clone());
}
else{
receivedMsgList.clear();
receivedMsgList.addAll(receivedMsgMap.get(incomingMsg.getStartVertexId()));
receivedMsgList.add(incomingMsg);
- receivedMsgMap.put(incomingMsg.getStartVertexId(), (ArrayList<MergeBubbleMessageWritable>)receivedMsgList.clone());
+ receivedMsgMap.put(incomingMsg.getStartVertexId(), (ArrayList<MessageWritable>)receivedMsgList.clone());
}
}
- for(PositionWritable prevId : receivedMsgMap.keySet()){
- receivedMsgList = receivedMsgMap.get(prevId);
- if(receivedMsgList.size() > 1){
- //find the node with largest length of Kmer
- boolean flag = true; //the same length
- int maxLength = receivedMsgList.get(0).getLengthOfChain();
- PositionWritable max = receivedMsgList.get(0).getSourceVertexId();
- PositionWritable secondMax = receivedMsgList.get(0).getSourceVertexId();
- for(int i = 1; i < receivedMsgList.size(); i++){
- if(receivedMsgList.get(i).getLengthOfChain() != maxLength)
- flag = false;
- if(receivedMsgList.get(i).getLengthOfChain() >= maxLength){
- maxLength = receivedMsgList.get(i).getLengthOfChain();
- secondMax.set(max);
- max = receivedMsgList.get(i).getSourceVertexId();
- }
- }
- //send unchange or merge Message to node with largest length
- if(flag == true){
- //1. send unchange Message to node with largest length
- // we can send no message to complete this step
- //2. send delete Message to node which doesn't have largest length
- for(int i = 0; i < receivedMsgList.size(); i++){
- //if(receivedMsgList.get(i).getSourceVertexId().compareTo(max) != 0)
- if(receivedMsgList.get(i).getSourceVertexId().compareTo(secondMax) == 0){
- outgoingMsg.setMessage(AdjMessage.KILL);
- outgoingMsg.setStartVertexId(prevId);
- outgoingMsg.setSourceVertexId(getVertexId());
- sendMsg(secondMax, outgoingMsg);
- } else if(receivedMsgList.get(i).getSourceVertexId().compareTo(max) == 0){
- outgoingMsg.setMessage(AdjMessage.UNCHANGE);
- sendMsg(max, outgoingMsg);
- }
- }
- } else{
- //send merge Message to node with largest length
- for(int i = 0; i < receivedMsgList.size(); i++){
- //if(receivedMsgList.get(i).getSourceVertexId().compareTo(max) != 0)
- if(receivedMsgList.get(i).getSourceVertexId().compareTo(secondMax) == 0){
- outgoingMsg.setMessage(AdjMessage.KILL);
- outgoingMsg.setStartVertexId(prevId);
- outgoingMsg.setSourceVertexId(getVertexId());
- sendMsg(receivedMsgList.get(i).getSourceVertexId(), outgoingMsg);
- } else if(receivedMsgList.get(i).getSourceVertexId().compareTo(max) == 0){
- outgoingMsg.setMessage(AdjMessage.MERGE);
- /* add other node in message */
- for(int j = 0; j < receivedMsgList.size(); i++){
- if(receivedMsgList.get(j).getSourceVertexId().compareTo(secondMax) == 0){
- outgoingMsg.setChainVertexId(receivedMsgList.get(j).getChainVertexId());
- break;
- }
- }
- sendMsg(receivedMsgList.get(i).getSourceVertexId(), outgoingMsg);
- }
- }
- }
+ for(VKmerBytesWritable prevId : receivedMsgMap.keySet()){
+ /** load this startVertex's messages BEFORE the size check; otherwise the check reads the list left over from the grouping loop or a previous key **/
+ receivedMsgList = receivedMsgMap.get(prevId);
+ if(receivedMsgList.size() > 1){ // filter bubble: needs at least two messages sharing the same startVertex
+ /** for each startVertex, sort the nodes by coverage. NOTE(review): SortByCoverage compares ascending; wrap it in Collections.reverseOrder if decreasing order is intended -- confirm **/
+ Collections.sort(receivedMsgList, new MessageWritable.SortByCoverage());
+ System.out.println("");
+
+ /** process similarSet, keep the unchanged set and deleted set & add coverage to unchanged node **/
+
+ /** send message to the unchanged set for updating coverage & send kill message to the deleted set **/
+
}
}
} else if (getSuperstep() == 4){
if(msgIterator.hasNext()) {
incomingMsg = msgIterator.next();
- if(incomingMsg.getMessage() == AdjMessage.KILL){
+ if(incomingMsg.getFlag() == MessageFlag.KILL){
broadcaseKillself();
- } else if (incomingMsg.getMessage() == AdjMessage.MERGE){
- //merge with small node
- getVertexValue().setKmer(kmerFactory.mergeTwoKmer(getVertexValue().getKmer(),
- incomingMsg.getChainVertexId()));
- }
+ }
}
} else if(getSuperstep() == 5){
- responseToDeadVertex(msgIterator);
+ if(msgIterator.hasNext()) {
+ incomingMsg = msgIterator.next();
+ if(incomingMsg.getFlag() == MessageFlag.KILL){
+ responseToDeadVertex();
+ }
+ }
}
voteToHalt();
}
@@ -367,7 +137,7 @@
job.setVertexInputFormatClass(GraphCleanInputFormat.class);
job.setVertexOutputFormatClass(GraphCleanOutputFormat.class);
job.setDynamicVertexValueSize(true);
- job.setOutputKeyClass(PositionWritable.class);
+ job.setOutputKeyClass(VKmerBytesWritable.class);
job.setOutputValueClass(VertexValueWritable.class);
Client.run(args, job);
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicGraphCleanVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicGraphCleanVertex.java
index 4ae7231..080cf35 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicGraphCleanVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicGraphCleanVertex.java
@@ -12,7 +12,6 @@
import edu.uci.ics.genomix.pregelix.type.MessageFlag;
import edu.uci.ics.genomix.pregelix.util.VertexUtil;
import edu.uci.ics.genomix.type.GeneCode;
-//import edu.uci.ics.genomix.type.KmerBytesWritable;
import edu.uci.ics.genomix.type.VKmerBytesWritable;
/**
@@ -27,10 +26,9 @@
protected MessageWritable incomingMsg = null;
protected MessageWritable outgoingMsg = null;
-
- protected KmerBytesWritable destVertexId = null;
- protected Iterator<KmerBytesWritable> kmerIterator;
- protected KmerBytesWritable tmpKmer = new KmerBytesWritable(kmerSize);
+ protected VKmerBytesWritable destVertexId = null;
+ protected Iterator<VKmerBytesWritable> kmerIterator;
+ protected VKmerBytesWritable tmpKmer = new VKmerBytesWritable(kmerSize);
byte headFlag;
protected byte outFlag;
protected byte inFlag;
@@ -68,10 +66,10 @@
* get destination vertex
*/
public VKmerBytesWritable getNextDestVertexId(VertexValueWritable value) {
- if (value.getFFList().getCountOfPosition() > 0){ // #FFList() > 0
+ if (value.getFFList().getCountOfPosition() > 0){ //#FFList() > 0
kmerIterator = value.getFFList().iterator();
return kmerIterator.next();
- } else if (value.getFRList().getCountOfPosition() > 0){ // #FRList() > 0
+ } else if (value.getFRList().getCountOfPosition() > 0){ //#FRList() > 0
kmerIterator = value.getFRList().iterator();
return kmerIterator.next();
} else {
@@ -79,11 +77,11 @@
}
}
- public VKmerBytesWritable getPreDestVertexId(VertexValueWritable value) {
- if (value.getRFList().getCountOfPosition() > 0){ // #RFList() > 0
+ public VKmerBytesWritable getPrevDestVertexId(VertexValueWritable value) {
+ if (value.getRFList().getCountOfPosition() > 0){ //#RFList() > 0
kmerIterator = value.getRFList().iterator();
return kmerIterator.next();
- } else if (value.getRRList().getCountOfPosition() > 0){ // #RRList() > 0
+ } else if (value.getRRList().getCountOfPosition() > 0){ //#RRList() > 0
kmerIterator = value.getRRList().iterator();
return kmerIterator.next();
} else {
@@ -111,7 +109,7 @@
}
- public VKmerBytesWritable getPreDestVertexIdAndSetFlag(VertexValueWritable value) {
+ public VKmerBytesWritable getPrevDestVertexIdAndSetFlag(VertexValueWritable value) {
if (value.getRFList().getCountOfPosition() > 0){ // #RFList() > 0
kmerIterator = value.getRFList().iterator();
outFlag &= MessageFlag.DIR_CLEAR;
@@ -133,12 +131,12 @@
public void sendMsgToAllPreviousNodes(VertexValueWritable value) {
kmerIterator = value.getRFList().iterator(); // RFList
while(kmerIterator.hasNext()){
- destVertexId.set(kmerIterator.next());
+ destVertexId.setAsCopy(kmerIterator.next());
sendMsg(destVertexId, outgoingMsg);
}
kmerIterator = value.getRRList().iterator(); // RRList
while(kmerIterator.hasNext()){
- destVertexId.set(kmerIterator.next());
+ destVertexId.setAsCopy(kmerIterator.next());
sendMsg(destVertexId, outgoingMsg);
}
}
@@ -149,12 +147,12 @@
public void sendMsgToAllNextNodes(VertexValueWritable value) {
kmerIterator = value.getFFList().iterator(); // FFList
while(kmerIterator.hasNext()){
- destVertexId.set(kmerIterator.next());
+ destVertexId.setAsCopy(kmerIterator.next());
sendMsg(destVertexId, outgoingMsg);
}
kmerIterator = value.getFRList().iterator(); // FRList
while(kmerIterator.hasNext()){
- destVertexId.set(kmerIterator.next());
+ destVertexId.setAsCopy(kmerIterator.next());
sendMsg(destVertexId, outgoingMsg);
}
}
@@ -177,7 +175,7 @@
else if(getVertexValue().getFRList().getCountOfPosition() > 0)
outgoingMsg.setFlag(MessageFlag.DIR_FR);
outgoingMsg.setSourceVertexId(getVertexId());
- destVertexId.set(getNextDestVertexId(getVertexValue()));
+ destVertexId.setAsCopy(getNextDestVertexId(getVertexValue()));
sendMsg(destVertexId, outgoingMsg);
}
@@ -191,7 +189,7 @@
else if(getVertexValue().getRRList().getCountOfPosition() > 0)
outgoingMsg.setFlag(MessageFlag.DIR_RR);
outgoingMsg.setSourceVertexId(getVertexId());
- destVertexId.set(getPreDestVertexId(getVertexValue()));
+ destVertexId.setAsCopy(getPrevDestVertexId(getVertexValue()));
sendMsg(destVertexId, outgoingMsg);
}
@@ -203,14 +201,14 @@
while(kmerIterator.hasNext()){
outgoingMsg.setFlag(MessageFlag.DIR_RF);
outgoingMsg.setSourceVertexId(getVertexId());
- destVertexId.set(kmerIterator.next());
+ destVertexId.setAsCopy(kmerIterator.next());
sendMsg(destVertexId, outgoingMsg);
}
kmerIterator = value.getRRList().iterator(); // RRList
while(kmerIterator.hasNext()){
outgoingMsg.setFlag(MessageFlag.DIR_RR);
outgoingMsg.setSourceVertexId(getVertexId());
- destVertexId.set(kmerIterator.next());
+ destVertexId.setAsCopy(kmerIterator.next());
sendMsg(destVertexId, outgoingMsg);
}
}
@@ -223,14 +221,14 @@
while(kmerIterator.hasNext()){
outgoingMsg.setFlag(MessageFlag.DIR_FF);
outgoingMsg.setSourceVertexId(getVertexId());
- destVertexId.set(kmerIterator.next());
+ destVertexId.setAsCopy(kmerIterator.next());
sendMsg(destVertexId, outgoingMsg);
}
kmerIterator = value.getFRList().iterator(); // FRList
while(kmerIterator.hasNext()){
outgoingMsg.setFlag(MessageFlag.DIR_FR);
outgoingMsg.setSourceVertexId(getVertexId());
- destVertexId.set(kmerIterator.next());
+ destVertexId.setAsCopy(kmerIterator.next());
sendMsg(destVertexId, outgoingMsg);
}
}
@@ -352,8 +350,8 @@
outgoingMsg.setFlag(outFlag);
outgoingMsg.setNeighberNode(getVertexValue().getOutgoingList());
outgoingMsg.setSourceVertexId(getVertexId());
- if(getPreDestVertexId(getVertexValue()) != null)
- sendMsg(getPreDestVertexId(getVertexValue()), outgoingMsg);
+ if(getPrevDestVertexId(getVertexValue()) != null)
+ sendMsg(getPrevDestVertexId(getVertexValue()), outgoingMsg);
break;
}
}
@@ -408,7 +406,7 @@
outgoingMsg.setFlag(outFlag);
outgoingMsg.setNeighberNode(getVertexValue().getIncomingList());
outgoingMsg.setSourceVertexId(getVertexId());
- outgoingMsg.setActualKmer(getVertexValue().getKmer());
+ outgoingMsg.setActualKmer(getVertexValue().getActualKmer());
sendMsg(incomingMsg.getSourceVertexId(), outgoingMsg); //getNextDestVertexId(getVertexValue())
break;
case MessageFlag.DIR_RF:
@@ -421,7 +419,7 @@
outgoingMsg.setFlag(outFlag);
outgoingMsg.setNeighberNode(getVertexValue().getOutgoingList());
outgoingMsg.setSourceVertexId(getVertexId());
- outgoingMsg.setActualKmer(getVertexValue().getKmer());
+ outgoingMsg.setActualKmer(getVertexValue().getActualKmer());
sendMsg(incomingMsg.getSourceVertexId(), outgoingMsg); //getPreDestVertexId(getVertexValue())
break;
}
@@ -446,7 +444,7 @@
outgoingMsg.setFlag(outFlag);
outgoingMsg.setNeighberNode(getVertexValue().getIncomingList());
outgoingMsg.setSourceVertexId(getVertexId());
- outgoingMsg.setActualKmer(getVertexValue().getKmer());
+ outgoingMsg.setActualKmer(getVertexValue().getActualKmer());
sendMsg(incomingMsg.getSourceVertexId(), outgoingMsg); //getNextDestVertexId(getVertexValue())
break;
case MessageFlag.DIR_RF:
@@ -459,7 +457,7 @@
outgoingMsg.setFlag(outFlag);
outgoingMsg.setNeighberNode(getVertexValue().getOutgoingList());
outgoingMsg.setSourceVertexId(getVertexId());
- outgoingMsg.setActualKmer(getVertexValue().getKmer());
+ outgoingMsg.setActualKmer(getVertexValue().getActualKmer());
sendMsg(incomingMsg.getSourceVertexId(), outgoingMsg); //getPreDestVertexId(getVertexValue())
break;
}
@@ -482,7 +480,7 @@
outgoingMsg.setFlag(outFlag);
outgoingMsg.setNeighberNode(getVertexValue().getIncomingList());
outgoingMsg.setSourceVertexId(getVertexId());
- outgoingMsg.setActualKmer(getVertexValue().getKmer());
+ outgoingMsg.setActualKmer(getVertexValue().getActualKmer());
sendMsg(getNextDestVertexId(getVertexValue()), outgoingMsg);
deleteVertex(getVertexId());
break;
@@ -495,8 +493,8 @@
outgoingMsg.setFlag(outFlag);
outgoingMsg.setNeighberNode(getVertexValue().getOutgoingList());
outgoingMsg.setSourceVertexId(getVertexId());
- outgoingMsg.setActualKmer(getVertexValue().getKmer());
- sendMsg(getPreDestVertexId(getVertexValue()), outgoingMsg);
+ outgoingMsg.setActualKmer(getVertexValue().getActualKmer());
+ sendMsg(getPrevDestVertexId(getVertexValue()), outgoingMsg);
deleteVertex(getVertexId());
break;
}
@@ -651,36 +649,32 @@
int index;
switch(neighborToMeDir){
case MessageFlag.DIR_FF:
- selfString = getVertexValue().getKmer().toString();
+ selfString = getVertexValue().getActualKmer().toString();
match = selfString.substring(selfString.length() - kmerSize + 1,selfString.length());
msgString = msg.getActualKmer().toString();
index = msgString.indexOf(match);
-// tmpKmer.reset(msgString.length() - index);
- tmpKmer.setByRead(msgString.substring(index).getBytes(), 0);
+ tmpKmer.setByRead(msgString.length() - index, msgString.substring(index).getBytes(), 0);
break;
case MessageFlag.DIR_FR:
- selfString = getVertexValue().getKmer().toString();
+ selfString = getVertexValue().getActualKmer().toString();
match = selfString.substring(selfString.length() - kmerSize + 1,selfString.length());
msgString = GeneCode.reverseComplement(msg.getActualKmer().toString());
index = msgString.indexOf(match);
-// tmpKmer.reset(msgString.length() - index);
- tmpKmer.setByReadReverse(msgString.substring(index).getBytes(), 0);
+ tmpKmer.setByReadReverse(msgString.length() - index, msgString.substring(index).getBytes(), 0);
break;
case MessageFlag.DIR_RF:
- selfString = getVertexValue().getKmer().toString();
+ selfString = getVertexValue().getActualKmer().toString();
match = selfString.substring(0,kmerSize - 1);
msgString = GeneCode.reverseComplement(msg.getActualKmer().toString());
index = msgString.lastIndexOf(match) + kmerSize - 2;
-// tmpKmer.reset(index + 1);
- tmpKmer.setByReadReverse(msgString.substring(0, index + 1).getBytes(), 0);
+ tmpKmer.setByReadReverse(index + 1, msgString.substring(0, index + 1).getBytes(), 0);
break;
case MessageFlag.DIR_RR:
- selfString = getVertexValue().getKmer().toString();
+ selfString = getVertexValue().getActualKmer().toString();
match = selfString.substring(0,kmerSize - 1);
msgString = msg.getActualKmer().toString();
index = msgString.lastIndexOf(match) + kmerSize - 2;
-// tmpKmer.reset(index + 1); // TODO: fix ALL of these resets (only if you need to)
- tmpKmer.setByRead(msgString.substring(0, index + 1).getBytes(), 0);
+ tmpKmer.setByRead(index + 1, msgString.substring(0, index + 1).getBytes(), 0);
break;
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/MapReduceVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/MapReduceVertex.java
index 51100e5..10137da 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/MapReduceVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/MapReduceVertex.java
@@ -12,7 +12,6 @@
import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
import edu.uci.ics.genomix.pregelix.io.VertexValueWritable.State;
import edu.uci.ics.genomix.pregelix.type.MessageFlag;
-//import edu.uci.ics.genomix.type.KmerBytesWritable;
import edu.uci.ics.genomix.type.VKmerListWritable;
import edu.uci.ics.genomix.type.VKmerBytesWritable;
import edu.uci.ics.pregelix.api.graph.Vertex;
@@ -50,13 +49,12 @@
else
kmerList.reset();
if(fakeVertex == null){
-// fakeVertex = new KmerBytesWritable(kmerSize + 1); // TODO check if merge is correct
fakeVertex = new VKmerBytesWritable();
String random = generaterRandomString(kmerSize + 1);
- fakeVertex.setByRead(random.getBytes(), 0);
+ fakeVertex.setByRead(kmerSize + 1, random.getBytes(), 0);
}
if(destVertexId == null)
- destVertexId = new KmerBytesWritable(kmerSize);
+ destVertexId = new VKmerBytesWritable(kmerSize);
}
/**
@@ -98,7 +96,7 @@
public void sendMsgToFakeVertex(){
if(!getVertexValue().isFakeVertex()){
outgoingMsg.setSourceVertexId(getVertexId());
- outgoingMsg.setActualKmer(getVertexValue().getKmer());
+ outgoingMsg.setActualKmer(getVertexValue().getActualKmer());
sendMsg(fakeVertex, outgoingMsg);
voteToHalt();
}
@@ -109,9 +107,8 @@
incomingMsg = msgIterator.next();
String kmerString = incomingMsg.getActualKmer().toString();
tmpKmer.reset(kmerString.length());
-// reverseKmer.reset(kmerString.length());//kmerbyteswritable
- tmpKmer.setByRead(kmerString.getBytes(), 0);
- reverseKmer.setByReadReverse(kmerString.getBytes(), 0);
+ tmpKmer.setByRead(kmerString.length(), kmerString.getBytes(), 0);
+ reverseKmer.setByReadReverse(kmerString.length(), kmerString.getBytes(), 0);
if(reverseKmer.compareTo(tmpKmer) < 0)
tmpKmer.setAsCopy(reverseKmer);
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P1ForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P1ForPathMergeVertex.java
index 3447f25..f5b0157 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P1ForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P1ForPathMergeVertex.java
@@ -3,12 +3,11 @@
import java.util.Iterator;
import org.apache.hadoop.io.NullWritable;
-//import edu.uci.ics.genomix.type.KmerBytesWritable;
import edu.uci.ics.genomix.type.KmerBytesWritableFactory;
import edu.uci.ics.pregelix.api.graph.Vertex;
import edu.uci.ics.pregelix.api.job.PregelixJob;
-import edu.uci.ics.genomix.oldtype.PositionWritable;
+import edu.uci.ics.genomix.type.VKmerBytesWritable;
import edu.uci.ics.genomix.pregelix.client.Client;
import edu.uci.ics.genomix.pregelix.format.NaiveAlgorithmForPathMergeInputFormat;
import edu.uci.ics.genomix.pregelix.format.NaiveAlgorithmForPathMergeOutputFormat;
@@ -50,7 +49,7 @@
* Naive Algorithm for path merge graph
*/
public class P1ForPathMergeVertex extends
- Vertex<PositionWritable, VertexValueWritable, NullWritable, MessageWritable> {
+ Vertex<VKmerBytesWritable, VertexValueWritable, NullWritable, MessageWritable> {
public static final String KMER_SIZE = "P1ForPathMergeVertex.kmerSize";
public static final String ITERATIONS = "P1ForPathMergeVertex.iteration";
public static int kmerSize = -1;
@@ -62,8 +61,8 @@
private KmerBytesWritableFactory kmerFactory = new KmerBytesWritableFactory(1);
private VKmerBytesWritable lastKmer = new VKmerBytesWritable();
- private PositionWritable destVertexId = new PositionWritable();
- private Iterator<PositionWritable> posIterator;
+ private VKmerBytesWritable destVertexId = new VKmerBytesWritable();
+ private Iterator<VKmerBytesWritable> posIterator;
/**
* initiate kmerSize, maxIteration
@@ -79,7 +78,7 @@
/**
* get destination vertex
*/
- public PositionWritable getNextDestVertexId(VertexValueWritable value) {
+ public VKmerBytesWritable getNextDestVertexId(VertexValueWritable value) {
if(value.getFFList().getCountOfPosition() > 0) // #FFList() > 0
posIterator = value.getFFList().iterator();
else // #FRList() > 0
@@ -87,7 +86,7 @@
return posIterator.next();
}
- public PositionWritable getPreDestVertexId(VertexValueWritable value) {
+ public VKmerBytesWritable getPreDestVertexId(VertexValueWritable value) {
if(value.getRFList().getCountOfPosition() > 0) // #RFList() > 0
posIterator = value.getRFList().iterator();
else // #RRList() > 0
@@ -101,12 +100,12 @@
public void sendMsgToAllNextNodes(VertexValueWritable value) {
posIterator = value.getFFList().iterator(); // FFList
while(posIterator.hasNext()){
- destVertexId.set(posIterator.next());
+ destVertexId.setAsCopy(posIterator.next());
sendMsg(destVertexId, outgoingMsg);
}
posIterator = value.getFRList().iterator(); // FRList
while(posIterator.hasNext()){
- destVertexId.set(posIterator.next());
+ destVertexId.setAsCopy(posIterator.next());
sendMsg(destVertexId, outgoingMsg);
}
}
@@ -117,12 +116,12 @@
public void sendMsgToAllPreviousNodes(VertexValueWritable value) {
posIterator = value.getRFList().iterator(); // RFList
while(posIterator.hasNext()){
- destVertexId.set(posIterator.next());
+ destVertexId.setAsCopy(posIterator.next());
sendMsg(destVertexId, outgoingMsg);
}
posIterator = value.getRRList().iterator(); // RRList
while(posIterator.hasNext()){
- destVertexId.set(posIterator.next());
+ destVertexId.setAsCopy(posIterator.next());
sendMsg(destVertexId, outgoingMsg);
}
}
@@ -186,7 +185,7 @@
//merge chain
lastKmer.setAsCopy(kmerFactory.getLastKmerFromChain(incomingMsg.getLengthOfChain() - kmerSize + 1,
incomingMsg.getActualKmer()));
- getVertexValue().setKmer(kmerFactory.mergeTwoKmer(getVertexValue().getKmer(), lastKmer));
+ getVertexValue().setActualKmer(kmerFactory.mergeTwoKmer(getVertexValue().getActualKmer(), lastKmer));
getVertexValue().setOutgoingList(incomingMsg.getNeighberNode());
}
@@ -196,7 +195,7 @@
public void sendMsgToPathVertex(Iterator<MessageWritable> msgIterator) {
if (getSuperstep() == 3) {
outgoingMsg.setSourceVertexId(getVertexId());
- destVertexId.set(getNextDestVertexId(getVertexValue()));
+ destVertexId.setAsCopy(getNextDestVertexId(getVertexValue()));
sendMsg(destVertexId, outgoingMsg);
} else {
while (msgIterator.hasNext()) {
@@ -204,7 +203,7 @@
if (incomingMsg.getFlag() != Message.STOP) {
mergeChainVertex();
outgoingMsg.setSourceVertexId(getVertexId());
- destVertexId.set(getNextDestVertexId(getVertexValue()));
+ destVertexId.setAsCopy(getNextDestVertexId(getVertexValue()));
sendMsg(destVertexId, outgoingMsg);
} else {
mergeChainVertex();
@@ -222,7 +221,7 @@
public void responseMsgToHeadVertex() {
deleteVertex(getVertexId());
outgoingMsg.setNeighberNode(getVertexValue().getOutgoingList());
- outgoingMsg.setAcutalKmer(getVertexValue().getKmer());
+ outgoingMsg.setActualKmer(getVertexValue().getActualKmer());
if (getVertexValue().getState() == State.IS_HEAD)//is_tail
outgoingMsg.setFlag(Message.STOP);
destVertexId.setAsCopy(incomingMsg.getSourceVertexId());
@@ -259,7 +258,7 @@
job.setVertexInputFormatClass(NaiveAlgorithmForPathMergeInputFormat.class);
job.setVertexOutputFormatClass(NaiveAlgorithmForPathMergeOutputFormat.class);
job.setDynamicVertexValueSize(true);
- job.setOutputKeyClass(PositionWritable.class);
+ job.setOutputKeyClass(VKmerBytesWritable.class);
job.setOutputValueClass(VertexValueWritable.class);
Client.run(args, job);
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P2ForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P2ForPathMergeVertex.java
index d3f21f3..6b7ad1f 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P2ForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P2ForPathMergeVertex.java
@@ -12,7 +12,6 @@
import edu.uci.ics.genomix.pregelix.io.VertexValueWritable.State;
import edu.uci.ics.genomix.pregelix.type.MessageFlag;
import edu.uci.ics.genomix.pregelix.type.MessageFromHead;
-import edu.uci.ics.genomix.type.KmerBytesWritable;
import edu.uci.ics.genomix.type.VKmerListWritable;
import edu.uci.ics.genomix.type.VKmerBytesWritable;
/*
@@ -47,7 +46,7 @@
MapReduceVertex {
private ArrayList<MessageWritable> receivedMsgList = new ArrayList<MessageWritable>();
- KmerBytesWritable tmpKmer = new KmerBytesWritable();
+ VKmerBytesWritable tmpKmer = new VKmerBytesWritable();
private boolean isFakeVertex = false;
/**
@@ -77,11 +76,11 @@
// fakeVertex = new KmerBytesWritable(kmerSize + 1);
fakeVertex = new VKmerBytesWritable();
String random = generaterRandomString(kmerSize + 1);
- fakeVertex.setByRead(random.getBytes(), 0);
+ fakeVertex.setByRead(kmerSize + 1, random.getBytes(), 0);
}
isFakeVertex = ((byte)getVertexValue().getState() & State.FAKEFLAG_MASK) > 0 ? true : false;
if(destVertexId == null)
- destVertexId = new KmerBytesWritable(kmerSize);
+ destVertexId = new VKmerBytesWritable(kmerSize);
}
/**
@@ -98,7 +97,7 @@
}
//send wantToMerge to prev
- tmpKmer = getPreDestVertexIdAndSetFlag(getVertexValue());
+ tmpKmer = getPrevDestVertexIdAndSetFlag(getVertexValue());
if(tmpKmer != null){
destVertexId.setAsCopy(tmpKmer);
outgoingMsg.setFlag(outFlag);
@@ -300,7 +299,7 @@
*/
job.setVertexInputFormatClass(InitialGraphCleanInputFormat.class);
job.setVertexOutputFormatClass(P2PathMergeOutputFormat.class);
- job.setOutputKeyClass(KmerBytesWritable.class);
+ job.setOutputKeyClass(VKmerBytesWritable.class);
job.setOutputValueClass(VertexValueWritable.class);
job.setDynamicVertexValueSize(true);
Client.run(args, job);
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P3ForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P3ForPathMergeVertex.java
index cf35c7a..9a3aed5 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P3ForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P3ForPathMergeVertex.java
@@ -3,12 +3,11 @@
import java.util.Iterator;
import org.apache.hadoop.io.NullWritable;
-//import edu.uci.ics.genomix.type.KmerBytesWritable;
import edu.uci.ics.genomix.type.KmerBytesWritableFactory;
+import edu.uci.ics.genomix.type.VKmerBytesWritable;
import edu.uci.ics.pregelix.api.graph.Vertex;
import edu.uci.ics.pregelix.api.job.PregelixJob;
-import edu.uci.ics.genomix.oldtype.PositionWritable;
import edu.uci.ics.genomix.pregelix.client.Client;
import edu.uci.ics.genomix.pregelix.format.NaiveAlgorithmForPathMergeInputFormat;
import edu.uci.ics.genomix.pregelix.format.NaiveAlgorithmForPathMergeOutputFormat;
@@ -50,7 +49,7 @@
* Naive Algorithm for path merge graph
*/
public class P3ForPathMergeVertex extends
- Vertex<PositionWritable, VertexValueWritable, NullWritable, MessageWritable> {
+ Vertex<VKmerBytesWritable, VertexValueWritable, NullWritable, MessageWritable> {
public static final String KMER_SIZE = "P3ForPathMergeVertex.kmerSize";
public static final String ITERATIONS = "P3ForPathMergeVertex.iteration";
public static final String PSEUDORATE = "P3ForPathMergeVertex.pseudoRate";
@@ -66,8 +65,8 @@
private KmerBytesWritableFactory kmerFactory = new KmerBytesWritableFactory(1);
private VKmerBytesWritable lastKmer = new VKmerBytesWritable(1);
- private PositionWritable destVertexId = new PositionWritable();
- private Iterator<PositionWritable> posIterator;
+ private VKmerBytesWritable destVertexId = new VKmerBytesWritable();
+ private Iterator<VKmerBytesWritable> posIterator;
/**
* initiate kmerSize, maxIteration
*/
@@ -86,7 +85,7 @@
/**
* get destination vertex
*/
- public PositionWritable getNextDestVertexId(VertexValueWritable value) {
+ public VKmerBytesWritable getNextDestVertexId(VertexValueWritable value) {
if(value.getFFList().getCountOfPosition() > 0) // #FFList() > 0
posIterator = value.getFFList().iterator();
else // #FRList() > 0
@@ -94,7 +93,7 @@
return posIterator.next();
}
- public PositionWritable getPreDestVertexId(VertexValueWritable value) {
+ public VKmerBytesWritable getPreDestVertexId(VertexValueWritable value) {
if(value.getRFList().getCountOfPosition() > 0) // #RFList() > 0
posIterator = value.getRFList().iterator();
else // #RRList() > 0
@@ -108,12 +107,12 @@
public void sendMsgToAllNextNodes(VertexValueWritable value) {
posIterator = value.getFFList().iterator(); // FFList
while(posIterator.hasNext()){
- destVertexId.set(posIterator.next());
+ destVertexId.setAsCopy(posIterator.next());
sendMsg(destVertexId, outgoingMsg);
}
posIterator = value.getFRList().iterator(); // FRList
while(posIterator.hasNext()){
- destVertexId.set(posIterator.next());
+ destVertexId.setAsCopy(posIterator.next());
sendMsg(destVertexId, outgoingMsg);
}
}
@@ -124,12 +123,12 @@
public void sendMsgToAllPreviousNodes(VertexValueWritable value) {
posIterator = value.getRFList().iterator(); // RFList
while(posIterator.hasNext()){
- destVertexId.set(posIterator.next());
+ destVertexId.setAsCopy(posIterator.next());
sendMsg(destVertexId, outgoingMsg);
}
posIterator = value.getRRList().iterator(); // RRList
while(posIterator.hasNext()){
- destVertexId.set(posIterator.next());
+ destVertexId.setAsCopy(posIterator.next());
sendMsg(destVertexId, outgoingMsg);
}
}
@@ -206,8 +205,7 @@
public void markPseudoHead() {
getVertexValue().setState(State2.PSEUDOHEAD);
outgoingMsg.setFlag(Message.FROMPSEUDOHEAD);
- destVertexId
- .set(getPreDestVertexId(getVertexValue()));
+ destVertexId.setAsCopy(getPreDestVertexId(getVertexValue()));
sendMsg(destVertexId, outgoingMsg);
}
@@ -232,8 +230,8 @@
public void mergeChainVertex(){
lastKmer.setAsCopy(kmerFactory.getLastKmerFromChain(incomingMsg.getLengthOfChain() - kmerSize + 1,
incomingMsg.getActualKmer()));
- getVertexValue().setKmer(
- kmerFactory.mergeTwoKmer(getVertexValue().getKmer(),
+ getVertexValue().setActualKmer(
+ kmerFactory.mergeTwoKmer(getVertexValue().getActualKmer(),
lastKmer));
getVertexValue().setOutgoingList(incomingMsg.getNeighberNode());
}
@@ -244,7 +242,7 @@
public void sendMsgToPathVertexMergePhase(Iterator<MessageWritable> msgIterator) {
if (getSuperstep() == 3 + 2 * maxRound + 2) {
outgoingMsg.setSourceVertexId(getVertexId());
- destVertexId.set(getNextDestVertexId(getVertexValue()));
+ destVertexId.setAsCopy(getNextDestVertexId(getVertexValue()));
sendMsg(destVertexId, outgoingMsg);
} else {
while (msgIterator.hasNext()) {
@@ -252,8 +250,7 @@
if (incomingMsg.getFlag() != Message.STOP) {
mergeChainVertex();
outgoingMsg.setSourceVertexId(getVertexId());
- destVertexId
- .set(getNextDestVertexId(getVertexValue()));
+ destVertexId.setAsCopy(getNextDestVertexId(getVertexValue()));
sendMsg(destVertexId, outgoingMsg);
} else {
mergeChainVertex();
@@ -271,7 +268,7 @@
public void responseMsgToHeadVertexMergePhase() {
deleteVertex(getVertexId());
outgoingMsg.setNeighberNode(getVertexValue().getOutgoingList());
- outgoingMsg.setAcutalKmer(getVertexValue().getKmer());
+ outgoingMsg.setActualKmer(getVertexValue().getActualKmer());
if (getVertexValue().getState() == State2.END_VERTEX)
outgoingMsg.setFlag(Message.STOP);
sendMsg(incomingMsg.getSourceVertexId(), outgoingMsg);
@@ -284,7 +281,7 @@
if (getSuperstep() == 4) {
if(getVertexValue().getState() != State2.START_HALT){
outgoingMsg.setSourceVertexId(getVertexId());
- destVertexId.set(getNextDestVertexId(getVertexValue()));
+ destVertexId.setAsCopy(getNextDestVertexId(getVertexValue()));
sendMsg(destVertexId, outgoingMsg);
voteToHalt();
}
@@ -297,7 +294,7 @@
if (incomingMsg.getFlag() != Message.STOP
&& incomingMsg.getFlag() != Message.FROMPSEUDOREAR) {
outgoingMsg.setSourceVertexId(getVertexId());
- destVertexId.set(getNextDestVertexId(getVertexValue()));
+ destVertexId.setAsCopy(getNextDestVertexId(getVertexValue()));
sendMsg(destVertexId, outgoingMsg);
voteToHalt();
} else {
@@ -325,7 +322,7 @@
else {
deleteVertex(getVertexId());
outgoingMsg.setNeighberNode(getVertexValue().getOutgoingList()); //incomingMsg.getNeighberNode()
- outgoingMsg.setAcutalKmer(getVertexValue().getKmer());
+ outgoingMsg.setActualKmer(getVertexValue().getActualKmer());
if (getVertexValue().getState() == State2.PSEUDOREAR)
outgoingMsg.setFlag(Message.FROMPSEUDOREAR);
else if (getVertexValue().getState() == State2.END_VERTEX)
@@ -410,7 +407,7 @@
job.setVertexInputFormatClass(NaiveAlgorithmForPathMergeInputFormat.class);
job.setVertexOutputFormatClass(NaiveAlgorithmForPathMergeOutputFormat.class);
job.setDynamicVertexValueSize(true);
- job.setOutputKeyClass(PositionWritable.class);
+ job.setOutputKeyClass(VKmerBytesWritable.class);
job.setOutputValueClass(VertexValueWritable.class);
Client.run(args, job);
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java
index 02151fa..28d0563 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java
@@ -5,7 +5,6 @@
import edu.uci.ics.pregelix.api.job.PregelixJob;
-import edu.uci.ics.genomix.oldtype.PositionWritable;
import edu.uci.ics.genomix.pregelix.client.Client;
import edu.uci.ics.genomix.pregelix.format.GraphCleanOutputFormat;
import edu.uci.ics.genomix.pregelix.format.InitialGraphCleanInputFormat;
@@ -14,7 +13,7 @@
import edu.uci.ics.genomix.pregelix.io.VertexValueWritable.State;
import edu.uci.ics.genomix.pregelix.type.MessageFlag;
import edu.uci.ics.genomix.pregelix.util.VertexUtil;
-import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.VKmerBytesWritable;
/*
* vertexId: BytesWritable
@@ -56,9 +55,9 @@
private float probBeingRandomHead = -1;
private Random randGenerator;
- private KmerBytesWritable curKmer = new KmerBytesWritable();
- private KmerBytesWritable nextKmer = new KmerBytesWritable();
- private KmerBytesWritable prevKmer = new KmerBytesWritable();
+ private VKmerBytesWritable curKmer = new VKmerBytesWritable();
+ private VKmerBytesWritable nextKmer = new VKmerBytesWritable();
+ private VKmerBytesWritable prevKmer = new VKmerBytesWritable();
private boolean hasNext;
private boolean hasPrev;
private boolean curHead;
@@ -81,7 +80,7 @@
else
outgoingMsg.reset(kmerSize);
if(destVertexId == null)
- destVertexId = new KmerBytesWritable(kmerSize);
+ destVertexId = new VKmerBytesWritable(kmerSize);
randSeed = getSuperstep();
randGenerator = new Random(randSeed);
if (probBeingRandomHead < 0)
@@ -97,7 +96,7 @@
headFlag = (byte) (State.IS_HEAD & getVertexValue().getState());
}
- protected boolean isNodeRandomHead(KmerBytesWritable nodeKmer) {
+ protected boolean isNodeRandomHead(VKmerBytesWritable nodeKmer) {
// "deterministically random", based on node id
//randGenerator.setSeed(randSeed);
//randSeed = randGenerator.nextInt();
@@ -234,7 +233,7 @@
job.setVertexInputFormatClass(InitialGraphCleanInputFormat.class);
job.setVertexOutputFormatClass(GraphCleanOutputFormat.class);
job.setDynamicVertexValueSize(true);
- job.setOutputKeyClass(PositionWritable.class);
+ job.setOutputKeyClass(VKmerBytesWritable.class);
job.setOutputValueClass(VertexValueWritable.class);
Client.run(args, job);
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P5ForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P5ForPathMergeVertex.java
new file mode 100644
index 0000000..0a43a96
--- /dev/null
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P5ForPathMergeVertex.java
@@ -0,0 +1,501 @@
+package edu.uci.ics.genomix.pregelix.operator.pathmerge;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Random;
+
+import org.apache.hadoop.io.NullWritable;
+
+
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.genomix.type.VKmerBytesWritable;
+import edu.uci.ics.genomix.pregelix.client.Client;
+import edu.uci.ics.genomix.pregelix.format.NaiveAlgorithmForPathMergeInputFormat;
+import edu.uci.ics.genomix.pregelix.format.NaiveAlgorithmForPathMergeOutputFormat;
+import edu.uci.ics.genomix.pregelix.io.MessageWritable;
+import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
+import edu.uci.ics.genomix.pregelix.io.VertexValueWritable.State;
+import edu.uci.ics.genomix.pregelix.type.Message;
+import edu.uci.ics.genomix.pregelix.type.MessageFlag;
+import edu.uci.ics.genomix.pregelix.util.VertexUtil;
+
+/*
+ * vertexId: VKmerBytesWritable
+ * vertexValue: VertexValueWritable
+ * edgeValue: NullWritable
+ * message: MessageWritable
+ *
+ * DNA:
+ * A: 00
+ * C: 01
+ * G: 10
+ * T: 11
+ *
+ * succeed node
+ * A 00000001 1
+ * G 00000010 2
+ * C 00000100 4
+ * T 00001000 8
+ * precursor node
+ * A 00010000 16
+ * G 00100000 32
+ * C 01000000 64
+ * T 10000000 128
+ *
+ * For example, ONE LINE in input file: 00,01,10 0001,0010,
+ * That means that vertexId is ACG, its succeed node is A and its precursor node is C.
+ * The succeed node and precursor node will be stored in vertexValue and we don't use edgeValue.
+ * The details about message are in edu.uci.ics.genomix.pregelix.io.MessageWritable.
+ */
+/**
+ * Naive Algorithm for path merge graph
+ */
+public class P5ForPathMergeVertex extends
+ Vertex<VKmerBytesWritable, VertexValueWritable, NullWritable, MessageWritable> {
+ public static final String KMER_SIZE = "P5ForPathMergeVertex.kmerSize";
+ public static final String ITERATIONS = "P5ForPathMergeVertex.iteration";
+ public static final String RANDSEED = "P5ForPathMergeVertex.randSeed";
+ public static final String PROBBEINGRANDOMHEAD = "P4ForPathMergeVertex.probBeingRandomHead";
+ public static int kmerSize = -1;
+ private int maxIteration = -1;
+
+ private static long randSeed = -1;
+ private float probBeingRandomHead = -1;
+ private Random randGenerator;
+
+ private VKmerBytesWritable curID = new VKmerBytesWritable();
+ private VKmerBytesWritable nextID = new VKmerBytesWritable();
+ private VKmerBytesWritable prevID = new VKmerBytesWritable();
+ private boolean hasNext;
+ private boolean hasPrev;
+ private boolean curHead;
+ private boolean nextHead;
+ private boolean prevHead;
+ private byte headFlag;
+ private byte tailFlag;
+ private byte outFlag;
+
+ private MessageWritable incomingMsg = new MessageWritable();
+ private MessageWritable outgoingMsg = new MessageWritable();
+ private VKmerBytesWritable destVertexId = new VKmerBytesWritable();
+ private Iterator<VKmerBytesWritable> posIterator;
+
+ /**
+ * Load kmerSize, maxIteration, randSeed and probBeingRandomHead from the job configuration while they still hold their negative sentinel, then rebuild randGenerator and reset the per-call scratch flags and outgoingMsg
+ */
+ public void initVertex() {
+ if (kmerSize == -1) // kmerSize is static, so it is read from the configuration only while it is still -1
+ kmerSize = getContext().getConfiguration().getInt(KMER_SIZE, 5);
+ if (maxIteration < 0)
+ maxIteration = getContext().getConfiguration().getInt(ITERATIONS, 1000000);
+ if (randSeed < 0) // randSeed is static as well
+ randSeed = getContext().getConfiguration().getLong("randomSeed", 0); // NOTE(review): reads the literal key "randomSeed", not the RANDSEED constant declared on this class -- confirm which key callers set
+ randGenerator = new Random(randSeed); // a new generator is created on every call
+ if (probBeingRandomHead < 0)
+ probBeingRandomHead = getContext().getConfiguration().getFloat("probBeingRandomHead", 0.5f); // NOTE(review): reads the literal key, not PROBBEINGRANDOMHEAD (whose value is prefixed "P4ForPathMergeVertex.") -- confirm
+ hasNext = false;
+ hasPrev = false;
+ curHead = false;
+ nextHead = false;
+ prevHead = false;
+ outgoingMsg.reset();
+ }
+
+ protected boolean isNodeRandomHead(VKmerBytesWritable nodeID) {
+ // "deterministically random", based on node id
+ randGenerator.setSeed(randSeed ^ nodeID.hashCode());
+ return randGenerator.nextFloat() < probBeingRandomHead;
+ }
+
+ /**
+ * set nextID to the element that's next (in the node's FF or FR list), returning true when there is a next neighbor
+ */
+ protected boolean setNextInfo(VertexValueWritable value) {
+ if (value.getFFList().getCountOfPosition() > 0) {
+ nextID.setAsCopy(value.getFFList().getPosition(0));
+ nextHead = isNodeRandomHead(nextID);
+ return true;
+ }
+ if (value.getFRList().getCountOfPosition() > 0) {
+ nextID.setAsCopy(value.getFRList().getPosition(0));
+ nextHead = isNodeRandomHead(nextID);
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * set prevID to the element that's previous (in the node's RR or RF list), returning true when there is a previous neighbor
+ */
+ protected boolean setPrevInfo(VertexValueWritable value) {
+ if (value.getRRList().getCountOfPosition() > 0) {
+ prevID.setAsCopy(value.getRRList().getPosition(0));
+ prevHead = isNodeRandomHead(prevID);
+ return true;
+ }
+ if (value.getRFList().getCountOfPosition() > 0) {
+ prevID.setAsCopy(value.getRFList().getPosition(0));
+ prevHead = isNodeRandomHead(prevID);
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * get destination vertex
+ */
+ public VKmerBytesWritable getNextDestVertexId(VertexValueWritable value) {
+ if(value.getFFList().getCountOfPosition() > 0) // #FFList() > 0
+ posIterator = value.getFFList().iterator();
+ else // #FRList() > 0
+ posIterator = value.getFRList().iterator();
+ return posIterator.next();
+ }
+
+ public VKmerBytesWritable getPreDestVertexId(VertexValueWritable value) {
+ if(value.getRFList().getCountOfPosition() > 0) // #RFList() > 0
+ posIterator = value.getRFList().iterator();
+ else // #RRList() > 0
+ posIterator = value.getRRList().iterator();
+ return posIterator.next();
+ }
+
+ /**
+ * head send message to all next nodes
+ */
+ public void sendMsgToAllNextNodes(VertexValueWritable value) {
+ posIterator = value.getFFList().iterator(); // FFList
+ while(posIterator.hasNext()){
+ destVertexId.setAsCopy(posIterator.next());
+ sendMsg(destVertexId, outgoingMsg);
+ }
+ posIterator = value.getFRList().iterator(); // FRList
+ while(posIterator.hasNext()){
+ destVertexId.setAsCopy(posIterator.next());
+ sendMsg(destVertexId, outgoingMsg);
+ }
+ }
+
+ /**
+ * head send message to all previous nodes
+ */
+ public void sendMsgToAllPreviousNodes(VertexValueWritable value) {
+ posIterator = value.getRFList().iterator(); // RFList
+ while(posIterator.hasNext()){
+ destVertexId.setAsCopy(posIterator.next());
+ sendMsg(destVertexId, outgoingMsg);
+ }
+ posIterator = value.getRRList().iterator(); // RRList
+ while(posIterator.hasNext()){
+ destVertexId.setAsCopy(posIterator.next());
+ sendMsg(destVertexId, outgoingMsg);
+ }
+ }
+
+ /**
+ * start sending message
+ */
+ public void startSendMsg() {
+ if (VertexUtil.isHeadVertexWithIndegree(getVertexValue())) {
+ outgoingMsg.setFlag(Message.START);
+ sendMsgToAllNextNodes(getVertexValue());
+ voteToHalt();
+ }
+ if (VertexUtil.isRearVertexWithOutdegree(getVertexValue())) {
+ outgoingMsg.setFlag(Message.END);
+ sendMsgToAllPreviousNodes(getVertexValue());
+ voteToHalt();
+ }
+ if (VertexUtil.isHeadWithoutIndegree(getVertexValue())){
+ outgoingMsg.setFlag(Message.START);
+ sendMsg(getVertexId(), outgoingMsg); //send to itself
+ voteToHalt();
+ }
+ if (VertexUtil.isRearWithoutOutdegree(getVertexValue())){
+ outgoingMsg.setFlag(Message.END);
+ sendMsg(getVertexId(), outgoingMsg); //send to itself
+ voteToHalt();
+ }
+ }
+
+ /**
+ * initiate head, rear and path node
+ */
+ public void initState(Iterator<MessageWritable> msgIterator) {
+ while (msgIterator.hasNext()) {
+ if (!VertexUtil.isPathVertex(getVertexValue())
+ && !VertexUtil.isHeadWithoutIndegree(getVertexValue())
+ && !VertexUtil.isRearWithoutOutdegree(getVertexValue())) {
+ msgIterator.next();
+ voteToHalt();
+ } else {
+ incomingMsg = msgIterator.next();
+ setState();
+ }
+ }
+ }
+
+ /**
+ * Mark this vertex as IS_HEAD from the flag of incomingMsg: a START message always marks it, an END message marks it only when the current state is not already State.IS_HEAD; other flags leave the state untouched
+ */
+ public void setState() {
+ if (incomingMsg.getFlag() == Message.START) {
+ getVertexValue().setState(MessageFlag.IS_HEAD); //State.START_VERTEX
+ } else if (incomingMsg.getFlag() == Message.END && getVertexValue().getState() != State.IS_HEAD) { // NOTE(review): compared against State.IS_HEAD but assigned MessageFlag.IS_HEAD below -- presumably the same value, confirm
+ getVertexValue().setState(MessageFlag.IS_HEAD);
+ getVertexValue().setActualKmer(getVertexValue().getActualKmer()); // NOTE(review): passes the value's own kmer back to itself; looks like a no-op -- confirm
+ //voteToHalt();
+ } //else
+ //voteToHalt();
+ }
+
+ /**
+ * check if A need to be flipped with successor
+ */
+ public boolean ifFilpWithSuccessor(){
+ if(getVertexValue().getFRList().getLength() > 0)
+ return true;
+ else
+ return false;
+ }
+
+ /**
+ * check if A needs to be flipped with predecessor
+ */
+ public boolean ifFlipWithPredecessor(){
+ if(getVertexValue().getRFList().getLength() > 0)
+ return true;
+ else
+ return false;
+ }
+
+ /**
+ * set adjMessage to successor(from predecessor)
+ */
+ public void setSuccessorAdjMsg(){
+ if(getVertexValue().getFFList().getLength() > 0)
+ outFlag |= MessageFlag.DIR_FF;
+ else
+ outFlag |= MessageFlag.DIR_FR;
+ }
+
+    /**
+     * OR the direction of this vertex's predecessor edge into outFlag (message from successor to predecessor): DIR_RF when getRFList().getLength() > 0, otherwise DIR_RR
+     */
+    public void setPredecessorAdjMsg(){
+        if(getVertexValue().getRFList().getLength() > 0)
+            outFlag |= MessageFlag.DIR_RF;
+        else
+            outFlag |= MessageFlag.DIR_RR; // no RF edge, so the predecessor edge is RR (both branches used to set DIR_RF)
+    }
+
+ /**
+ * send update message to neighbor
+ * @throws IOException
+ */
+ public void broadcastUpdateMsg(){
+ /* switch(getVertexValue().getState() & 0b0001){
+ case MessageFlag.SHOULD_MERGEWITHPREV:
+ setSuccessorAdjMsg();
+ if(ifFlipWithPredecessor())
+ outFlag |= MessageFlag.FLIP;
+ outgoingMsg.setFlag(outFlag);
+ outgoingMsg.setNeighberNode(getVertexValue().getIncomingList());
+ outgoingMsg.setSourceVertexId(getVertexId());
+ sendMsg(getNextDestVertexId(getVertexValue()), outgoingMsg);
+ break;
+ case MessageFlag.SHOULD_MERGEWITHNEXT:
+ setPredecessorAdjMsg();
+ if(ifFilpWithSuccessor())
+ outFlag |= MessageFlag.FLIP;
+ outgoingMsg.setFlag(outFlag);
+ outgoingMsg.setNeighberNode(getVertexValue().getOutgoingList());
+ outgoingMsg.setSourceVertexId(getVertexId());
+ sendMsg(getPreDestVertexId(getVertexValue()), outgoingMsg);
+ break;
+ }*/
+ }
+
+ /**
+ * This vertex tries to merge with next vertex and send update msg to neighbor
+ * @throws IOException
+ */
+ public void sendUpMsgFromPredecessor(){
+ byte state = getVertexValue().getState();
+ state |= MessageFlag.SHOULD_MERGEWITHNEXT;
+ getVertexValue().setState(state);
+ if(getVertexValue().getFFList().getLength() > 0)
+ getVertexValue().setMergeDest(getVertexValue().getFFList().getPosition(0));
+ else
+ getVertexValue().setMergeDest(getVertexValue().getFRList().getPosition(0));
+ broadcastUpdateMsg();
+ }
+
+ /**
+ * This vertex tries to merge with previous vertex and send update msg to neighbor
+ * @throws IOException
+ */
+ public void sendUpMsgFromSuccessor(){
+ byte state = getVertexValue().getState();
+ state |= MessageFlag.SHOULD_MERGEWITHPREV;
+ getVertexValue().setState(state);
+ if(getVertexValue().getRFList().getLength() > 0)
+ getVertexValue().setMergeDest(getVertexValue().getRFList().getPosition(0));
+ else
+ getVertexValue().setMergeDest(getVertexValue().getRRList().getPosition(0));
+ broadcastUpdateMsg();
+ }
+
+    /**
+     * Returns the edge direction for B->A when the A->B edge has direction {@code dir}: FF and RR swap, FR and RF map to themselves; any other value raises a RuntimeException
+     */
+    public byte mirrorDirection(byte dir) {
+        switch (dir) {
+            case MessageFlag.DIR_FF:
+                return MessageFlag.DIR_RR;
+            case MessageFlag.DIR_FR:
+                return MessageFlag.DIR_FR;
+            case MessageFlag.DIR_RF:
+                return MessageFlag.DIR_RF;
+            case MessageFlag.DIR_RR:
+                return MessageFlag.DIR_FF;
+            default:
+                throw new RuntimeException("Unrecognized direction in mirrorDirection: " + dir); // message previously named flipDirection
+        }
+    }
+
+ /**
+ * flip neighborDir (FF<->FR, RF<->RR) when the flip argument is true; otherwise return it unchanged
+ */
+ public byte flipDirection(byte neighborDir, boolean flip){
+ if(flip){
+ switch (neighborDir) {
+ case MessageFlag.DIR_FF:
+ return MessageFlag.DIR_FR;
+ case MessageFlag.DIR_FR:
+ return MessageFlag.DIR_FF;
+ case MessageFlag.DIR_RF:
+ return MessageFlag.DIR_RR;
+ case MessageFlag.DIR_RR:
+ return MessageFlag.DIR_RF;
+ default:
+ throw new RuntimeException("Unrecognized direction for neighborDir: " + neighborDir);
+ }
+ } else
+ return neighborDir;
+ }
+
+ /**
+ * updateAdjList
+ */
+ public void processUpdate(){
+ /*byte meToNeighborDir = (byte) (incomingMsg.getFlag() & MessageFlag.DIR_MASK);
+ byte neighborToMeDir = mirrorDirection(meToNeighborDir);
+
+ boolean flip;
+ if((outFlag & MessageFlag.FLIP) > 0)
+ flip = true;
+ else
+ flip = false;
+ byte neighborToMergeDir = flipDirection(neighborToMeDir, flip);
+
+ getVertexValue().processUpdates(neighborToMeDir, incomingMsg.getSourceVertexId(),
+ neighborToMergeDir, VertexUtil.getNodeIdFromAdjacencyList(incomingMsg.getNeighberNode()));*/
+ }
+
+ /**
+ * merge and updateAdjList
+ */
+ public void processMerge(){
+ /*byte meToNeighborDir = (byte) (incomingMsg.getFlag() & MessageFlag.DIR_MASK);
+ byte neighborToMeDir = mirrorDirection(meToNeighborDir);
+
+ boolean flip;
+ if((outFlag & MessageFlag.FLIP) > 0)
+ flip = true;
+ else
+ flip = false;
+ byte neighborToMergeDir = flipDirection(neighborToMeDir, flip);
+
+ getVertexValue().processMerges(neighborToMeDir, incomingMsg.getSourceVertexId(),
+ neighborToMergeDir, VertexUtil.getNodeIdFromAdjacencyList(incomingMsg.getNeighberNode()),
+ kmerSize, incomingMsg.getKmer());*/
+ }
+
+ @Override
+ public void compute(Iterator<MessageWritable> msgIterator) { // per-superstep entry point: step 1 sends START/END messages, step 2 records head state from them, steps with superstep % 4 == 3 pick a merge direction; other supersteps only run initVertex()
+ initVertex();
+ if (getSuperstep() == 1)
+ startSendMsg();
+ else if (getSuperstep() == 2)
+ initState(msgIterator);
+ else if (getSuperstep() % 4 == 3){
+ // Node may be marked as head b/c it's a real head or a real tail
+ headFlag = (byte) (State.IS_HEAD & getVertexValue().getState());
+ tailFlag = (byte) (State.IS_HEAD & getVertexValue().getState()); //is_tail -- NOTE(review): masked with State.IS_HEAD exactly like headFlag, so headFlag == tailFlag; presumably a tail bit was intended -- confirm
+ outFlag = (byte) (headFlag | tailFlag);
+
+ // only PATH vertices are present. Find the ID's for my neighbors
+ curID.setAsCopy(getVertexId());
+
+ curHead = isNodeRandomHead(curID);
+
+ // the headFlag and tailFlag's indicate if the node is at the beginning or end of a simple path.
+ // We prevent merging towards non-path nodes
+ hasNext = setNextInfo(getVertexValue()) && tailFlag == 0;
+ hasPrev = setPrevInfo(getVertexValue()) && headFlag == 0;
+ if ((outFlag & MessageFlag.IS_HEAD) > 0 && (outFlag & MessageFlag.IS_HEAD) > 0) { // NOTE(review): both operands are the same IS_HEAD test; the second was presumably meant to check a tail flag -- confirm
+ getVertexValue().setState(outFlag);
+ voteToHalt(); // no return here: the merge decision below still runs for this vertex
+ }
+ if (hasNext || hasPrev) {
+ if (curHead) {
+ if (hasNext && !nextHead) {
+ // compress this head to the forward tail
+ sendUpMsgFromPredecessor();
+ } else if (hasPrev && !prevHead) {
+ // compress this head to the reverse tail
+ sendUpMsgFromSuccessor();
+ }
+ } else {
+ // I'm a tail
+ if (hasNext && hasPrev) {
+ if ((!nextHead && !prevHead) && (curID.compareTo(nextID) < 0 && curID.compareTo(prevID) < 0)) {
+ // tails on both sides, and I'm the "local minimum"
+ // compress me towards the tail in forward dir
+ sendUpMsgFromPredecessor();
+ }
+ } else if (!hasPrev) {
+ // no previous node
+ if (!nextHead && curID.compareTo(nextID) < 0) {
+ // merge towards tail in forward dir
+ sendUpMsgFromPredecessor();
+ }
+ } else if (!hasNext) {
+ // no next node
+ if (!prevHead && curID.compareTo(prevID) < 0) {
+ // merge towards tail in reverse dir
+ sendUpMsgFromSuccessor();
+ }
+ }
+ }
+ }
+ }
+
+ }
+
+ public static void main(String[] args) throws Exception {
+ PregelixJob job = new PregelixJob(P5ForPathMergeVertex.class.getSimpleName());
+ job.setVertexClass(P5ForPathMergeVertex.class);
+ /**
+ * BinaryInput and BinaryOutput
+ */
+ job.setVertexInputFormatClass(NaiveAlgorithmForPathMergeInputFormat.class);
+ job.setVertexOutputFormatClass(NaiveAlgorithmForPathMergeOutputFormat.class);
+ job.setDynamicVertexValueSize(true);
+ job.setOutputKeyClass(VKmerBytesWritable.class);
+ job.setOutputValueClass(VertexValueWritable.class);
+ Client.run(args, job);
+ }
+}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/splitrepeat/SplitRepeatVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/splitrepeat/SplitRepeatVertex.java
index fa50e66..83958fe 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/splitrepeat/SplitRepeatVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/splitrepeat/SplitRepeatVertex.java
@@ -14,9 +14,8 @@
import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
import edu.uci.ics.genomix.pregelix.operator.pathmerge.BasicGraphCleanVertex;
import edu.uci.ics.genomix.pregelix.type.MessageFlag;
-import edu.uci.ics.genomix.type.KmerBytesWritable;
-import edu.uci.ics.genomix.type.VKmerListWritable;
import edu.uci.ics.genomix.type.PositionWritable;
+import edu.uci.ics.genomix.type.VKmerListWritable;
import edu.uci.ics.genomix.type.VKmerBytesWritable;
import edu.uci.ics.pregelix.api.graph.Vertex;
import edu.uci.ics.pregelix.api.job.PregelixJob;
@@ -25,7 +24,6 @@
public class SplitRepeatVertex extends
BasicGraphCleanVertex{
-
public class EdgeDir{
public static final byte DIR_FF = 0 << 0;
public static final byte DIR_FR = 1 << 0;
@@ -35,11 +33,11 @@
public class DeletedEdge{
private byte dir;
- private KmerBytesWritable edge;
+ private VKmerBytesWritable edge;
public DeletedEdge(){
dir = 0;
- edge = new KmerBytesWritable(kmerSize);
+ edge = new VKmerBytesWritable(kmerSize);
}
public byte getDir() {
@@ -50,12 +48,12 @@
this.dir = dir;
}
- public KmerBytesWritable getEdge() {
+ public VKmerBytesWritable getEdge() {
return edge;
}
- public void setEdge(KmerBytesWritable edge) {
- this.edge.set(edge);
+ public void setEdge(VKmerBytesWritable edge) {
+ this.edge.setAsCopy(edge);
}
}
@@ -71,13 +69,13 @@
private Set<Long> outgoingReadIdSet = new HashSet<Long>();
private Set<Long> selfReadIdSet = new HashSet<Long>();
private Set<Long> neighborEdgeIntersection = new HashSet<Long>();
- private Map<KmerBytesWritable, Set<Long>> kmerMap = new HashMap<KmerBytesWritable, Set<Long>>();
+ private Map<VKmerBytesWritable, Set<Long>> kmerMap = new HashMap<VKmerBytesWritable, Set<Long>>();
private VKmerListWritable incomingEdgeList = null;
private VKmerListWritable outgoingEdgeList = null;
private byte incomingEdgeDir = 0;
private byte outgoingEdgeDir = 0;
- protected KmerBytesWritable createdVertexId = null;
+ protected VKmerBytesWritable createdVertexId = null;
/**
* initiate kmerSize, maxIteration
@@ -94,13 +92,13 @@
else
outgoingMsg.reset(kmerSize);
if(incomingEdgeList == null)
- incomingEdgeList = new VKmerListWritable(kmerSize);
+ incomingEdgeList = new VKmerListWritable();
if(outgoingEdgeList == null)
- outgoingEdgeList = new VKmerListWritable(kmerSize);
+ outgoingEdgeList = new VKmerListWritable();
if(createdVertexId == null)
- createdVertexId = new KmerBytesWritable(kmerSize);//kmerSize + 1
+ createdVertexId = new VKmerBytesWritable(kmerSize);//kmerSize + 1
if(destVertexId == null)
- destVertexId = new KmerBytesWritable(kmerSize);
+ destVertexId = new VKmerBytesWritable(kmerSize);
}
/**
@@ -154,11 +152,11 @@
}
@SuppressWarnings({ "rawtypes", "unchecked" })
- public void createNewVertex(int i, KmerBytesWritable incomingEdge, KmerBytesWritable outgoingEdge){
+ public void createNewVertex(int i, VKmerBytesWritable incomingEdge, VKmerBytesWritable outgoingEdge){
Vertex vertex = (Vertex) BspUtils.createVertex(getContext().getConfiguration());
vertex.getMsgList().clear();
vertex.getEdges().clear();
- KmerBytesWritable vertexId = new KmerBytesWritable(kmerSize);
+ VKmerBytesWritable vertexId = new VKmerBytesWritable(kmerSize);
VertexValueWritable vertexValue = new VertexValueWritable(kmerSize);
//add the corresponding edge to new vertex
switch(connectedTable[i][0]){
@@ -177,27 +175,27 @@
vertexValue.getFRList().append(outgoingEdge);
break;
}
- vertexId.set(createdVertexId);
+ vertexId.setAsCopy(createdVertexId);
vertex.setVertexId(vertexId);
vertex.setVertexValue(vertexValue);
addVertex(vertexId, vertex);
}
- public void sendMsgToUpdateEdge(KmerBytesWritable incomingEdge, KmerBytesWritable outgoingEdge){
+ public void sendMsgToUpdateEdge(VKmerBytesWritable incomingEdge, VKmerBytesWritable outgoingEdge){
outgoingMsg.setCreatedVertexId(createdVertexId);
outgoingMsg.setSourceVertexId(getVertexId());
outgoingMsg.setFlag(incomingEdgeDir);
- destVertexId.set(incomingEdge);
+ destVertexId.setAsCopy(incomingEdge);
sendMsg(destVertexId, outgoingMsg);
outgoingMsg.setFlag(outgoingEdgeDir);
- destVertexId.set(outgoingEdge);
+ destVertexId.setAsCopy(outgoingEdge);
sendMsg(destVertexId, outgoingMsg);
}
- public void storeDeletedEdge(Set<DeletedEdge> deletedEdges, int i, KmerBytesWritable incomingEdge, KmerBytesWritable outgoingEdge){
+ public void storeDeletedEdge(Set<DeletedEdge> deletedEdges, int i, VKmerBytesWritable incomingEdge, VKmerBytesWritable outgoingEdge){
DeletedEdge deletedIncomingEdge = new DeletedEdge();
DeletedEdge deletedOutgoingEdge = new DeletedEdge();
switch(connectedTable[i][0]){
@@ -243,32 +241,32 @@
public void setEdgeListAndEdgeDir(int i){
switch(connectedTable[i][0]){
case EdgeDir.DIR_RF:
- incomingEdgeList.set(getVertexValue().getRFList());
+ incomingEdgeList.setCopy(getVertexValue().getRFList());
incomingEdgeDir = MessageFlag.DIR_RF;
break;
case EdgeDir.DIR_RR:
- incomingEdgeList.set(getVertexValue().getRRList());
+ incomingEdgeList.setCopy(getVertexValue().getRRList());
incomingEdgeDir = MessageFlag.DIR_RR;
break;
}
switch(connectedTable[i][1]){
case EdgeDir.DIR_FF:
- outgoingEdgeList.set(getVertexValue().getFFList());
+ outgoingEdgeList.setCopy(getVertexValue().getFFList());
outgoingEdgeDir = MessageFlag.DIR_FF;
break;
case EdgeDir.DIR_FR:
- outgoingEdgeList.set(getVertexValue().getFRList());
+ outgoingEdgeList.setCopy(getVertexValue().getFRList());
outgoingEdgeDir = MessageFlag.DIR_FR;
break;
}
}
- public void setNeighborEdgeIntersection(KmerBytesWritable incomingEdge, KmerBytesWritable outgoingEdge){
+ public void setNeighborEdgeIntersection(VKmerBytesWritable incomingEdge, VKmerBytesWritable outgoingEdge){
outgoingReadIdSet.clear();
incomingReadIdSet.clear();
- tmpKmer.set(incomingEdge);
+ tmpKmer.setAsCopy(incomingEdge);
incomingReadIdSet.addAll(kmerMap.get(tmpKmer));
- tmpKmer.set(outgoingEdge);
+ tmpKmer.setAsCopy(outgoingEdge);
outgoingReadIdSet.addAll(kmerMap.get(tmpKmer));
//set all neighberEdge readId intersection
@@ -332,20 +330,20 @@
/** set edgeList and edgeDir based on connectedTable **/
setEdgeListAndEdgeDir(i);
- KmerBytesWritable incomingEdge = new KmerBytesWritable(kmerSize);
- KmerBytesWritable outgoingEdge = new KmerBytesWritable(kmerSize);
+ VKmerBytesWritable incomingEdge = new VKmerBytesWritable(kmerSize);
+ VKmerBytesWritable outgoingEdge = new VKmerBytesWritable(kmerSize);
for(int x = 0; x < incomingEdgeList.getCountOfPosition(); x++){
for(int y = 0; y < outgoingEdgeList.getCountOfPosition(); y++){
- incomingEdge.set(incomingEdgeList.getPosition(x));
- outgoingEdge.set(outgoingEdgeList.getPosition(y));
+ incomingEdge.setAsCopy(incomingEdgeList.getPosition(x));
+ outgoingEdge.setAsCopy(outgoingEdgeList.getPosition(y));
/** set neighborEdge readId intersection **/
setNeighborEdgeIntersection(incomingEdge, outgoingEdge);
if(!neighborEdgeIntersection.isEmpty()){
if(count == 0)
- createdVertexId.setByRead("AAA".getBytes(), 0);//kmerSize + 1 generaterRandomString(kmerSize).getBytes()
+ createdVertexId.setByRead("AAA".length(), "AAA".getBytes(), 0);//kmerSize + 1 generaterRandomString(kmerSize).getBytes()
else
- createdVertexId.setByRead("GGG".getBytes(), 0);
+ createdVertexId.setByRead("GGG".length(), "GGG".getBytes(), 0);
count++;
/** create new/created vertex **/
@@ -413,7 +411,7 @@
job.setVertexInputFormatClass(InitialGraphCleanInputFormat.class);
job.setVertexOutputFormatClass(GraphCleanOutputFormat.class);
job.setDynamicVertexValueSize(true);
- job.setOutputKeyClass(KmerBytesWritable.class);
+ job.setOutputKeyClass(VKmerBytesWritable.class);
job.setOutputValueClass(VertexValueWritable.class);
Client.run(args, job);
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/tipremove/TipAddVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/tipremove/TipAddVertex.java
index 7b695dc..0738208 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/tipremove/TipAddVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/tipremove/TipAddVertex.java
@@ -3,8 +3,8 @@
import java.util.Iterator;
import org.apache.hadoop.io.NullWritable;
-import edu.uci.ics.genomix.type.KmerBytesWritable;
-import edu.uci.ics.genomix.type.KmerListWritable;
+import edu.uci.ics.genomix.type.VKmerBytesWritable;
+import edu.uci.ics.genomix.type.VKmerListWritable;
import edu.uci.ics.pregelix.api.graph.Vertex;
import edu.uci.ics.pregelix.api.job.PregelixJob;
import edu.uci.ics.pregelix.api.util.BspUtils;
@@ -46,7 +46,7 @@
* Remove tip or single node when l > constant
*/
public class TipAddVertex extends
- Vertex<KmerBytesWritable, VertexValueWritable, NullWritable, MessageWritable> {
+ Vertex<VKmerBytesWritable, VertexValueWritable, NullWritable, MessageWritable> {
public static final String KMER_SIZE = "TipAddVertex.kmerSize";
public static int kmerSize = -1;
@@ -68,8 +68,8 @@
initVertex();
if(getSuperstep() == 1){
if(getVertexId().toString().equals("CTA")){
- KmerBytesWritable vertexId = new KmerBytesWritable(kmerSize);
- vertexId.setByRead("AGC".getBytes(), 0);
+ VKmerBytesWritable vertexId = new VKmerBytesWritable(kmerSize);
+ vertexId.setByRead(kmerSize, "AGC".getBytes(), 0);
getVertexValue().getRFList().append(vertexId);
//add tip vertex
@@ -86,10 +86,10 @@
/**
* set the vertex value
*/
- KmerListWritable kmerList = new KmerListWritable(kmerSize);
+ VKmerListWritable kmerList = new VKmerListWritable();
kmerList.append(getVertexId());
vertexValue.setRFList(kmerList);
- vertexValue.setKmer(vertexId);
+ vertexValue.setActualKmer(vertexId);
vertex.setVertexValue(vertexValue);
addVertex(vertexId, vertex);
@@ -107,7 +107,7 @@
job.setVertexInputFormatClass(GraphCleanInputFormat.class);
job.setVertexOutputFormatClass(GraphCleanOutputFormat.class);
job.setDynamicVertexValueSize(true);
- job.setOutputKeyClass(KmerBytesWritable.class);
+ job.setOutputKeyClass(VKmerBytesWritable.class);
job.setOutputValueClass(VertexValueWritable.class);
Client.run(args, job);
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/tipremove/TipRemoveVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/tipremove/TipRemoveVertex.java
index c8d3e2d..527fb66 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/tipremove/TipRemoveVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/tipremove/TipRemoveVertex.java
@@ -3,7 +3,6 @@
import java.util.Iterator;
import edu.uci.ics.pregelix.api.job.PregelixJob;
-import edu.uci.ics.genomix.oldtype.PositionWritable;
import edu.uci.ics.genomix.pregelix.client.Client;
import edu.uci.ics.genomix.pregelix.format.GraphCleanInputFormat;
import edu.uci.ics.genomix.pregelix.format.GraphCleanOutputFormat;
@@ -11,7 +10,7 @@
import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
import edu.uci.ics.genomix.pregelix.operator.pathmerge.BasicGraphCleanVertex;
import edu.uci.ics.genomix.pregelix.util.VertexUtil;
-import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.VKmerBytesWritable;
/*
* vertexId: BytesWritable
@@ -64,7 +63,7 @@
else
outgoingMsg.reset(kmerSize);
if(destVertexId == null)
- destVertexId = new KmerBytesWritable(kmerSize);
+ destVertexId = new VKmerBytesWritable(kmerSize);
}
@Override
@@ -73,7 +72,6 @@
if(getSuperstep() == 1){
if(VertexUtil.isIncomingTipVertex(getVertexValue())){
if(getVertexValue().getLengthOfKmer() <= length){
-
sendSettledMsgToPreviousNode();
deleteVertex(getVertexId());
}
@@ -107,7 +105,7 @@
job.setVertexInputFormatClass(GraphCleanInputFormat.class);
job.setVertexOutputFormatClass(GraphCleanOutputFormat.class);
job.setDynamicVertexValueSize(true);
- job.setOutputKeyClass(PositionWritable.class);
+ job.setOutputKeyClass(VKmerBytesWritable.class);
job.setOutputValueClass(VertexValueWritable.class);
Client.run(args, job);
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/ConvertNodeToIdValue.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/ConvertNodeToIdValue.java
new file mode 100644
index 0000000..4e728e2
--- /dev/null
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/ConvertNodeToIdValue.java
@@ -0,0 +1,58 @@
+package edu.uci.ics.genomix.pregelix.sequencefile;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+
+import edu.uci.ics.genomix.type.NodeWritable;
+import edu.uci.ics.genomix.type.PositionWritable;
+import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
+import edu.uci.ics.genomix.pregelix.io.VertexValueWritable.State;
+
+
+public class ConvertNodeToIdValue {
+
+    /**
+     * Reads (NodeWritable, NullWritable) records from inFile and appends one (PositionWritable, VertexValueWritable) record per node to outFile with state State.IS_HEAD; reader and writer are closed even when reading or writing throws an IOException
+     */
+    public static void convert(Path inFile, Path outFile) throws IOException {
+        Configuration conf = new Configuration();
+        FileSystem fileSys = FileSystem.get(conf);
+        SequenceFile.Reader reader = new SequenceFile.Reader(fileSys, inFile, conf);
+        try {
+            SequenceFile.Writer writer = SequenceFile.createWriter(fileSys, conf, outFile, PositionWritable.class, VertexValueWritable.class, CompressionType.NONE);
+            try {
+                NodeWritable node = new NodeWritable();
+                NullWritable value = NullWritable.get();
+                PositionWritable outputKey = new PositionWritable();
+                VertexValueWritable outputValue = new VertexValueWritable();
+                while(reader.next(node, value)) {
+                    // NOTE(review): outputKey is never populated (the former outputKey.set(node.getNodeID()) was commented out), so every record is appended with a default-constructed key -- confirm
+                    outputValue.setFFList(node.getFFList());
+                    outputValue.setFRList(node.getFRList());
+                    outputValue.setRFList(node.getRFList());
+                    outputValue.setRRList(node.getRRList());
+                    outputValue.setActualKmer(node.getKmer());
+                    outputValue.setState(State.IS_HEAD);
+                    writer.append(outputKey, outputValue);
+                }
+            } finally { writer.close(); } // writer is closed first, as before
+        } finally { reader.close(); } // runs even if createWriter, next or append failed
+    }
+
+ public static void main(String[] args) throws IOException {
+ Path dir = new Path("data/test");
+ Path outDir = new Path("data/input");
+ FileUtils.cleanDirectory(new File("data/input"));
+ Path inFile = new Path(dir, "result.graphbuild.txt.bin");
+ Path outFile = new Path(outDir, "out");
+ convert(inFile,outFile);
+ }
+}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/GenerateTextFile.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/GenerateTextFile.java
index 8618237..b3a3d37 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/GenerateTextFile.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/GenerateTextFile.java
@@ -14,12 +14,11 @@
import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
import edu.uci.ics.genomix.pregelix.io.VertexValueWritable.State;
-import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.VKmerBytesWritable;
public class GenerateTextFile {
public static void generateFromPathmergeResult(int kmerSize, String strSrcDir, String outPutDir) throws IOException {
- KmerBytesWritable.setGlobalKmerLength(kmerSize);
Configuration conf = new Configuration();
FileSystem fileSys = FileSystem.getLocal(conf);
@@ -28,7 +27,7 @@
File srcPath = new File(strSrcDir);
for (File f : srcPath.listFiles((FilenameFilter) (new WildcardFileFilter("part*")))) {
SequenceFile.Reader reader = new SequenceFile.Reader(fileSys, new Path(f.getAbsolutePath()), conf);
- KmerBytesWritable key = new KmerBytesWritable();
+ VKmerBytesWritable key = new VKmerBytesWritable();
VertexValueWritable value = new VertexValueWritable();
while (reader.next(key, value)) {
@@ -45,14 +44,13 @@
}
public static void generateSpecificLengthChainFromNaivePathmergeResult(int maxLength) throws IOException {
- KmerBytesWritable.setGlobalKmerLength(55);
BufferedWriter bw = new BufferedWriter(new FileWriter("naive_text_" + maxLength));
Configuration conf = new Configuration();
FileSystem fileSys = FileSystem.get(conf);
for (int i = 0; i < 2; i++) {
Path path = new Path("/home/anbangx/genomix_result/final_naive/part-" + i);
SequenceFile.Reader reader = new SequenceFile.Reader(fileSys, path, conf);
- KmerBytesWritable key = new KmerBytesWritable();
+ VKmerBytesWritable key = new VKmerBytesWritable();
VertexValueWritable value = new VertexValueWritable();
while (reader.next(key, value)) {
@@ -70,14 +68,13 @@
}
public static void generateSpecificLengthChainFromLogPathmergeResult(int maxLength) throws IOException {
- KmerBytesWritable.setGlobalKmerLength(55);
BufferedWriter bw = new BufferedWriter(new FileWriter("log_text_" + maxLength));
Configuration conf = new Configuration();
FileSystem fileSys = FileSystem.get(conf);
for (int i = 0; i < 2; i++) {
Path path = new Path("/home/anbangx/genomix_result/improvelog2/part-" + i);
SequenceFile.Reader reader = new SequenceFile.Reader(fileSys, path, conf);
- KmerBytesWritable key = new KmerBytesWritable();
+ VKmerBytesWritable key = new VKmerBytesWritable();
VertexValueWritable value = new VertexValueWritable();
while (reader.next(key, value)) {
@@ -96,13 +93,12 @@
}
public static void generateFromGraphbuildResult() throws IOException {
- KmerBytesWritable.setGlobalKmerLength(55);
BufferedWriter bw = new BufferedWriter(new FileWriter("textfile"));
Configuration conf = new Configuration();
FileSystem fileSys = FileSystem.get(conf);
Path path = new Path("data/input/part-0-out-3000000");
SequenceFile.Reader reader = new SequenceFile.Reader(fileSys, path, conf);
- KmerBytesWritable key = new KmerBytesWritable();
+ VKmerBytesWritable key = new VKmerBytesWritable();
while (reader.next(key, null)) {
if (key == null) {
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/util/VertexUtil.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/util/VertexUtil.java
index 5a0591d..4582557 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/util/VertexUtil.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/util/VertexUtil.java
@@ -2,7 +2,7 @@
import edu.uci.ics.genomix.pregelix.io.AdjacencyListWritable;
import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
-import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.VKmerBytesWritable;
public class VertexUtil {
/**
@@ -67,7 +67,7 @@
/**
* check if mergeChain is cycle
*/
- public static boolean isCycle(KmerBytesWritable kmer, KmerBytesWritable mergeChain, int kmerSize) {
+ public static boolean isCycle(VKmerBytesWritable kmer, VKmerBytesWritable mergeChain, int kmerSize) {
String chain = mergeChain.toString().substring(1);
return chain.contains(kmer.toString());
@@ -116,7 +116,7 @@
/**
* get nodeId from Ad
*/
- public static KmerBytesWritable getNodeIdFromAdjacencyList(AdjacencyListWritable adj){
+ public static VKmerBytesWritable getNodeIdFromAdjacencyList(AdjacencyListWritable adj){
if(adj.getForwardList().getCountOfPosition() > 0)
return adj.getForwardList().getPosition(0);
else if(adj.getReverseList().getCountOfPosition() > 0)
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
index ff5f404..2ef9958 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
@@ -12,16 +12,14 @@
import edu.uci.ics.genomix.pregelix.operator.bridgeremove.BridgeAddVertex;
import edu.uci.ics.genomix.pregelix.operator.bridgeremove.BridgeRemoveVertex;
import edu.uci.ics.genomix.pregelix.operator.bubblemerge.BubbleAddVertex;
-import edu.uci.ics.genomix.pregelix.operator.bubblemerge.BubbleMergeVertex;
import edu.uci.ics.genomix.pregelix.operator.pathmerge.P2ForPathMergeVertex;
import edu.uci.ics.genomix.pregelix.operator.pathmerge.MapReduceVertex;
-import edu.uci.ics.genomix.pregelix.operator.pathmerge.P1ForPathMergeVertex;
import edu.uci.ics.genomix.pregelix.operator.pathmerge.P3ForPathMergeVertex;
import edu.uci.ics.genomix.pregelix.operator.pathmerge.P4ForPathMergeVertex;
import edu.uci.ics.genomix.pregelix.operator.splitrepeat.SplitRepeatVertex;
import edu.uci.ics.genomix.pregelix.operator.tipremove.TipAddVertex;
import edu.uci.ics.genomix.pregelix.operator.tipremove.TipRemoveVertex;
-import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.VKmerBytesWritable;
import edu.uci.ics.pregelix.api.job.PregelixJob;
public class JobGenerator {
@@ -51,7 +49,7 @@
job.setVertexInputFormatClass(InitialGraphCleanInputFormat.class);
job.setVertexOutputFormatClass(P2PathMergeOutputFormat.class);
job.setDynamicVertexValueSize(true);
- job.setOutputKeyClass(KmerBytesWritable.class);
+ job.setOutputKeyClass(VKmerBytesWritable.class);
job.setOutputValueClass(VertexValueWritable.class);
job.getConfiguration().setInt(P2ForPathMergeVertex.KMER_SIZE, 3);
job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
@@ -103,7 +101,7 @@
job.setVertexInputFormatClass(GraphCleanInputFormat.class);
job.setVertexOutputFormatClass(P2PathMergeOutputFormat.class);
job.setDynamicVertexValueSize(true);
- job.setOutputKeyClass(KmerBytesWritable.class);
+ job.setOutputKeyClass(VKmerBytesWritable.class);
job.setOutputValueClass(VertexValueWritable.class);
job.getConfiguration().setInt(MapReduceVertex.KMER_SIZE, 3);
job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
@@ -119,7 +117,7 @@
job.setVertexInputFormatClass(InitialGraphCleanInputFormat.class);
job.setVertexOutputFormatClass(GraphCleanOutputFormat.class);
job.setDynamicVertexValueSize(true);
- job.setOutputKeyClass(KmerBytesWritable.class);
+ job.setOutputKeyClass(VKmerBytesWritable.class);
job.setOutputValueClass(VertexValueWritable.class);
job.getConfiguration().setInt(SplitRepeatVertex.KMER_SIZE, 3);
job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
@@ -134,7 +132,7 @@
job.setVertexInputFormatClass(InitialGraphCleanInputFormat.class);
job.setVertexOutputFormatClass(GraphCleanOutputFormat.class);
job.setDynamicVertexValueSize(true);
- job.setOutputKeyClass(KmerBytesWritable.class);
+ job.setOutputKeyClass(VKmerBytesWritable.class);
job.setOutputValueClass(VertexValueWritable.class);
job.getConfiguration().setInt(TipAddVertex.KMER_SIZE, 3);
job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
@@ -151,7 +149,7 @@
job.setVertexInputFormatClass(GraphCleanInputFormat.class);
job.setVertexOutputFormatClass(GraphCleanOutputFormat.class);
job.setDynamicVertexValueSize(true);
- job.setOutputKeyClass(KmerBytesWritable.class);
+ job.setOutputKeyClass(VKmerBytesWritable.class);
job.setOutputValueClass(VertexValueWritable.class);
job.getConfiguration().setInt(TipRemoveVertex.KMER_SIZE, 3);
job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
@@ -168,7 +166,7 @@
job.setVertexInputFormatClass(InitialGraphCleanInputFormat.class);
job.setVertexOutputFormatClass(GraphCleanOutputFormat.class);
job.setDynamicVertexValueSize(true);
- job.setOutputKeyClass(KmerBytesWritable.class);
+ job.setOutputKeyClass(VKmerBytesWritable.class);
job.setOutputValueClass(VertexValueWritable.class);
job.getConfiguration().setInt(BridgeAddVertex.KMER_SIZE, 3);
job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
@@ -185,7 +183,7 @@
job.setVertexInputFormatClass(GraphCleanInputFormat.class);
job.setVertexOutputFormatClass(GraphCleanOutputFormat.class);
job.setDynamicVertexValueSize(true);
- job.setOutputKeyClass(KmerBytesWritable.class);
+ job.setOutputKeyClass(VKmerBytesWritable.class);
job.setOutputValueClass(VertexValueWritable.class);
job.getConfiguration().setInt(TipRemoveVertex.KMER_SIZE, 3);
job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
@@ -202,7 +200,7 @@
job.setVertexInputFormatClass(InitialGraphCleanInputFormat.class);
job.setVertexOutputFormatClass(GraphCleanOutputFormat.class);
job.setDynamicVertexValueSize(true);
- job.setOutputKeyClass(KmerBytesWritable.class);
+ job.setOutputKeyClass(VKmerBytesWritable.class);
job.setOutputValueClass(VertexValueWritable.class);
job.getConfiguration().setInt(BubbleAddVertex.KMER_SIZE, 3);
job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/BasicSmallTestCase.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/BasicSmallTestCase.java
index e138348..70ac94f 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/BasicSmallTestCase.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/BasicSmallTestCase.java
@@ -23,6 +23,7 @@
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.junit.Test;
+import edu.uci.ics.genomix.pregelix.graph.GenerateGraphViz;
import edu.uci.ics.genomix.pregelix.sequencefile.GenerateTextFile;
import edu.uci.ics.pregelix.api.job.PregelixJob;
import edu.uci.ics.pregelix.core.base.IDriver.Plan;
@@ -33,12 +34,13 @@
private final PregelixJob job;
private final String resultFileDir;
private final String textFileDir;
+ private final String graphvizFileDir;
private final String jobFile;
private final Driver driver = new Driver(this.getClass());
private final FileSystem dfs;
public BasicSmallTestCase(String hadoopConfPath, String jobName, String jobFile, FileSystem dfs,
- String hdfsInput, String resultFile, String textFile) throws Exception {
+ String hdfsInput, String resultFile, String textFile, String graphvizFile) throws Exception {
super("test");
this.jobFile = jobFile;
this.job = new PregelixJob("test");
@@ -46,9 +48,10 @@
this.job.getConfiguration().addResource(new Path(hadoopConfPath));
FileInputFormat.setInputPaths(job, hdfsInput);
FileOutputFormat.setOutputPath(job, new Path(hdfsInput + "_result"));
- this.textFileDir = textFile;
job.setJobName(jobName);
this.resultFileDir = resultFile;
+ this.textFileDir = textFile;
+ this.graphvizFileDir = graphvizFile;
this.dfs = dfs;
}
@@ -75,6 +78,7 @@
private void compareResults() throws Exception {
dfs.copyToLocalFile(FileOutputFormat.getOutputPath(job), new Path(resultFileDir));
GenerateTextFile.generateFromPathmergeResult(3, resultFileDir, textFileDir);
+ GenerateGraphViz.convertGraphCleanOutputToGraphViz(resultFileDir, graphvizFileDir); // NOTE(review): presumably renders the copied result in resultFileDir as GraphViz output under graphvizFileDir — confirm against GenerateGraphViz
}
public String toString() {
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/BridgeAddSmallTestSuite.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/BridgeAddSmallTestSuite.java
index c4f0963..c492d1e 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/BridgeAddSmallTestSuite.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/BridgeAddSmallTestSuite.java
@@ -143,9 +143,11 @@
+ File.separator + "bin" + File.separator + testDir.getName();
String textFileName = ACTUAL_RESULT_DIR + File.separator + jobExtToResExt(qFile.getName())
+ File.separator + "txt" + File.separator + testDir.getName();
+ String graphvizFileName = ACTUAL_RESULT_DIR + File.separator + jobExtToResExt(qFile.getName())
+ + File.separator + "graphviz" + File.separator + testDir.getName();
testSuite.addTest(new BasicSmallTestCase(HADOOP_CONF_PATH, qFile.getName(), qFile
.getAbsolutePath().toString(), dfs,
- HDFS_INPUTPATH + File.separator + testDir.getName(), resultFileName, textFileName));
+ HDFS_INPUTPATH + File.separator + testDir.getName(), resultFileName, textFileName, graphvizFileName));
}
}
}
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/BridgeRemoveSmallTestSuite.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/BridgeRemoveSmallTestSuite.java
index 171a6ca..8d22e9a 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/BridgeRemoveSmallTestSuite.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/BridgeRemoveSmallTestSuite.java
@@ -143,9 +143,11 @@
+ File.separator + "bin" + File.separator + testDir.getName();
String textFileName = ACTUAL_RESULT_DIR + File.separator + jobExtToResExt(qFile.getName())
+ File.separator + "txt" + File.separator + testDir.getName();
+ String graphvizFileName = ACTUAL_RESULT_DIR + File.separator + jobExtToResExt(qFile.getName())
+ + File.separator + "graphviz" + File.separator + testDir.getName();
testSuite.addTest(new BasicSmallTestCase(HADOOP_CONF_PATH, qFile.getName(), qFile
.getAbsolutePath().toString(), dfs,
- HDFS_INPUTPATH + File.separator + testDir.getName(), resultFileName, textFileName));
+ HDFS_INPUTPATH + File.separator + testDir.getName(), resultFileName, textFileName, graphvizFileName));
}
}
}
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/BubbleAddSmallTestSuite.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/BubbleAddSmallTestSuite.java
index a9c2774..1e5df13 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/BubbleAddSmallTestSuite.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/BubbleAddSmallTestSuite.java
@@ -143,9 +143,11 @@
+ File.separator + "bin" + File.separator + testDir.getName();
String textFileName = ACTUAL_RESULT_DIR + File.separator + jobExtToResExt(qFile.getName())
+ File.separator + "txt" + File.separator + testDir.getName();
+ String graphvizFileName = ACTUAL_RESULT_DIR + File.separator + jobExtToResExt(qFile.getName())
+ + File.separator + "graphviz" + File.separator + testDir.getName();
testSuite.addTest(new BasicSmallTestCase(HADOOP_CONF_PATH, qFile.getName(), qFile
.getAbsolutePath().toString(), dfs,
- HDFS_INPUTPATH + File.separator + testDir.getName(), resultFileName, textFileName));
+ HDFS_INPUTPATH + File.separator + testDir.getName(), resultFileName, textFileName, graphvizFileName));
}
}
}
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/BubbleMergeSmallTestSuite.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/BubbleMergeSmallTestSuite.java
index a94d31c..57647ab 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/BubbleMergeSmallTestSuite.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/BubbleMergeSmallTestSuite.java
@@ -143,9 +143,11 @@
+ File.separator + "bin" + File.separator + testDir.getName();
String textFileName = ACTUAL_RESULT_DIR + File.separator + jobExtToResExt(qFile.getName())
+ File.separator + "txt" + File.separator + testDir.getName();
+ String graphvizFileName = ACTUAL_RESULT_DIR + File.separator + jobExtToResExt(qFile.getName())
+ + File.separator + "graphviz" + File.separator + testDir.getName();
testSuite.addTest(new BasicSmallTestCase(HADOOP_CONF_PATH, qFile.getName(), qFile
.getAbsolutePath().toString(), dfs,
- HDFS_INPUTPATH + File.separator + testDir.getName(), resultFileName, textFileName));
+ HDFS_INPUTPATH + File.separator + testDir.getName(), resultFileName, textFileName, graphvizFileName));
}
}
}
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/PathMergeSmallTestSuite.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/PathMergeSmallTestSuite.java
index 115f090..7b937c8 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/PathMergeSmallTestSuite.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/PathMergeSmallTestSuite.java
@@ -150,9 +150,11 @@
+ File.separator + "bin" + File.separator + testDir.getName();
String textFileName = ACTUAL_RESULT_DIR + File.separator + jobExtToResExt(qFile.getName())
+ File.separator + "txt" + File.separator + testDir.getName();
+ String graphvizFileName = ACTUAL_RESULT_DIR + File.separator + jobExtToResExt(qFile.getName())
+ + File.separator + "graphviz" + File.separator + testDir.getName();
testSuite.addTest(new BasicSmallTestCase(HADOOP_CONF_PATH, qFile.getName(), qFile
.getAbsolutePath().toString(), dfs,
- HDFS_INPUTPATH + File.separator + testDir.getName(), resultFileName, textFileName));
+ HDFS_INPUTPATH + File.separator + testDir.getName(), resultFileName, textFileName, graphvizFileName));
}
}
}
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/SplitRepeatSmallTestSuite.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/SplitRepeatSmallTestSuite.java
index 1948acd..43f4788 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/SplitRepeatSmallTestSuite.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/SplitRepeatSmallTestSuite.java
@@ -143,9 +143,11 @@
+ File.separator + "bin" + File.separator + testDir.getName();
String textFileName = ACTUAL_RESULT_DIR + File.separator + jobExtToResExt(qFile.getName())
+ File.separator + "txt" + File.separator + testDir.getName();
+ String graphvizFileName = ACTUAL_RESULT_DIR + File.separator + jobExtToResExt(qFile.getName())
+ + File.separator + "graphviz" + File.separator + testDir.getName();
testSuite.addTest(new BasicSmallTestCase(HADOOP_CONF_PATH, qFile.getName(), qFile
.getAbsolutePath().toString(), dfs,
- HDFS_INPUTPATH + File.separator + testDir.getName(), resultFileName, textFileName));
+ HDFS_INPUTPATH + File.separator + testDir.getName(), resultFileName, textFileName, graphvizFileName));
}
}
}
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/TipAddSmallTestSuite.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/TipAddSmallTestSuite.java
index 57f4ea5..b2d02c8 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/TipAddSmallTestSuite.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/TipAddSmallTestSuite.java
@@ -143,9 +143,11 @@
+ File.separator + "bin" + File.separator + testDir.getName();
String textFileName = ACTUAL_RESULT_DIR + File.separator + jobExtToResExt(qFile.getName())
+ File.separator + "txt" + File.separator + testDir.getName();
+ String graphvizFileName = ACTUAL_RESULT_DIR + File.separator + jobExtToResExt(qFile.getName())
+ + File.separator + "graphviz" + File.separator + testDir.getName();
testSuite.addTest(new BasicSmallTestCase(HADOOP_CONF_PATH, qFile.getName(), qFile
.getAbsolutePath().toString(), dfs,
- HDFS_INPUTPATH + File.separator + testDir.getName(), resultFileName, textFileName));
+ HDFS_INPUTPATH + File.separator + testDir.getName(), resultFileName, textFileName, graphvizFileName));
}
}
}
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/TipRemoveSmallTestSuite.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/TipRemoveSmallTestSuite.java
index e8ca43f..b4514d6 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/TipRemoveSmallTestSuite.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/TipRemoveSmallTestSuite.java
@@ -143,9 +143,11 @@
+ File.separator + "bin" + File.separator + testDir.getName();
String textFileName = ACTUAL_RESULT_DIR + File.separator + jobExtToResExt(qFile.getName())
+ File.separator + "txt" + File.separator + testDir.getName();
+ String graphvizFileName = ACTUAL_RESULT_DIR + File.separator + jobExtToResExt(qFile.getName())
+ + File.separator + "graphviz" + File.separator + testDir.getName();
testSuite.addTest(new BasicSmallTestCase(HADOOP_CONF_PATH, qFile.getName(), qFile
.getAbsolutePath().toString(), dfs,
- HDFS_INPUTPATH + File.separator + testDir.getName(), resultFileName, textFileName));
+ HDFS_INPUTPATH + File.separator + testDir.getName(), resultFileName, textFileName, graphvizFileName));
}
}
}
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/graphbuilding/JobRunStepByStepTest.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/graphbuilding/JobRunStepByStepTest.java
index 79439a3..0d609bf 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/graphbuilding/JobRunStepByStepTest.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/graphbuilding/JobRunStepByStepTest.java
@@ -41,7 +41,7 @@
import edu.uci.ics.genomix.hyracks.driver.Driver;
import edu.uci.ics.genomix.hyracks.driver.Driver.Plan;
import edu.uci.ics.genomix.hyracks.job.GenomixJobConf;
-import edu.uci.ics.genomix.oldtype.NodeWritable;
+import edu.uci.ics.genomix.type.NodeWritable;
@SuppressWarnings("deprecation")
public class JobRunStepByStepTest {
@@ -175,7 +175,7 @@
}
SequenceFile.Reader reader = new SequenceFile.Reader(dfs, path, conf);
- NodeWritable node = new NodeWritable(conf.getInt(GenomixJobConf.KMER_LENGTH, KmerSize));
+ NodeWritable node = new NodeWritable();
NullWritable value = NullWritable.get();
while (reader.next(node, value)) {
if (node == null) {
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/pathmerge/MergePathTest.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/pathmerge/MergePathTest.java
index e8ca60f..9daeaa3 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/pathmerge/MergePathTest.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/pathmerge/MergePathTest.java
@@ -14,7 +14,7 @@
import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
import edu.uci.ics.genomix.pregelix.type.State2;
-import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.VKmerBytesWritable;
public class MergePathTest {
public static final String PATH_TO_TESTSTORE = "testcase/pathmerge/";
@@ -50,7 +50,7 @@
for (int i = 0; i < nc; i++) {
Path path = new Path(input + "/part-" + i);
SequenceFile.Reader reader = new SequenceFile.Reader(fileSys, path, conf);
- KmerBytesWritable key = new KmerBytesWritable(kmerSize);
+ VKmerBytesWritable key = new VKmerBytesWritable(kmerSize);
VertexValueWritable value = new VertexValueWritable();
while (reader.next(key, value)) {
@@ -90,7 +90,7 @@
for (int i = 0; i < nc; i++) {
Path path = new Path(input + "/part-" + i);
SequenceFile.Reader reader = new SequenceFile.Reader(fileSys, path, conf);
- KmerBytesWritable key = new KmerBytesWritable(kmerSize);
+ VKmerBytesWritable key = new VKmerBytesWritable(kmerSize);
VertexValueWritable value = new VertexValueWritable();
while (reader.next(key, value)) {
@@ -115,7 +115,7 @@
for (int i = 0; i < nc; i++) {
Path path = new Path(input + "/part-" + i);
SequenceFile.Reader reader = new SequenceFile.Reader(fileSys, path, conf);
- KmerBytesWritable key = new KmerBytesWritable(kmerSize);
+ VKmerBytesWritable key = new VKmerBytesWritable(kmerSize);
VertexValueWritable value = new VertexValueWritable();
while (reader.next(key, value)) {