Merge commit '164cc8091571ad11261a91eed68e06d2e73ea490' into nanzhang/hyracks_genomix
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerListWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerListWritable.java
new file mode 100644
index 0000000..5b10fdc
--- /dev/null
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerListWritable.java
@@ -0,0 +1,265 @@
+package edu.uci.ics.genomix.type;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.io.BinaryComparable;
+import org.apache.hadoop.io.Writable;
+
+import edu.uci.ics.genomix.data.Marshal;
+
+
+public class KmerListWritable extends BinaryComparable
+ implements Writable, Iterable<KmerBytesWritable>, Serializable{
+ private static final long serialVersionUID = 1L;
+ protected static final byte[] EMPTY_BYTES = { 0, 0, 0, 0 };
+ protected static final int HEADER_SIZE = 4;
+
+ protected byte[] storage;
+ protected int offset;
+ protected int valueCount;
+ protected int storageMaxSize; // since we may be a reference inside a larger datablock, we must track our maximum size
+
+ private KmerBytesWritable posIter = new KmerBytesWritable();
+
+ public KmerListWritable() {
+ storage = EMPTY_BYTES;
+ valueCount = 0;
+ offset = 0;
+ storageMaxSize = storage.length;
+ }
+
+ public KmerListWritable(byte[] data, int offset) {
+ setNewReference(data, offset);
+ }
+
+ public KmerListWritable(List<KmerBytesWritable> kmers) {
+ this();
+ setSize(kmers.size() * KmerBytesWritable.getBytesPerKmer() + HEADER_SIZE); // reserve space for all elements
+ for (KmerBytesWritable kmer : kmers) {
+ append(kmer);
+ }
+ }
+
+ public void setNewReference(byte[] data, int offset) {
+ valueCount = Marshal.getInt(data, offset);
+ if (valueCount * KmerBytesWritable.getBytesPerKmer() > data.length - offset) {
+ throw new IllegalArgumentException("Specified data buffer (len=" + (data.length - offset)
+ + ") is not large enough to store requested number of elements (" + valueCount + ")!");
+ }
+ this.storage = data;
+ this.offset = offset;
+ this.storageMaxSize = valueCount * KmerBytesWritable.getBytesPerKmer() + HEADER_SIZE;
+ }
+
+ public void append(KmerBytesWritable kmer) {
+ setSize((1 + valueCount) * KmerBytesWritable.getBytesPerKmer() + HEADER_SIZE);
+ System.arraycopy(kmer.getBytes(), kmer.offset, storage,
+ offset + HEADER_SIZE + valueCount * KmerBytesWritable.getBytesPerKmer(),
+ KmerBytesWritable.getBytesPerKmer());
+ valueCount += 1;
+ Marshal.putInt(valueCount, storage, offset);
+ }
+
+ /*
+ * Append the otherList to the end of myList
+ */
+ public void appendList(KmerListWritable otherList) {
+ if (otherList.valueCount > 0) {
+ setSize((valueCount + otherList.valueCount) * KmerBytesWritable.getBytesPerKmer() + HEADER_SIZE);
+ // copy contents of otherList into the end of my storage
+ System.arraycopy(otherList.storage, otherList.offset + HEADER_SIZE, storage, offset + HEADER_SIZE
+ + valueCount * KmerBytesWritable.getBytesPerKmer(),
+ otherList.valueCount * KmerBytesWritable.getBytesPerKmer());
+ valueCount += otherList.valueCount;
+ Marshal.putInt(valueCount, storage, offset);
+ }
+ }
+
+ /**
+ * Save the union of my list and otherList. Uses a temporary HashSet for
+ * uniquefication
+ */
+ public void unionUpdate(KmerListWritable otherList) {
+ int newSize = valueCount + otherList.valueCount;
+ HashSet<KmerBytesWritable> uniqueElements = new HashSet<KmerBytesWritable>(newSize);
+ for (KmerBytesWritable kmer : this) {
+ uniqueElements.add(kmer);
+ }
+ for (KmerBytesWritable kmer : otherList) {
+ uniqueElements.add(kmer);
+ }
+ valueCount = 0;
+ setSize(newSize * KmerBytesWritable.getBytesPerKmer() + HEADER_SIZE);
+ for (KmerBytesWritable kmer : uniqueElements) { // this point is not efficient
+ append(kmer);
+ }
+ Marshal.putInt(valueCount, storage, offset);
+ }
+
+ protected void setSize(int size) {
+ if (size > getCapacity()) {
+ setCapacity((size * 3 / 2));
+ }
+ }
+
+ protected int getCapacity() {
+ return storageMaxSize - offset;
+ }
+
+ protected void setCapacity(int new_cap) {
+ if (new_cap > getCapacity()) {
+ byte[] new_data = new byte[new_cap];
+ if (valueCount > 0) {
+ System.arraycopy(storage, offset, new_data, 0, valueCount * KmerBytesWritable.getBytesPerKmer() + HEADER_SIZE);
+ }
+ storage = new_data;
+ offset = 0;
+ storageMaxSize = storage.length;
+ }
+ }
+
+ public void reset() {
+ valueCount = 0;
+ Marshal.putInt(valueCount, storage, offset);
+ }
+
+ public KmerBytesWritable getPosition(int i) {
+ if (i >= valueCount) {
+ throw new ArrayIndexOutOfBoundsException("No such positions");
+ }
+ posIter.setAsReference(storage, offset + HEADER_SIZE + i * KmerBytesWritable.getBytesPerKmer());
+ return posIter;
+ }
+
+ public void setCopy(KmerListWritable otherList) {
+ setCopy(otherList.storage, otherList.offset);
+ }
+
+ /**
+ * read a KmerListWritable from newData, which should include the header
+ */
+ public void setCopy(byte[] newData, int offset) {
+ int newValueCount = Marshal.getInt(newData, offset);
+ setSize(newValueCount * KmerBytesWritable.getBytesPerKmer() + HEADER_SIZE);
+ if (newValueCount > 0) {
+ System.arraycopy(newData, offset + HEADER_SIZE, storage, this.offset + HEADER_SIZE, newValueCount
+ * KmerBytesWritable.getBytesPerKmer());
+ }
+ valueCount = newValueCount;
+ Marshal.putInt(valueCount, storage, this.offset);
+ }
+
+ @Override
+ public Iterator<KmerBytesWritable> iterator() {
+ Iterator<KmerBytesWritable> it = new Iterator<KmerBytesWritable>() {
+
+ private int currentIndex = 0;
+
+ @Override
+ public boolean hasNext() {
+ return currentIndex < valueCount;
+ }
+
+ @Override
+ public KmerBytesWritable next() {
+ return getPosition(currentIndex++);
+ }
+
+ @Override
+ public void remove() {
+ if (currentIndex < valueCount)
+ System.arraycopy(storage, offset + currentIndex * KmerBytesWritable.getBytesPerKmer(), storage,
+ offset + (currentIndex - 1) * KmerBytesWritable.getBytesPerKmer(),
+ (valueCount - currentIndex) * KmerBytesWritable.getBytesPerKmer());
+ valueCount--;
+ currentIndex--;
+ Marshal.putInt(valueCount, storage, offset);
+ }
+ };
+ return it;
+ }
+
+ /*
+ * remove the first instance of `toRemove`. Uses a linear scan. Throws an
+ * exception if not in this list.
+ */
+ public void remove(KmerBytesWritable toRemove, boolean ignoreMissing) {
+ Iterator<KmerBytesWritable> posIterator = this.iterator();
+ while (posIterator.hasNext()) {
+ if (toRemove.equals(posIterator.next())) {
+ posIterator.remove();
+ return; // break as soon as the element is found
+ }
+ }
+ // element was not found
+ if (!ignoreMissing) {
+ throw new ArrayIndexOutOfBoundsException("the KmerBytesWritable `" + toRemove.toString()
+ + "` was not found in this list.");
+ }
+ }
+
+ public void remove(KmerBytesWritable toRemove) {
+ remove(toRemove, false);
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ valueCount = in.readInt();
+ setSize(valueCount * KmerBytesWritable.getBytesPerKmer() + HEADER_SIZE);
+ in.readFully(storage, offset + HEADER_SIZE, valueCount * KmerBytesWritable.getBytesPerKmer() - HEADER_SIZE);
+ Marshal.putInt(valueCount, storage, offset);
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.write(storage, offset, valueCount * KmerBytesWritable.getBytesPerKmer() + HEADER_SIZE);
+ }
+
+ public int getCountOfPosition() {
+ return valueCount;
+ }
+
+ public byte[] getByteArray() {
+ return storage;
+ }
+
+ public int getStartOffset() {
+ return offset;
+ }
+
+ public int getLength() {
+ return valueCount * KmerBytesWritable.getBytesPerKmer() + HEADER_SIZE;
+ }
+ @Override
+ public String toString() {
+ StringBuilder sbuilder = new StringBuilder();
+ sbuilder.append('[');
+ for (int i = 0; i < valueCount; i++) {
+ sbuilder.append(getPosition(i).toString());
+ sbuilder.append(',');
+ }
+ if (valueCount > 0) {
+ sbuilder.setCharAt(sbuilder.length() - 1, ']');
+ } else {
+ sbuilder.append(']');
+ }
+ return sbuilder.toString();
+ }
+
+ @Override
+ public int hashCode() {
+ return Marshal.hashBytes(getByteArray(), getStartOffset(), getLength());
+ }
+
+ @Override
+ public byte[] getBytes() {
+
+ return null;
+ }
+}
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
index 35e55d5..f69f122 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
@@ -80,7 +80,6 @@
averageCoverage = 0;
}
-
public PositionListWritable getNodeIdList() {
return nodeIdList;
}
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionListWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionListWritable.java
index 8de4b0e..881cbd6 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionListWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionListWritable.java
@@ -92,7 +92,7 @@
int newSize = valueCount + otherList.valueCount;
HashSet<PositionWritable> uniqueElements = new HashSet<PositionWritable>(newSize);
for (PositionWritable pos : this) {
- uniqueElements.add(pos);
+ uniqueElements.add(new PositionWritable(pos));
}
for (PositionWritable pos : otherList) {
uniqueElements.add(pos);
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionWritable.java
index 03d66a6..bcdd423 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionWritable.java
@@ -32,6 +32,10 @@
set(mateId, readId, posId);
}
+    /**
+     * Creates a new PositionWritable through the no-arg constructor and initializes it with
+     * {@code set(other)} — presumably a value copy, so the result does not alias {@code other}
+     * (e.g. safe to keep as a hash-set key); confirm against set().
+     */
+    public PositionWritable(PositionWritable other) {
+        this();
+        this.set(other);
+    }
public PositionWritable(byte[] storage, int offset) {
setNewReference(storage, offset);
}
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/VKmerBytesWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/VKmerBytesWritable.java
index 4f34542..7e516fd 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/VKmerBytesWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/VKmerBytesWritable.java
@@ -155,6 +155,7 @@
System.arraycopy(newData, offset + HEADER_SIZE, bytes, this.kmerStartOffset, bytesUsed);
}
+
/**
* Point this datablock to the given bytes array It works like the pointer
* to new datablock.
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/VKmerListWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/VKmerListWritable.java
index aa33350..f269b5a 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/VKmerListWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/VKmerListWritable.java
@@ -24,7 +24,7 @@
protected byte[] storage;
protected int offset;
protected int valueCount;
- protected int storageMaxSize; // since we may be a reference inside a larger datablock, we must track our maximum size
+ protected int storageMaxSize; // since we may be a reference inside a larger datablock, we must track our maximum size
private VKmerBytesWritable posIter = new VKmerBytesWritable();
@@ -32,7 +32,7 @@
storage = EMPTY_BYTES;
valueCount = 0;
offset = 0;
- storageMaxSize = storage.length;
+ storageMaxSize = storage.length;
}
public VKmerListWritable(byte[] data, int offset) {
@@ -55,8 +55,16 @@
public void append(VKmerBytesWritable kmer) {
setSize(getLength() + kmer.getLength());
- System.arraycopy(kmer.getBytes(), kmer.kmerStartOffset - VKmerBytesWritable.HEADER_SIZE,
- storage, offset + getLength(),
+ System.arraycopy(kmer.getBytes(), kmer.kmerStartOffset - VKmerBytesWritable.HEADER_SIZE, storage, offset
+ + getLength(), kmer.getLength());
+ valueCount += 1;
+ Marshal.putInt(valueCount, storage, offset);
+ }
+
+ public void append(int k, KmerBytesWritable kmer) {
+ setSize(getLength() + HEADER_SIZE + kmer.getLength());
+ Marshal.putInt(k, storage, offset + getLength());
+ System.arraycopy(kmer.getBytes(), kmer.getOffset(), storage, offset + getLength() + HEADER_SIZE,
kmer.getLength());
valueCount += 1;
Marshal.putInt(valueCount, storage, offset);
@@ -79,7 +87,7 @@
public void appendList(VKmerListWritable otherList) {
if (otherList.valueCount > 0) {
setSize(getLength() + otherList.getLength() - HEADER_SIZE); // one of the headers is redundant
-
+
// copy contents of otherList into the end of my storage
System.arraycopy(otherList.storage, otherList.offset + HEADER_SIZE, // skip other header
storage, offset + getLength(), // add to end
@@ -103,7 +111,7 @@
for (VKmerBytesWritable kmer : otherList) {
uniqueElements.add(kmer); // references okay
}
- setSize(getLength() + otherList.getLength()); // upper bound on memory usage
+ setSize(getLength() + otherList.getLength()); // upper bound on memory usage
valueCount = 0;
for (VKmerBytesWritable kmer : uniqueElements) {
append(kmer);
@@ -142,9 +150,9 @@
posIter.setAsReference(storage, getOffsetOfKmer(i));
return posIter;
}
-
+
/**
- * Return the offset of the kmer at the i'th position
+ * Return the offset of the kmer at the i'th position
*/
public int getOffsetOfKmer(int i) {
if (i >= valueCount) {
@@ -170,9 +178,8 @@
int newLength = getLength(newData, newOffset);
setSize(newLength);
if (newValueCount > 0) {
- System.arraycopy(newData, newOffset + HEADER_SIZE,
- storage, this.offset + HEADER_SIZE,
- newLength - HEADER_SIZE);
+ System.arraycopy(newData, newOffset + HEADER_SIZE, storage, this.offset + HEADER_SIZE, newLength
+ - HEADER_SIZE);
}
valueCount = newValueCount;
Marshal.putInt(valueCount, storage, this.offset);
@@ -193,7 +200,8 @@
@Override
public VKmerBytesWritable next() {
posIter.setAsReference(storage, currentOffset);
- currentOffset += KmerUtil.getByteNumFromK(Marshal.getInt(storage, currentOffset)) + VKmerBytesWritable.HEADER_SIZE;
+ currentOffset += KmerUtil.getByteNumFromK(Marshal.getInt(storage, currentOffset))
+ + VKmerBytesWritable.HEADER_SIZE;
currentIndex++;
return posIter;
}
@@ -201,16 +209,17 @@
@Override
public void remove() {
if (currentOffset <= 0) {
- throw new IllegalStateException("You must advance the iterator using .next() before calling remove()!");
+ throw new IllegalStateException(
+ "You must advance the iterator using .next() before calling remove()!");
}
// we're removing the element at prevIndex
int prevIndex = currentIndex - 1;
int prevOffset = getOffsetOfKmer(prevIndex);
-
+
if (currentIndex < valueCount) { // if it's the last element, don't have to do any copying
System.arraycopy(storage, currentOffset, // from the "next" element
storage, prevOffset, // to the one just returned (overwriting it)
- getLength() - currentOffset + offset); // remaining bytes except current element
+ getLength() - currentOffset + offset); // remaining bytes except current element
}
valueCount--;
currentIndex--;
@@ -230,7 +239,7 @@
while (posIterator.hasNext()) {
if (toRemove.equals(posIterator.next())) {
posIterator.remove();
- return; // break as soon as the element is found
+ return; // break as soon as the element is found
}
}
// element was not found
@@ -258,7 +267,8 @@
elemBytes = KmerUtil.getByteNumFromK(elemLetters) + VKmerBytesWritable.HEADER_SIZE;
setSize(curLength + elemBytes); // make sure we have room for the new element
Marshal.putInt(elemLetters, storage, curOffset); // write header
- in.readFully(storage, curOffset + VKmerBytesWritable.HEADER_SIZE, elemBytes - VKmerBytesWritable.HEADER_SIZE); // write kmer
+ in.readFully(storage, curOffset + VKmerBytesWritable.HEADER_SIZE, elemBytes
+ - VKmerBytesWritable.HEADER_SIZE); // write kmer
curOffset += elemBytes;
curLength += elemBytes;
}
@@ -284,19 +294,21 @@
public int getLength() {
int totalSize = HEADER_SIZE;
for (int curCount = 0; curCount < valueCount; curCount++) {
- totalSize += KmerUtil.getByteNumFromK(Marshal.getInt(storage, offset + totalSize)) + VKmerBytesWritable.HEADER_SIZE;
+ totalSize += KmerUtil.getByteNumFromK(Marshal.getInt(storage, offset + totalSize))
+ + VKmerBytesWritable.HEADER_SIZE;
}
return totalSize;
}
-
+
public static int getLength(byte[] listStorage, int listOffset) {
- int totalSize = HEADER_SIZE;
- int listValueCount = Marshal.getInt(listStorage, listOffset);
- for (int curCount = 0; curCount < listValueCount; curCount++) {
- totalSize += KmerUtil.getByteNumFromK(Marshal.getInt(listStorage, listOffset + totalSize)) + VKmerBytesWritable.HEADER_SIZE;
- }
- return totalSize;
- }
+ int totalSize = HEADER_SIZE;
+ int listValueCount = Marshal.getInt(listStorage, listOffset);
+ for (int curCount = 0; curCount < listValueCount; curCount++) {
+ totalSize += KmerUtil.getByteNumFromK(Marshal.getInt(listStorage, listOffset + totalSize))
+ + VKmerBytesWritable.HEADER_SIZE;
+ }
+ return totalSize;
+ }
@Override
public String toString() {
diff --git a/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/KmerBytesWritableTest.java b/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/KmerBytesWritableTest.java
index 81565a3..efeff71 100644
--- a/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/KmerBytesWritableTest.java
+++ b/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/KmerBytesWritableTest.java
@@ -26,6 +26,7 @@
import edu.uci.ics.genomix.type.GeneCode;
import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.KmerListWritable;
public class KmerBytesWritableTest {
static byte[] array = { 'A', 'A', 'T', 'A', 'G', 'A', 'A', 'G' };
@@ -124,4 +125,312 @@
Assert.assertEquals(kmer.toString(), kmerAppend.toString());
}
}
+
+
+    /**
+     * Forward/forward merge: merging with kmer size k must yield the first kmer followed by the
+     * second kmer with its first (k - 1) letters dropped, for every k and for all prefix pairs of
+     * a 10-letter read.
+     */
+    @Test
+    public void TestMergeFFKmer() {
+        final String read = "AGCTGACCGT";
+        final byte[] readBytes = { 'A', 'G', 'C', 'T', 'G', 'A', 'C', 'C', 'G', 'T' };
+
+        String leftText = "AGCTGACC";
+        KmerBytesWritable left = new KmerBytesWritable(8);
+        left.setByRead(readBytes, 0);
+
+        String rightText = "GCTGACCG";
+        KmerBytesWritable right = new KmerBytesWritable(8);
+        right.setByRead(readBytes, 1);
+        Assert.assertEquals(rightText, right.toString());
+
+        final int k = 8;
+        KmerBytesWritable merged = new KmerBytesWritable(left);
+        merged.mergeWithFFKmer(k, right);
+        Assert.assertEquals(leftText + rightText.substring(k - 1), merged.toString());
+
+        for (int mergeK = 1; mergeK < 8; mergeK++) {
+            merged.set(left);
+            merged.mergeWithFFKmer(mergeK, right);
+            Assert.assertEquals(leftText + rightText.substring(mergeK - 1), merged.toString());
+        }
+
+        // every pair of read prefixes, merged at every kmer size shorter than the second prefix
+        for (int leftLen = 1; leftLen <= 10; leftLen++) {
+            for (int rightLen = 1; rightLen <= 10; rightLen++) {
+                KmerBytesWritable prefixA = new KmerBytesWritable(leftLen);
+                KmerBytesWritable prefixB = new KmerBytesWritable(rightLen);
+                prefixA.setByRead(readBytes, 0);
+                prefixB.setByRead(readBytes, 0);
+                String textA = read.substring(0, leftLen);
+                String textB = read.substring(0, rightLen);
+                Assert.assertEquals(textA, prefixA.toString());
+                Assert.assertEquals(textB, prefixB.toString());
+                for (int mergeK = 1; mergeK < rightLen; mergeK++) {
+                    merged.set(prefixA);
+                    merged.mergeWithFFKmer(mergeK, prefixB);
+                    Assert.assertEquals(textA + textB.substring(mergeK - 1), merged.toString());
+                }
+            }
+        }
+    }
+
+    /**
+     * Forward/reverse merge of a read's head with the reverse complement of its tail, checked at
+     * kmer sizes 1..3.
+     */
+    @Test
+    public void TestMergeFRKmer() {
+        final String expected = "AAGCTAACAACC";
+        final byte[] expectedBytes = expected.getBytes();
+
+        String forwardText = "AAGCTAA";
+        KmerBytesWritable forward = new KmerBytesWritable(forwardText.length());
+        forward.setByRead(expectedBytes, 0);
+        Assert.assertEquals(forwardText, forward.toString());
+
+        // the second kmer is the reverse complement of the end of the read
+        String reverseText = "GGTTGTT";
+        KmerBytesWritable reverse = new KmerBytesWritable(reverseText.length());
+        reverse.setByReadReverse(expectedBytes, expected.length() - reverseText.length());
+        Assert.assertEquals(reverseText, reverse.toString());
+
+        KmerBytesWritable merged = new KmerBytesWritable(forward);
+        merged.mergeWithFRKmer(3, reverse);
+        Assert.assertEquals(expected, merged.toString());
+
+        // expectedByK[k - 1] is the merge result for kmer size k
+        String[] expectedByK = { "AAGCTAAAACAACC", "AAGCTAAACAACC", "AAGCTAACAACC" };
+        for (int k = 1; k <= expectedByK.length; k++) {
+            merged.set(forward);
+            merged.mergeWithFRKmer(k, reverse);
+            Assert.assertEquals(expectedByK[k - 1], merged.toString());
+        }
+    }
+
+
+    /**
+     * Reverse/forward merge of a read's tail with the reverse complement of its head at kmer
+     * sizes 1..3, followed by a few small hand-checked RF and FR merges.
+     */
+    @Test
+    public void TestMergeRFKmer() {
+        final String expected = "GGCACAACAACCC";
+        final byte[] expectedBytes = expected.getBytes();
+
+        String forwardText = "AACAACCC";
+        KmerBytesWritable forward = new KmerBytesWritable(forwardText.length());
+        forward.setByRead(expectedBytes, 5);
+        Assert.assertEquals(forwardText, forward.toString());
+
+        // the second kmer is the reverse complement of the start of the read
+        String reverseText = "TTGTGCC";
+        KmerBytesWritable reverse = new KmerBytesWritable(reverseText.length());
+        reverse.setByReadReverse(expectedBytes, 0);
+        Assert.assertEquals(reverseText, reverse.toString());
+
+        KmerBytesWritable merged = new KmerBytesWritable(forward);
+        merged.mergeWithRFKmer(3, reverse);
+        Assert.assertEquals(expected, merged.toString());
+
+        // expectedByK[k - 1] is the merge result for kmer size k
+        String[] expectedByK = { "GGCACAAAACAACCC", "GGCACAAACAACCC", "GGCACAACAACCC" };
+        for (int k = 1; k <= expectedByK.length; k++) {
+            merged.set(forward);
+            merged.mergeWithRFKmer(k, reverse);
+            Assert.assertEquals(expectedByK[k - 1], merged.toString());
+        }
+
+        KmerBytesWritable self = new KmerBytesWritable(3);
+        KmerBytesWritable other = new KmerBytesWritable(3);
+        self.setByRead("CTA".getBytes(), 0);
+        other.setByRead("AGA".getBytes(), 0);
+        self.mergeWithRFKmer(3, other);
+        Assert.assertEquals("TCTA", self.toString());
+
+        self = new KmerBytesWritable(3);
+        other = new KmerBytesWritable(3);
+        self.setByRead("CTA".getBytes(), 0);
+        other.setByRead("ATA".getBytes(), 0); // reverse complement of ATA is TAT
+        self.mergeWithFRKmer(3, other);
+        Assert.assertEquals("CTAT", self.toString());
+
+        self = new KmerBytesWritable(3);
+        other = new KmerBytesWritable(3);
+        self.setByRead("ATA".getBytes(), 0);
+        other.setByRead("CTA".getBytes(), 0); // reverse complement of CTA is TAG
+        self.mergeWithFRKmer(3, other);
+        Assert.assertEquals("ATAG", self.toString());
+
+        self = new KmerBytesWritable(5);
+        other = new KmerBytesWritable(4);
+        self.setByRead("TCTAT".getBytes(), 0);
+        other.setByRead("GAAC".getBytes(), 0);
+        self.mergeWithRFKmer(3, other);
+        Assert.assertEquals("GTTCTAT", self.toString());
+    }
+
+
+
+    /**
+     * Reverse/reverse merge: merging kmer B with kmer A at kmer size k must yield A with its last
+     * (k - 1) letters dropped, followed by B, for every k and all prefix pairs of a 10-letter read.
+     */
+    @Test
+    public void TestMergeRRKmer() {
+        final String read = "AGCTGACCGT";
+        final byte[] readBytes = { 'A', 'G', 'C', 'T', 'G', 'A', 'C', 'C', 'G', 'T' };
+
+        String firstText = "AGCTGACC";
+        KmerBytesWritable first = new KmerBytesWritable(8);
+        first.setByRead(readBytes, 0);
+
+        String secondText = "GCTGACCG";
+        KmerBytesWritable second = new KmerBytesWritable(8);
+        second.setByRead(readBytes, 1);
+        Assert.assertEquals(secondText, second.toString());
+
+        final int k = 8;
+        KmerBytesWritable merged = new KmerBytesWritable(second);
+        merged.mergeWithRRKmer(k, first);
+        Assert.assertEquals(firstText + secondText.substring(k - 1), merged.toString());
+
+        for (int mergeK = 1; mergeK < 8; mergeK++) {
+            merged.set(second);
+            merged.mergeWithRRKmer(mergeK, first);
+            Assert.assertEquals(firstText.substring(0, firstText.length() - mergeK + 1) + secondText,
+                    merged.toString());
+        }
+
+        // every pair of read prefixes, merged at every kmer size shorter than the first prefix
+        for (int firstLen = 1; firstLen <= 10; firstLen++) {
+            for (int secondLen = 1; secondLen <= 10; secondLen++) {
+                KmerBytesWritable prefixA = new KmerBytesWritable(firstLen);
+                KmerBytesWritable prefixB = new KmerBytesWritable(secondLen);
+                prefixA.setByRead(readBytes, 0);
+                prefixB.setByRead(readBytes, 0);
+                String textA = read.substring(0, firstLen);
+                String textB = read.substring(0, secondLen);
+                Assert.assertEquals(textA, prefixA.toString());
+                Assert.assertEquals(textB, prefixB.toString());
+                for (int mergeK = 1; mergeK < firstLen; mergeK++) {
+                    merged.set(prefixB);
+                    merged.mergeWithRRKmer(mergeK, prefixA);
+                    Assert.assertEquals(textA.substring(0, textA.length() - mergeK + 1) + textB,
+                            merged.toString());
+                }
+            }
+        }
+    }
+
+ @Test
+ public void TestFinalMerge() {
+ String selfString;
+ String match;
+ String msgString;
+ int index;
+ KmerBytesWritable kmer = new KmerBytesWritable();
+ int kmerSize = 3;
+
+ String F1 = "AATAG";
+ String F2 = "TAGAA";
+ String R1 = "CTATT";
+ String R2 = "TTCTA";
+
+ //FF test
+ selfString = F1;
+ match = selfString.substring(selfString.length() - kmerSize + 1,selfString.length());
+ msgString = F2;
+ index = msgString.indexOf(match);
+ kmer.reset(msgString.length() - index);
+ kmer.setByRead(msgString.substring(index).getBytes(), 0);
+ System.out.println(kmer.toString());
+
+ //FR test
+ selfString = F1;
+ match = selfString.substring(selfString.length() - kmerSize + 1,selfString.length());
+ msgString = GeneCode.reverseComplement(R2);
+ index = msgString.indexOf(match);
+ kmer.reset(msgString.length() - index);
+ kmer.setByRead(msgString.substring(index).getBytes(), 0);
+ System.out.println(kmer.toString());
+
+ //RF test
+ selfString = R1;
+ match = selfString.substring(0,kmerSize - 1);
+ msgString = GeneCode.reverseComplement(F2);
+ index = msgString.lastIndexOf(match) + kmerSize - 2;
+ kmer.reset(index + 1);
+ kmer.setByReadReverse(msgString.substring(0, index + 1).getBytes(), 0);
+ System.out.println(kmer.toString());
+
+ //RR test
+ selfString = R1;
+ match = selfString.substring(0,kmerSize - 1);
+ msgString = R2;
+ index = msgString.lastIndexOf(match) + kmerSize - 2;
+ kmer.reset(index + 1);
+ kmer.setByRead(msgString.substring(0, index + 1).getBytes(), 0);
+ System.out.println(kmer.toString());
+
+ String[][] connectedTable = new String[][]{
+ {"FF", "RF"},
+ {"FF", "RR"},
+ {"FR", "RF"},
+ {"FR", "RR"}
+ };
+ System.out.println(connectedTable[0][1]);
+
+ Set<Long> s1 = new HashSet<Long>();
+ Set<Long> s2 = new HashSet<Long>();
+ s1.add((long) 1);
+ s1.add((long) 2);
+ s2.add((long) 2);
+ s2.add((long) 3);
+ Set<Long> intersection = new HashSet<Long>();
+ intersection.addAll(s1);
+ intersection.retainAll(s2);
+ System.out.println(intersection.toString());
+ Set<Long> difference = new HashSet<Long>();
+ difference.addAll(s1);
+ difference.removeAll(s2);
+ System.out.println(difference.toString());
+
+ Map<KmerBytesWritable, Set<Long>> map = new HashMap<KmerBytesWritable, Set<Long>>();
+ KmerBytesWritable k1 = new KmerBytesWritable(3);
+ Set<Long> set1 = new HashSet<Long>();
+ k1.setByRead(("CTA").getBytes(), 0);
+ set1.add((long)1);
+ map.put(k1, set1);
+ KmerBytesWritable k2 = new KmerBytesWritable(3);
+ k2.setByRead(("GTA").getBytes(), 0);
+ Set<Long> set2 = new HashSet<Long>();
+ set2.add((long) 2);
+ map.put(k2, set2);
+ KmerBytesWritable k3 = new KmerBytesWritable(3);
+ k3.setByRead(("ATG").getBytes(), 0);
+ Set<Long> set3 = new HashSet<Long>();
+ set3.add((long) 2);
+ map.put(k3, set3);
+ KmerBytesWritable k4 = new KmerBytesWritable(3);
+ k4.setByRead(("AAT").getBytes(), 0);
+ Set<Long> set4 = new HashSet<Long>();
+ set4.add((long) 1);
+ map.put(k4, set4);
+ KmerListWritable kmerList = new KmerListWritable(3);
+ kmerList.append(k1);
+ kmerList.append(k2);
+ System.out.println("CTA = " + map.get(k1).toString());
+ System.out.println("GTA = " + map.get(k2).toString());
+ System.out.println("ATG = " + map.get(k3).toString());
+ System.out.println("AAT = " + map.get(k4).toString());
+ System.out.println(k1.compareTo(k2));
+ System.out.println(k2.compareTo(k1));
+
+ System.out.println("CTA = " + kmerList.getPosition(0).toString());
+ System.out.println("GTA = " + kmerList.getPosition(1).toString());
+ System.out.println("CTA = " + map.get(kmerList.getPosition(0)).toString());
+ System.out.println("GTA = " + map.get(kmerList.getPosition(1)).toString());
+ }
}
diff --git a/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/NodeWritableTest.java b/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/NodeWritableTest.java
new file mode 100644
index 0000000..5bcf663
--- /dev/null
+++ b/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/NodeWritableTest.java
@@ -0,0 +1,105 @@
+package edu.uci.ics.genomix.data.test;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Random;
+
+import junit.framework.Assert;
+
+import org.junit.Test;
+
+import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.KmerListWritable;
+import edu.uci.ics.genomix.type.NodeWritable;
+import edu.uci.ics.genomix.type.PositionListWritable;
+import edu.uci.ics.genomix.type.PositionWritable;
+
+/**
+ * Round-trip tests for {@link NodeWritable} marshalling.
+ */
+public class NodeWritableTest {
+
+    /**
+     * For random reads of length 6..10, builds nodes whose FF/RR lists hold the neighbouring
+     * 5-mers, marshals each node, reads it back by reference and checks that it prints the same
+     * as a node built directly from the same lists.
+     */
+    @Test
+    public void testNodeReset() throws IOException {
+        // Set the global kmer length before any kmer or kmer list is used: KmerListWritable sizes
+        // its elements with KmerBytesWritable.getBytesPerKmer(), and the no-arg KmerBytesWritable
+        // constructor presumably depends on it as well (TODO confirm).
+        KmerBytesWritable.setGlobalKmerLength(5);
+
+        NodeWritable outputNode = new NodeWritable();
+        NodeWritable inputNode = new NodeWritable();
+
+        KmerListWritable nextKmerList = new KmerListWritable();
+        KmerListWritable preKmerList = new KmerListWritable();
+        KmerBytesWritable preKmer = new KmerBytesWritable();
+        KmerBytesWritable curKmer = new KmerBytesWritable();
+        KmerBytesWritable nextKmer = new KmerBytesWritable();
+        PositionWritable nodeId = new PositionWritable();
+        PositionListWritable nodeIdList = new PositionListWritable();
+
+        nodeId.set((byte) 0, (long) 1, 0);
+        nodeIdList.append(nodeId);
+        for (int i = 6; i <= 10; i++) {
+            NodeWritable tempNode = new NodeWritable();
+
+            String randomString = generaterRandomString(i);
+            byte[] array = randomString.getBytes();
+
+            curKmer.setByRead(array, 0);
+            preKmer.setAsCopy(curKmer);
+            nextKmer.setAsCopy(curKmer);
+            nextKmer.shiftKmerWithNextChar(array[5]);
+
+            // Start each read from a clean state: the previous read leaves an RR edge in
+            // outputNode (absent from the fresh tempNode) and a stale element in nextKmerList.
+            outputNode.reset();
+            nextKmerList.reset();
+            nextKmerList.append(nextKmer);
+
+            outputNode.setNodeIdList(nodeIdList);
+            outputNode.setFFList(nextKmerList);
+
+            tempNode.setNodeIdList(nodeIdList);
+            tempNode.setFFList(nextKmerList);
+
+            inputNode.setAsReference(outputNode.marshalToByteArray(), 0);
+            Assert.assertEquals(tempNode.toString(), inputNode.toString());
+
+            // middle kmers: one FF edge (next kmer) and one RR edge (previous kmer) each
+            for (int j = 5; j < array.length - 1; j++) {
+                outputNode.reset();
+                curKmer.setAsCopy(nextKmer);
+
+                nextKmer.shiftKmerWithNextChar(array[j + 1]);
+                nextKmerList.reset();
+                nextKmerList.append(nextKmer);
+                preKmerList.reset();
+                preKmerList.append(preKmer);
+                outputNode.setNodeIdList(nodeIdList);
+                outputNode.setFFList(nextKmerList);
+                outputNode.setRRList(preKmerList);
+                tempNode.reset();
+                tempNode.setNodeIdList(nodeIdList);
+                tempNode.setFFList(nextKmerList);
+                tempNode.setRRList(preKmerList);
+                preKmer.setAsCopy(curKmer);
+                inputNode.reset();
+                inputNode.setAsReference(outputNode.marshalToByteArray(), 0);
+                Assert.assertEquals(tempNode.toString(), inputNode.toString());
+            }
+
+            // last kmer of the read: only an RR edge
+            curKmer.setAsCopy(nextKmer);
+            preKmerList.reset();
+            preKmerList.append(preKmer);
+            outputNode.reset();
+            outputNode.setNodeIdList(nodeIdList);
+            outputNode.setRRList(preKmerList);
+            tempNode.reset();
+            tempNode.setNodeIdList(nodeIdList);
+            tempNode.setRRList(preKmerList);
+            inputNode.reset();
+            inputNode.setAsReference(outputNode.marshalToByteArray(), 0);
+            Assert.assertEquals(tempNode.toString(), inputNode.toString());
+        }
+    }
+
+    /** Returns a random string of {@code n} letters drawn from A, C, G and T. */
+    public String generaterRandomString(int n) {
+        char[] chars = "ACGT".toCharArray();
+        StringBuilder sb = new StringBuilder(n);
+        Random random = new Random();
+        for (int i = 0; i < n; i++) {
+            sb.append(chars[random.nextInt(chars.length)]);
+        }
+        return sb.toString();
+    }
+}
diff --git a/genomix/genomix-hadoop/data/webmap/AdjSplitRepeat.txt b/genomix/genomix-hadoop/data/webmap/AdjSplitRepeat.txt
new file mode 100644
index 0000000..f2e3942
--- /dev/null
+++ b/genomix/genomix-hadoop/data/webmap/AdjSplitRepeat.txt
@@ -0,0 +1,3 @@
+1 AATAG
+2 GCATA
+3 ATAGC
diff --git a/genomix/genomix-hadoop/data/webmap/MergeBubble.txt b/genomix/genomix-hadoop/data/webmap/MergeBubble.txt
new file mode 100644
index 0000000..087f43e
--- /dev/null
+++ b/genomix/genomix-hadoop/data/webmap/MergeBubble.txt
@@ -0,0 +1,2 @@
+1 AATAGAA
+2 AATACAA
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/contrailgraphbuilding/GenomixReducer.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/contrailgraphbuilding/GenomixReducer.java
index f39cdcb..a0eb7c8 100644
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/contrailgraphbuilding/GenomixReducer.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/contrailgraphbuilding/GenomixReducer.java
@@ -37,7 +37,7 @@
while (values.hasNext()) {
tmpNode.set(values.next());
outputNode.getNodeIdList().appendList(tmpNode.getNodeIdList());
- outputNode.getFFList().appendList(tmpNode.getFFList());
+ outputNode.getFFList().appendList(tmpNode.getFFList()); //appendList need to check if insert node exists
outputNode.getFRList().appendList(tmpNode.getFRList());
outputNode.getRFList().appendList(tmpNode.getRFList());
outputNode.getRRList().appendList(tmpNode.getRRList());
diff --git a/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/contrailgraphbuilding/GraphBuildingTest.java b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/contrailgraphbuilding/GraphBuildingTest.java
index 127ab3e..cc7e0ac 100644
--- a/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/contrailgraphbuilding/GraphBuildingTest.java
+++ b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/contrailgraphbuilding/GraphBuildingTest.java
@@ -22,7 +22,7 @@
private JobConf conf = new JobConf();
private static final String ACTUAL_RESULT_DIR = "actual";
private static final String HADOOP_CONF_PATH = ACTUAL_RESULT_DIR + File.separator + "conf.xml";
- private static final String DATA_PATH = "data/webmap/SplitRepeat.txt";
+ private static final String DATA_PATH = "data/webmap/RemoveBridge.txt";
private static final String HDFS_PATH = "/webmap";
private static final String RESULT_PATH = "/result";
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/AggregateKmerAggregateFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/AggregateKmerAggregateFactory.java
index 52aeb7a..bd70f19 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/AggregateKmerAggregateFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/AggregateKmerAggregateFactory.java
@@ -96,8 +96,7 @@
DataOutput fieldOutput = tupleBuilder.getDataOutput();
ArrayBackedValueStorage inputVal = (ArrayBackedValueStorage) state.state;
try {
- fieldOutput.write(inputVal.getByteArray(), inputVal.getStartOffset(), inputVal.getLength());
-
+ fieldOutput.write(inputVal.getByteArray(), inputVal.getStartOffset(), inputVal.getLength());
tupleBuilder.addFieldEndOffset();
} catch (IOException e) {
throw new HyracksDataException("I/O exception when writing aggregation to the output buffer.");
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/ReadsKeyValueParserFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/ReadsKeyValueParserFactory.java
index 42d8db6..f29c51b 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/ReadsKeyValueParserFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/ReadsKeyValueParserFactory.java
@@ -18,14 +18,12 @@
import java.nio.ByteBuffer;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import edu.uci.ics.genomix.type.KmerBytesWritable;
-import edu.uci.ics.genomix.type.VKmerListWritable;
import edu.uci.ics.genomix.type.NodeWritable;
import edu.uci.ics.genomix.type.PositionListWritable;
import edu.uci.ics.genomix.type.PositionWritable;
@@ -47,24 +45,23 @@
public static final int OutputKmerField = 0;
public static final int OutputNodeField = 1;
-
private final int readLength;
private final int kmerSize;
public static final RecordDescriptor readKmerOutputRec = new RecordDescriptor(new ISerializerDeserializer[] { null,
- null});
+ null });
public ReadsKeyValueParserFactory(int readlength, int k) {
this.readLength = readlength;
this.kmerSize = k;
}
-
- public static enum KmerDir {
+
+ public enum KmerDir {
FORWARD,
REVERSE,
}
-
+
@Override
public IKeyValueParser<LongWritable, Text> createKeyValueParser(final IHyracksTaskContext ctx) {
final ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(2);
@@ -73,27 +70,22 @@
outputAppender.reset(outputBuffer, true);
KmerBytesWritable.setGlobalKmerLength(kmerSize);
return new IKeyValueParser<LongWritable, Text>() {
-
+
private PositionWritable nodeId = new PositionWritable();
private PositionListWritable nodeIdList = new PositionListWritable();
- private VKmerListWritable edgeListForPreKmer = new VKmerListWritable();
- private VKmerListWritable edgeListForNextKmer = new VKmerListWritable();
- private NodeWritable outputNode = new NodeWritable();
-// private NodeWritable outputNode2 = new NodeWritable();
+ private NodeWritable curNode = new NodeWritable();
+ private NodeWritable nextNode = new NodeWritable();
- private KmerBytesWritable preForwardKmer = new KmerBytesWritable();
- private KmerBytesWritable preReverseKmer = new KmerBytesWritable();
private KmerBytesWritable curForwardKmer = new KmerBytesWritable();
private KmerBytesWritable curReverseKmer = new KmerBytesWritable();
private KmerBytesWritable nextForwardKmer = new KmerBytesWritable();
private KmerBytesWritable nextReverseKmer = new KmerBytesWritable();
-
- private KmerDir preKmerDir = KmerDir.FORWARD;
+
private KmerDir curKmerDir = KmerDir.FORWARD;
private KmerDir nextKmerDir = KmerDir.FORWARD;
byte mateId = (byte) 0;
-
+
@Override
public void parse(LongWritable key, Text value, IFrameWriter writer) throws HyracksDataException {
String[] geneLine = value.toString().split("\\t"); // Read the Real Gene Line
@@ -125,145 +117,88 @@
if (kmerSize >= array.length) {
return;
}
- outputNode.reset();
+ curNode.reset();
+ nextNode.reset();
curForwardKmer.setByRead(array, 0);
curReverseKmer.setByReadReverse(array, 0);
curKmerDir = curForwardKmer.compareTo(curReverseKmer) <= 0 ? KmerDir.FORWARD : KmerDir.REVERSE;
- setNextKmer(array[kmerSize]);
- setnodeId(mateId, readID, 0);
- setEdgeListForNextKmer();
- writeToFrame(writer);
+ nextForwardKmer.setAsCopy(curForwardKmer);
+ nextKmerDir = setNextKmer(nextForwardKmer, nextReverseKmer, array[kmerSize]);
+ setnodeId(curNode, mateId, readID, 0);
+ setnodeId(nextNode, mateId, readID, 0);
+ setEdgeListForCurAndNextKmer(curKmerDir, curNode, nextKmerDir, nextNode);
+ writeToFrame(curForwardKmer, curReverseKmer, curKmerDir, curNode, writer);
/*middle kmer*/
- int i = kmerSize;
- for (; i < array.length - 1; i++) {
- outputNode.reset();
- setPreKmerByOldCurKmer();
- setCurKmerByOldNextKmer();
- setNextKmer(array[i]);
- setnodeId(mateId, readID, 0);//i - kmerSize + 1
- setEdgeListForPreKmer();
- setEdgeListForNextKmer();
- writeToFrame(writer);
+ int i = kmerSize + 1;
+ for (; i < array.length; i++) {
+ curForwardKmer.setAsCopy(nextForwardKmer);
+ curReverseKmer.setAsCopy(nextReverseKmer);
+ curKmerDir = nextKmerDir;
+ curNode.set(nextNode);
+ nextNode.reset();
+ nextKmerDir = setNextKmer(nextForwardKmer, nextReverseKmer, array[i]);
+ setnodeId(nextNode, mateId, readID, 0);
+ setEdgeListForCurAndNextKmer(curKmerDir, curNode, nextKmerDir, nextNode);
+ writeToFrame(curForwardKmer, curReverseKmer, curKmerDir, curNode, writer);
}
-
+
/*last kmer*/
- outputNode.reset();
- setPreKmerByOldCurKmer();
- setCurKmerByOldNextKmer();
- setnodeId(mateId, readID, 0);//array.length - kmerSize + 1
- setEdgeListForPreKmer();
- writeToFrame(writer);
+ writeToFrame(nextForwardKmer, nextReverseKmer, nextKmerDir, nextNode, writer);
}
-
- public void setnodeId(byte mateId, long readID, int posId){
+
+ public void setnodeId(NodeWritable node, byte mateId, long readID, int posId) {
nodeId.set(mateId, readID, posId);
nodeIdList.reset();
nodeIdList.append(nodeId);
- outputNode.setNodeIdList(nodeIdList);
- }
-
- public void setNextKmer(byte nextChar){
- nextForwardKmer.setAsCopy(curForwardKmer);
- nextForwardKmer.shiftKmerWithNextChar(nextChar);
- nextReverseKmer.setByReadReverse(nextForwardKmer.toString().getBytes(), nextForwardKmer.getOffset());
- nextKmerDir = nextForwardKmer.compareTo(nextReverseKmer) <= 0 ? KmerDir.FORWARD : KmerDir.REVERSE;
- }
-
- public void setPreKmerByOldCurKmer(){
- preKmerDir = curKmerDir;
- preForwardKmer.setAsCopy(curForwardKmer);
- preReverseKmer.setAsCopy(curReverseKmer);
+ node.setNodeIdList(nodeIdList);
}
- public void setCurKmerByOldNextKmer(){
- curKmerDir = nextKmerDir;
- curForwardKmer.setAsCopy(nextForwardKmer);
- curReverseKmer.setAsCopy(nextReverseKmer);
+            /** Feeds nextChar into forwardKmer (shiftKmerWithNextChar), rebuilds ReverseKmer from it, and returns FORWARD iff forwardKmer <= ReverseKmer. */
+            public KmerDir setNextKmer(KmerBytesWritable forwardKmer, KmerBytesWritable ReverseKmer, byte nextChar) {
+                forwardKmer.shiftKmerWithNextChar(nextChar);
+                ReverseKmer.setByReadReverse(forwardKmer.toString().getBytes(), 0); // toString() builds a fresh array whose letters start at index 0; getOffset() is the kmer's storage offset, not an index into it
+                return forwardKmer.compareTo(ReverseKmer) <= 0 ? KmerDir.FORWARD : KmerDir.REVERSE;
+            }
-
- public void writeToFrame(IFrameWriter writer) {
- switch(curKmerDir){
+
+ public void writeToFrame(KmerBytesWritable forwardKmer, KmerBytesWritable reverseKmer, KmerDir curKmerDir,
+ NodeWritable node, IFrameWriter writer) {
+ switch (curKmerDir) {
case FORWARD:
- InsertToFrame(curForwardKmer, outputNode, writer);
+ InsertToFrame(forwardKmer, node, writer);
break;
case REVERSE:
- InsertToFrame(curReverseKmer, outputNode, writer);
+ InsertToFrame(reverseKmer, node, writer);
break;
}
}
- public void setEdgeListForPreKmer(){
- switch(curKmerDir){
- case FORWARD:
- switch(preKmerDir){
- case FORWARD:
- edgeListForPreKmer.reset();
- edgeListForPreKmer.append(preForwardKmer);
- outputNode.setRRList(edgeListForPreKmer);
- break;
- case REVERSE:
- edgeListForPreKmer.reset();
- edgeListForPreKmer.append(preReverseKmer);
- outputNode.setRFList(edgeListForPreKmer);
- break;
- }
- break;
- case REVERSE:
- switch(preKmerDir){
- case FORWARD:
- edgeListForPreKmer.reset();
- edgeListForPreKmer.append(preForwardKmer);
- outputNode.setFRList(edgeListForPreKmer);
- break;
- case REVERSE:
- edgeListForPreKmer.reset();
- edgeListForPreKmer.append(preReverseKmer);
- outputNode.setFFList(edgeListForPreKmer);
- break;
- }
- break;
+
+ public void setEdgeListForCurAndNextKmer(KmerDir curKmerDir, NodeWritable curNode, KmerDir nextKmerDir,
+ NodeWritable nextNode) {
+ if (curKmerDir == KmerDir.FORWARD && nextKmerDir == KmerDir.FORWARD) {
+ curNode.getFFList().append(kmerSize, nextForwardKmer);
+ nextNode.getRRList().append(kmerSize, curForwardKmer);
+ }
+ if (curKmerDir == KmerDir.FORWARD && nextKmerDir == KmerDir.REVERSE) {
+ curNode.getFRList().append(kmerSize, nextReverseKmer);
+ nextNode.getFRList().append(kmerSize, curForwardKmer);
+ }
+ if (curKmerDir == KmerDir.REVERSE && nextKmerDir == KmerDir.FORWARD) {
+ curNode.getRFList().append(kmerSize, nextForwardKmer);
+ nextNode.getRFList().append(kmerSize, curReverseKmer);
+ }
+ if (curKmerDir == KmerDir.REVERSE && nextKmerDir == KmerDir.REVERSE) {
+ curNode.getRRList().append(kmerSize, nextReverseKmer);
+ nextNode.getFFList().append(kmerSize, curReverseKmer);
}
}
-
- public void setEdgeListForNextKmer(){
- switch(curKmerDir){
- case FORWARD:
- switch(nextKmerDir){
- case FORWARD:
- edgeListForNextKmer.reset();
- edgeListForNextKmer.append(nextForwardKmer);
- outputNode.setFFList(edgeListForNextKmer);
- break;
- case REVERSE:
- edgeListForNextKmer.reset();
- edgeListForNextKmer.append(nextReverseKmer);
- outputNode.setFRList(edgeListForNextKmer);
- break;
- }
- break;
- case REVERSE:
- switch(nextKmerDir){
- case FORWARD:
- edgeListForNextKmer.reset();
- edgeListForNextKmer.append(nextForwardKmer);
- outputNode.setRFList(edgeListForNextKmer);
- break;
- case REVERSE:
- edgeListForNextKmer.reset();
- edgeListForNextKmer.append(nextReverseKmer);
- outputNode.setRRList(edgeListForNextKmer);
- break;
- }
- break;
- }
- }
-
+
private void InsertToFrame(KmerBytesWritable kmer, NodeWritable node, IFrameWriter writer) {
try {
tupleBuilder.reset();
tupleBuilder.addField(kmer.getBytes(), kmer.getOffset(), kmer.getLength());
tupleBuilder.addField(node.marshalToByteArray(), 0, node.getSerializedLength());
-
+
if (!outputAppender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
tupleBuilder.getSize())) {
FrameUtils.flushFrame(outputBuffer, writer);
@@ -289,5 +224,4 @@
}
};
}
-
}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/aggregators/AggregateKmerAggregateFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/aggregators/AggregateKmerAggregateFactory.java
index 46fdd0e..03b220e 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/aggregators/AggregateKmerAggregateFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/aggregators/AggregateKmerAggregateFactory.java
@@ -75,13 +75,14 @@
AggregateState state) throws HyracksDataException {
NodeWritable localUniNode = (NodeWritable) state.state;
localUniNode.reset();
+// readKeyKmer.setAsReference(accessor.getBuffer().array(), getOffSet(accessor, tIndex, 0));
readNode.setAsReference(accessor.getBuffer().array(), getOffSet(accessor, tIndex, 1));
localUniNode.getNodeIdList().appendList(readNode.getNodeIdList());
localUniNode.getFFList().appendList(readNode.getFFList());
localUniNode.getFRList().appendList(readNode.getFRList());
localUniNode.getRFList().appendList(readNode.getRFList());
localUniNode.getRRList().appendList(readNode.getRRList());
-
+
// make an empty field
tupleBuilder.addFieldEndOffset();// mark question?
}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/aggregators/MergeKmerAggregateFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/aggregators/MergeKmerAggregateFactory.java
index 1ee6cae..47ae084 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/aggregators/MergeKmerAggregateFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/aggregators/MergeKmerAggregateFactory.java
@@ -20,6 +20,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+
import edu.uci.ics.genomix.type.KmerBytesWritable;
import edu.uci.ics.genomix.type.NodeWritable;
import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
@@ -50,7 +51,7 @@
return new IAggregatorDescriptor() {
private NodeWritable readNode = new NodeWritable();
-
+
protected int getOffSet(IFrameTupleAccessor accessor, int tIndex, int fieldId) {
int tupleOffset = accessor.getTupleStartOffset(tIndex);
int fieldStart = accessor.getFieldStartOffset(tIndex, fieldId);
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/AssembleKeyIntoNodeOperator.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/AssembleKeyIntoNodeOperator.java
new file mode 100644
index 0000000..f245c7a
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/AssembleKeyIntoNodeOperator.java
@@ -0,0 +1,192 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.genomix.hyracks.newgraph.dataflow;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.NodeWritable;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
+
+/**
+ * One-input, one-output operator that turns each (kmer, node) input tuple into a single-field
+ * output tuple holding only the node, after copying the tuple's kmer into the node itself
+ * (through {@link NodeWritable#getKmer()}).
+ */
+public class AssembleKeyIntoNodeOperator extends AbstractSingleActivityOperatorDescriptor {
+    private static final long serialVersionUID = 1L;
+
+    /** Input field holding the serialized kmer. */
+    public static final int InputKmerField = 0;
+    /** Input field holding the serialized node. */
+    public static final int InputtempNodeField = 1;
+    /** The only output field: the serialized node, now carrying its kmer. */
+    public static final int OutputNodeField = 0;
+
+    /** Output record layout: a single field holding the node bytes (no serializer/deserializer attached). */
+    public static final RecordDescriptor nodeOutputRec = new RecordDescriptor(new ISerializerDeserializer[1]);
+
+    private final int kmerSize;
+
+    /**
+     * @param spec       operator registry (job specification) this operator belongs to
+     * @param outRecDesc record descriptor of the single output field
+     * @param kmerSize   k, passed to {@link KmerBytesWritable#setGlobalKmerLength(int)}
+     */
+    public AssembleKeyIntoNodeOperator(IOperatorDescriptorRegistry spec, RecordDescriptor outRecDesc, int kmerSize) {
+        super(spec, 1, 1);
+        recordDescriptors[0] = outRecDesc;
+        this.kmerSize = kmerSize;
+        KmerBytesWritable.setGlobalKmerLength(this.kmerSize);
+    }
+
+    /**
+     * Runtime half of the operator: for every input tuple, copies the kmer (field 0) into the node
+     * (field 1) and emits that node as the sole output field.
+     */
+    public class MapReadToNodePushable extends AbstractUnaryInputUnaryOutputOperatorNodePushable {
+        public static final int INT_LENGTH = 4;
+        private final IHyracksTaskContext ctx;
+        private final RecordDescriptor inputRecDesc;
+        private final RecordDescriptor outputRecDesc;
+
+        private FrameTupleAccessor accessor;
+        private ByteBuffer writeBuffer;
+        private ArrayTupleBuilder builder;
+        private FrameTupleAppender appender;
+
+        // Allocated once and reused for every tuple.
+        NodeWritable readNode;
+        KmerBytesWritable readKmer;
+
+        public MapReadToNodePushable(IHyracksTaskContext ctx, RecordDescriptor inputRecDesc,
+                RecordDescriptor outputRecDesc) {
+            this.ctx = ctx;
+            this.inputRecDesc = inputRecDesc;
+            this.outputRecDesc = outputRecDesc;
+
+            readNode = new NodeWritable();
+            readKmer = new KmerBytesWritable();
+        }
+
+        @Override
+        public void open() throws HyracksDataException {
+            accessor = new FrameTupleAccessor(ctx.getFrameSize(), inputRecDesc);
+            writeBuffer = ctx.allocateFrame();
+            builder = new ArrayTupleBuilder(outputRecDesc.getFieldCount());
+            appender = new FrameTupleAppender(ctx.getFrameSize());
+            appender.reset(writeBuffer, true);
+            writer.open();
+        }
+
+        @Override
+        public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+            accessor.reset(buffer);
+            int tupleCount = accessor.getTupleCount();
+            for (int i = 0; i < tupleCount; i++) {
+                generateNodeFromKmer(i);
+            }
+        }
+
+        /** Rebuilds tuple {@code tIndex} as a node that carries its own kmer and forwards it downstream. */
+        private void generateNodeFromKmer(int tIndex) throws HyracksDataException {
+            // Field start offsets are relative to the end of the tuple's field-slot header.
+            int fieldDataStart = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength();
+            setKmer(readKmer, fieldDataStart + accessor.getFieldStartOffset(tIndex, InputKmerField));
+            readNode.reset();
+            setNode(readNode, fieldDataStart + accessor.getFieldStartOffset(tIndex, InputtempNodeField));
+            readNode.getKmer().setAsCopy(readKmer);
+            outputNode(readNode);
+        }
+
+        /** Copies the kmer serialized at {@code offset} in the current input frame into {@code kmer}. */
+        private void setKmer(KmerBytesWritable kmer, int offset) {
+            ByteBuffer buffer = accessor.getBuffer();
+            kmer.setAsCopy(buffer.array(), offset);
+        }
+
+        /** Copies the node serialized at {@code offset} in the current input frame into {@code node}. */
+        private void setNode(NodeWritable node, int offset) {
+            ByteBuffer buffer = accessor.getBuffer();
+            node.setAsCopy(buffer.array(), offset);
+        }
+
+        /** Appends {@code node} as a one-field tuple, flushing the output frame first when it is full. */
+        private void outputNode(NodeWritable node) throws HyracksDataException {
+            try {
+                builder.reset();
+                builder.addField(node.marshalToByteArray(), 0, node.getSerializedLength());
+
+                if (!appender.append(builder.getFieldEndOffsets(), builder.getByteArray(), 0, builder.getSize())) {
+                    FrameUtils.flushFrame(writeBuffer, writer);
+                    appender.reset(writeBuffer, true);
+                    if (!appender.append(builder.getFieldEndOffsets(), builder.getByteArray(), 0, builder.getSize())) {
+                        // The tuple does not fit even into a freshly reset frame.
+                        throw new IllegalStateException("Failed to append tuplebuilder to frame");
+                    }
+                }
+            } catch (HyracksDataException e) {
+                // Failures from flushing downstream already have this method's declared type; don't mask them.
+                throw e;
+            } catch (IOException e) {
+                // Keep the underlying I/O failure as the cause instead of discarding it.
+                throw new IllegalStateException("Failed to Add a field to the tupleBuilder.", e);
+            }
+        }
+
+        @Override
+        public void fail() throws HyracksDataException {
+            writer.fail();
+        }
+
+        /** Flushes a partially filled output frame, if any, then closes the downstream writer. */
+        @Override
+        public void close() throws HyracksDataException {
+            if (appender.getTupleCount() > 0) {
+                FrameUtils.flushFrame(writeBuffer, writer);
+            }
+            writer.close();
+        }
+    }
+
+    /**
+     * Creates the per-partition runtime. The global kmer length is set again here because this descriptor
+     * reaches the runtime in serialized form: the constructor is not re-run there and static state is not
+     * serialized, so the constructor's call alone does not reach the JVM that executes this operator.
+     */
+    @Override
+    public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+            IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
+        KmerBytesWritable.setGlobalKmerLength(kmerSize);
+        return new MapReadToNodePushable(ctx, recordDescProvider.getInputRecordDescriptor(getActivityId(), 0),
+                recordDescriptors[0]);
+    }
+}
+
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/io/NodeTextWriterFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/io/NodeTextWriterFactory.java
index b7b7054..fa6ae9b 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/io/NodeTextWriterFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/io/NodeTextWriterFactory.java
@@ -17,7 +17,7 @@
import java.io.DataOutput;
import java.io.IOException;
-import edu.uci.ics.genomix.hyracks.newgraph.dataflow.ReadsKeyValueParserFactory;
+import edu.uci.ics.genomix.hyracks.newgraph.dataflow.AssembleKeyIntoNodeOperator;
import edu.uci.ics.genomix.type.NodeWritable;
import edu.uci.ics.genomix.type.KmerBytesWritable;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
@@ -33,8 +33,7 @@
*/
private static final long serialVersionUID = 1L;
private final int kmerSize;
- public static final int OutputKmerField = ReadsKeyValueParserFactory.OutputKmerField;
- public static final int outputNodeField = ReadsKeyValueParserFactory.OutputNodeField;
+ public static final int OutputNodeField = AssembleKeyIntoNodeOperator.OutputNodeField;
public NodeTextWriterFactory(int k) {
this.kmerSize = k;
@@ -53,9 +52,7 @@
@Override
public void write(DataOutput output, ITupleReference tuple) throws HyracksDataException {
- node.setAsReference(tuple.getFieldData(outputNodeField), tuple.getFieldStart(outputNodeField));
- node.getKmer().reset(kmerSize);
- node.getKmer().setAsReference(tuple.getFieldData(OutputKmerField), tuple.getFieldStart(OutputKmerField));
+ node.setAsReference(tuple.getFieldData(OutputNodeField), tuple.getFieldStart(OutputNodeField));
try {
output.write(node.toString().getBytes());
output.writeByte('\n');
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/job/JobGenBrujinGraph.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/job/JobGenBrujinGraph.java
index afc1cf7..6a5dcc4 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/job/JobGenBrujinGraph.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/job/JobGenBrujinGraph.java
@@ -29,6 +29,7 @@
import edu.uci.ics.genomix.hyracks.data.primitive.KmerPointable;
import edu.uci.ics.genomix.hyracks.newgraph.dataflow.ConnectorPolicyAssignmentPolicy;
import edu.uci.ics.genomix.hyracks.newgraph.dataflow.ReadsKeyValueParserFactory;
+import edu.uci.ics.genomix.hyracks.newgraph.dataflow.AssembleKeyIntoNodeOperator;
import edu.uci.ics.genomix.hyracks.newgraph.dataflow.aggregators.AggregateKmerAggregateFactory;
import edu.uci.ics.genomix.hyracks.newgraph.dataflow.aggregators.MergeKmerAggregateFactory;
import edu.uci.ics.genomix.hyracks.newgraph.io.NodeTextWriterFactory;
@@ -181,6 +182,16 @@
return kmerCrossAggregator;
}
+    /** Adds an AssembleKeyIntoNodeOperator fed by kmerCrossAggregator through a one-to-one connector and returns it. */
+    public AbstractOperatorDescriptor generateKmerToFinalNode(JobSpecification jobSpec,
+            AbstractOperatorDescriptor kmerCrossAggregator) {
+        AbstractOperatorDescriptor assembler = new AssembleKeyIntoNodeOperator(jobSpec,
+                AssembleKeyIntoNodeOperator.nodeOutputRec, kmerSize);
+        OneToOneConnectorDescriptor oneToOne = new OneToOneConnectorDescriptor(jobSpec);
+        connectOperators(jobSpec, kmerCrossAggregator, ncNodeNames, assembler, ncNodeNames, oneToOne);
+        return assembler;
+    }
+
public AbstractOperatorDescriptor generateNodeWriterOpertator(JobSpecification jobSpec,
AbstractOperatorDescriptor mapEachReadToNode) throws HyracksException {
ITupleWriterFactory nodeWriter = null;
@@ -209,10 +220,15 @@
logDebug("Group by Kmer");
AbstractOperatorDescriptor lastOperator = generateGroupbyKmerJob(jobSpec, readOperator);
+ logDebug("Generate final node");
+ lastOperator = generateKmerToFinalNode(jobSpec, lastOperator);
+
+ jobSpec.addRoot(lastOperator);
+
logDebug("Write node to result");
lastOperator = generateNodeWriterOpertator(jobSpec, lastOperator);
- jobSpec.addRoot(readOperator);//what's this? why we need this? why I can't seet it in the JobGenCheckReader
+ jobSpec.addRoot(lastOperator);
return jobSpec;
}
diff --git a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/newgraph/test/JobRun.java b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/newgraph/test/JobRun.java
index 25915aa..1ade3a9 100644
--- a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/newgraph/test/JobRun.java
+++ b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/newgraph/test/JobRun.java
@@ -31,12 +31,12 @@
@SuppressWarnings("deprecation")
public class JobRun {
- private static final int KmerSize = 5;
- private static final int ReadLength = 6;
+ private static final int KmerSize = 3;
+ private static final int ReadLength = 7;
private static final String ACTUAL_RESULT_DIR = "actual";
private static final String PATH_TO_HADOOP_CONF = "src/test/resources/hadoop/conf";
- private static final String DATA_INPUT_PATH = "src/test/resources/data/webmap/test1.txt";
+ private static final String DATA_INPUT_PATH = "src/test/resources/data/lastesttest/HighSplitRepeat.txt";
private static final String HDFS_INPUT_PATH = "/webmap";
private static final String HDFS_OUTPUT_PATH = "/webmap_result";
@@ -52,8 +52,8 @@
@Test
public void TestAll() throws Exception {
- TestReader();
-// TestGroupby();
+// TestReader();
+ TestGroupby();
}
public void TestReader() throws Exception {
@@ -68,7 +68,7 @@
cleanUpReEntry();
conf.set(GenomixJobConf.GROUPBY_TYPE, GenomixJobConf.GROUPBY_TYPE_PRECLUSTER);
driver.runJob(new GenomixJobConf(conf), Plan.BUILD_DEBRUJIN_GRAPH, true);
-// dumpResult();
+ dumpResult();
}
@Before
diff --git a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunStepByStepTest.java b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunStepByStepTest.java
index 51a0d15..fbbc89a 100644
--- a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunStepByStepTest.java
+++ b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunStepByStepTest.java
@@ -46,11 +46,11 @@
@SuppressWarnings("deprecation")
public class JobRunStepByStepTest {
private static final int KmerSize = 5;
- private static final int ReadLength = 8;
+ private static final int ReadLength = 7;
private static final String ACTUAL_RESULT_DIR = "actual";
private static final String PATH_TO_HADOOP_CONF = "src/test/resources/hadoop/conf";
- private static final String DATA_INPUT_PATH = "src/test/resources/data/webmap/test1.txt";
+ private static final String DATA_INPUT_PATH = "src/test/resources/data/webmap/test.txt";
private static final String HDFS_INPUT_PATH = "/webmap";
private static final String HDFS_OUTPUT_PATH = "/webmap_result";
@@ -75,11 +75,11 @@
@Test
public void TestAll() throws Exception {
-// TestReader();
+ TestReader();
// TestGroupbyKmer();
// TestMapKmerToRead();
// TestGroupByReadID();
- TestEndToEnd();
+// TestEndToEnd();
// TestUnMergedNode();
}
diff --git a/genomix/genomix-hyracks/src/test/resources/data/lastesttest/HighSplitRepeat.txt b/genomix/genomix-hyracks/src/test/resources/data/lastesttest/HighSplitRepeat.txt
new file mode 100644
index 0000000..eca0a13
--- /dev/null
+++ b/genomix/genomix-hyracks/src/test/resources/data/lastesttest/HighSplitRepeat.txt
@@ -0,0 +1,3 @@
+1 AGCCACA
+2 GCACTTT
+3 CGCCGTC
diff --git a/genomix/genomix-hyracks/src/test/resources/data/lastesttest/LowSplitRepeat.txt b/genomix/genomix-hyracks/src/test/resources/data/lastesttest/LowSplitRepeat.txt
new file mode 100644
index 0000000..259fd80
--- /dev/null
+++ b/genomix/genomix-hyracks/src/test/resources/data/lastesttest/LowSplitRepeat.txt
@@ -0,0 +1,3 @@
+1 AGCCA
+2 AGCCG
+3 GCCTT
diff --git a/genomix/genomix-hyracks/src/test/resources/data/lastesttest/MidSplitRepeat.txt b/genomix/genomix-hyracks/src/test/resources/data/lastesttest/MidSplitRepeat.txt
new file mode 100644
index 0000000..e934e54
--- /dev/null
+++ b/genomix/genomix-hyracks/src/test/resources/data/lastesttest/MidSplitRepeat.txt
@@ -0,0 +1,3 @@
+1 AGCCA
+2 CGCCT
+3 GCCGG
diff --git a/genomix/genomix-hyracks/src/test/resources/data/webmap/SplitRepeat.txt b/genomix/genomix-hyracks/src/test/resources/data/webmap/SplitRepeat.txt
new file mode 100644
index 0000000..bb03d70
--- /dev/null
+++ b/genomix/genomix-hyracks/src/test/resources/data/webmap/SplitRepeat.txt
@@ -0,0 +1,2 @@
+1 AATAG
+2 CATAC
diff --git a/genomix/genomix-hyracks/src/test/resources/data/webmap/test1.txt b/genomix/genomix-hyracks/src/test/resources/data/webmap/test1.txt
index 3f1cd5c..a720dc4 100644
--- a/genomix/genomix-hyracks/src/test/resources/data/webmap/test1.txt
+++ b/genomix/genomix-hyracks/src/test/resources/data/webmap/test1.txt
@@ -1 +1 @@
-1 AATAGA
+1 AATAGAA
diff --git a/genomix/genomix-pregelix/data/AddBridge/SimpleTest/part-00000 b/genomix/genomix-pregelix/data/AddBridge/SimpleTest/part-00000
new file mode 100755
index 0000000..bc255a4
--- /dev/null
+++ b/genomix/genomix-pregelix/data/AddBridge/SimpleTest/part-00000
Binary files differ
diff --git a/genomix/genomix-pregelix/data/SplitRepeat/1/part-00000 b/genomix/genomix-pregelix/data/SplitRepeat/1/part-00000
new file mode 100755
index 0000000..a983577
--- /dev/null
+++ b/genomix/genomix-pregelix/data/SplitRepeat/1/part-00000
Binary files differ
diff --git a/genomix/genomix-pregelix/data/SplitRepeat/AdjSplitRepeat/part-00000 b/genomix/genomix-pregelix/data/SplitRepeat/AdjSplitRepeat/part-00000
new file mode 100755
index 0000000..a187c64
--- /dev/null
+++ b/genomix/genomix-pregelix/data/SplitRepeat/AdjSplitRepeat/part-00000
Binary files differ
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/GraphCleanInputFormat.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/GraphCleanInputFormat.java
index e8a72ce..859ddba 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/GraphCleanInputFormat.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/GraphCleanInputFormat.java
@@ -66,6 +66,7 @@
* set the vertex value
*/
vertexValue.set(getRecordReader().getCurrentValue());
+ vertexValue.setKmerlength(getRecordReader().getCurrentValue().getKmerlength());
vertex.setVertexValue(vertexValue);
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/AdjacencyListWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/AdjacencyListWritable.java
index 09e98fa..d5fe843 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/AdjacencyListWritable.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/AdjacencyListWritable.java
@@ -6,8 +6,8 @@
import org.apache.hadoop.io.WritableComparable;
-import edu.uci.ics.genomix.oldtype.PositionListWritable;
-import edu.uci.ics.genomix.type.VKmerListWritable;
+
+import edu.uci.ics.genomix.type.KmerListWritable;
public class AdjacencyListWritable implements WritableComparable<AdjacencyListWritable>{
private VKmerListWritable forwardList;
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MergeBubbleMessageWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MergeBubbleMessageWritable.java
index 9fd15dd..95fa865 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MergeBubbleMessageWritable.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MergeBubbleMessageWritable.java
@@ -41,8 +41,9 @@
this.sourceVertexId.set(msg.getSourceVertexId());
}
if (chainVertexId != null) {
- checkMessage |= CheckMessage.CHAIN;
- this.chainVertexId.setAsCopy(msg.getChainVertexId());
+
+ checkMessage |= CheckMessage.ACUTUALKMER;
+ this.chainVertexId.set(msg.getChainVertexId());
}
if (neighberNode != null) {
checkMessage |= CheckMessage.NEIGHBER;
@@ -62,8 +63,8 @@
this.sourceVertexId.set(sourceVertexId.getReadID(),sourceVertexId.getPosInRead());
}
if (chainVertexId != null) {
- checkMessage |= CheckMessage.CHAIN;
- this.chainVertexId.setAsCopy(chainVertexId);
+ checkMessage |= CheckMessage.ACUTUALKMER;
+ this.chainVertexId.set(chainVertexId);
}
if (neighberNode != null) {
checkMessage |= CheckMessage.NEIGHBER;
@@ -100,8 +101,8 @@
public void setChainVertexId(KmerBytesWritable chainVertexId) {
if (chainVertexId != null) {
- checkMessage |= CheckMessage.CHAIN;
- this.chainVertexId.setAsCopy(chainVertexId);
+ checkMessage |= CheckMessage.ACUTUALKMER;
+ this.chainVertexId.set(chainVertexId);
}
}
@@ -144,7 +145,7 @@
out.writeByte(checkMessage);
if ((checkMessage & CheckMessage.SOURCE) != 0)
sourceVertexId.write(out);
- if ((checkMessage & CheckMessage.CHAIN) != 0)
+ if ((checkMessage & CheckMessage.ACUTUALKMER) != 0)
chainVertexId.write(out);
if ((checkMessage & CheckMessage.NEIGHBER) != 0)
neighberNode.write(out);
@@ -159,7 +160,7 @@
checkMessage = in.readByte();
if ((checkMessage & CheckMessage.SOURCE) != 0)
sourceVertexId.readFields(in);
- if ((checkMessage & CheckMessage.CHAIN) != 0)
+ if ((checkMessage & CheckMessage.ACUTUALKMER) != 0)
chainVertexId.readFields(in);
if ((checkMessage & CheckMessage.NEIGHBER) != 0)
neighberNode.readFields(in);
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MessageWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MessageWritable.java
index e3cd345..92f8464 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MessageWritable.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MessageWritable.java
@@ -26,13 +26,15 @@
private boolean isFlip;
private int kmerlength = 0;
private boolean updateMsg = false;
-
+ private KmerBytesWritable startVertexId;
+
private byte checkMessage;
public MessageWritable() {
sourceVertexId = new KmerBytesWritable();
kmer = new VKmerBytesWritable();
neighberNode = new AdjacencyListWritable();
+ startVertexId = new KmerBytesWritable();
flag = Message.NON;
isFlip = false;
checkMessage = (byte) 0;
@@ -40,9 +42,11 @@
public MessageWritable(int kmerSize) {
kmerlength = kmerSize;
- sourceVertexId = new KmerBytesWritable();
- kmer = new VKmerBytesWritable();
+
+ sourceVertexId = new KmerBytesWritable(kmerSize);
+ kmer = new KmerBytesWritable(0);
neighberNode = new AdjacencyListWritable(kmerSize);
+ startVertexId = new KmerBytesWritable(kmerSize);
flag = Message.NON;
isFlip = false;
checkMessage = (byte) 0;
@@ -56,13 +60,17 @@
this.sourceVertexId.setAsCopy(msg.getSourceVertexId());
}
if (kmer != null) {
- checkMessage |= CheckMessage.CHAIN;
- this.kmer.setAsCopy(msg.getActualKmer());
+ checkMessage |= CheckMessage.ACUTUALKMER;
+ this.kmer.set(msg.getActualKmer());
}
if (neighberNode != null) {
checkMessage |= CheckMessage.NEIGHBER;
this.neighberNode.set(msg.getNeighberNode());
}
+ if (startVertexId != null) {
+ checkMessage |= CheckMessage.START;
+ this.startVertexId.set(msg.getStartVertexId());
+ }
checkMessage |= CheckMessage.ADJMSG;
this.flag = msg.getFlag();
updateMsg = msg.isUpdateMsg();
@@ -76,8 +84,8 @@
this.sourceVertexId.setAsCopy(sourceVertexId);
}
if (chainVertexId != null) {
- checkMessage |= CheckMessage.CHAIN;
- this.kmer.setAsCopy(new VKmerBytesWritable(chainVertexId.toString())); // TODO Vkmer
+ checkMessage |= CheckMessage.ACUTUALKMER;
+ this.kmer.set(chainVertexId);
}
if (neighberNode != null) {
checkMessage |= CheckMessage.NEIGHBER;
@@ -95,6 +103,7 @@
kmerlength = kmerSize;
// kmer.reset();
neighberNode.reset(kmerSize);
+ startVertexId.reset(kmerSize);
flag = Message.NON;
isFlip = false;
}
@@ -116,8 +125,8 @@
public void setActualKmer(VKmerBytesWritable actualKmer) {
if (actualKmer != null) {
- checkMessage |= CheckMessage.CHAIN;
- this.kmer.setAsCopy(new VKmerBytesWritable(actualKmer.toString()));
+ checkMessage |= CheckMessage.ACUTUALKMER;
+ this.kmer.set(actualKmer);
}
}
@@ -127,8 +136,8 @@
public void setCreatedVertexId(KmerBytesWritable actualKmer) {
if (actualKmer != null) {
- checkMessage |= CheckMessage.CHAIN;
- this.kmer.setAsCopy(new VKmerBytesWritable(actualKmer.toString()));
+ checkMessage |= CheckMessage.ACUTUALKMER;
+ this.kmer.set(actualKmer);
}
}
@@ -143,6 +152,17 @@
}
}
+ public KmerBytesWritable getStartVertexId() {
+ return startVertexId;
+ }
+
+ public void setStartVertexId(KmerBytesWritable startVertexId) {
+ if(startVertexId != null){
+ checkMessage |= CheckMessage.START;
+ this.startVertexId.set(startVertexId);
+ }
+ }
+
public int getLengthOfChain() {
return kmer.getKmerLetterLength();
}
@@ -189,12 +209,14 @@
out.writeByte(checkMessage);
if ((checkMessage & CheckMessage.SOURCE) != 0)
sourceVertexId.write(out);
- if ((checkMessage & CheckMessage.CHAIN) != 0)
+ if ((checkMessage & CheckMessage.ACUTUALKMER) != 0)
kmer.write(out);
if ((checkMessage & CheckMessage.NEIGHBER) != 0)
neighberNode.write(out);
if ((checkMessage & CheckMessage.NODEIDLIST) != 0)
nodeIdList.write(out);
+ if ((checkMessage & CheckMessage.START) != 0)
+ startVertexId.write(out);
out.writeBoolean(isFlip);
out.writeByte(flag);
out.writeBoolean(updateMsg);
@@ -207,12 +229,14 @@
checkMessage = in.readByte();
if ((checkMessage & CheckMessage.SOURCE) != 0)
sourceVertexId.readFields(in);
- if ((checkMessage & CheckMessage.CHAIN) != 0)
+ if ((checkMessage & CheckMessage.ACUTUALKMER) != 0)
kmer.readFields(in);
if ((checkMessage & CheckMessage.NEIGHBER) != 0)
neighberNode.readFields(in);
if ((checkMessage & CheckMessage.NODEIDLIST) != 0)
nodeIdList.readFields(in);
+ if ((checkMessage & CheckMessage.START) != 0)
+ startVertexId.readFields(in);
isFlip = in.readBoolean();
flag = in.readByte();
updateMsg = in.readBoolean();
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/VertexValueWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/VertexValueWritable.java
index 83f286c..27fa846 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/VertexValueWritable.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/VertexValueWritable.java
@@ -12,18 +12,17 @@
public class VertexValueWritable implements WritableComparable<VertexValueWritable> {
- public static class State extends VertexStateFlag{
- public static final byte HEAD_SHOULD_MERGEWITHPREV = 0b101 << 0;
- public static final byte HEAD_SHOULD_MERGEWITHNEXT = 0b111 << 0;
-
+ public static class State extends VertexStateFlag{
public static final byte NO_MERGE = 0b00 << 3;
public static final byte SHOULD_MERGEWITHNEXT = 0b01 << 3;
public static final byte SHOULD_MERGEWITHPREV = 0b10 << 3;
public static final byte SHOULD_MERGE_MASK = 0b11 << 3;
public static final byte SHOULD_MERGE_CLEAR = 0b1100111;
- public static final byte KILL = 0b11 << 3;
- public static final byte KILL_MASK = 0b11 << 3;
+ public static final byte KILL = 0b1 << 3;
+ public static final byte KILL_MASK = 0b1 << 3;
+
+ public static final byte DIR_FROM_DEADVERTEX = 0b10 << 3;
}
public static class VertexStateFlag extends FakeFlag {
@@ -313,7 +312,7 @@
*/
public void processMerges(byte neighborToDeleteDir, KmerBytesWritable nodeToDelete,
byte neighborToMergeDir, KmerBytesWritable nodeToAdd,
- int kmerSize, KmerBytesWritable kmer){
+ int kmerSize, VKmerBytesWritable kmer){
switch (neighborToDeleteDir & MessageFlag.DIR_MASK) {
case MessageFlag.DIR_FF:
this.getFFList().remove(nodeToDelete); //set(null);
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bridgeremove/BridgeAddVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bridgeremove/BridgeAddVertex.java
index 89b66e6..3c7ae8a 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bridgeremove/BridgeAddVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bridgeremove/BridgeAddVertex.java
@@ -3,12 +3,11 @@
import java.util.Iterator;
import org.apache.hadoop.io.NullWritable;
-//import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.KmerListWritable;
import edu.uci.ics.pregelix.api.graph.Vertex;
import edu.uci.ics.pregelix.api.job.PregelixJob;
import edu.uci.ics.pregelix.api.util.BspUtils;
-import edu.uci.ics.genomix.oldtype.PositionListWritable;
-import edu.uci.ics.genomix.oldtype.PositionWritable;
import edu.uci.ics.genomix.pregelix.client.Client;
import edu.uci.ics.genomix.pregelix.format.GraphCleanInputFormat;
import edu.uci.ics.genomix.pregelix.format.GraphCleanOutputFormat;
@@ -48,7 +47,7 @@
* Naive Algorithm for path merge graph
*/
public class BridgeAddVertex extends
- Vertex<PositionWritable, VertexValueWritable, NullWritable, MessageWritable> {
+ Vertex<KmerBytesWritable, VertexValueWritable, NullWritable, MessageWritable> {
public static final String KMER_SIZE = "BridgeRemoveVertex.kmerSize";
public static final String LENGTH = "BridgeRemoveVertex.length";
public static int kmerSize = -1;
@@ -60,7 +59,7 @@
public void initVertex() {
if (kmerSize == -1)
kmerSize = getContext().getConfiguration().getInt(KMER_SIZE, 5);
- if(length == -1)
+ if (length == -1)
length = getContext().getConfiguration().getInt(LENGTH, kmerSize + 5);
}
@@ -69,40 +68,42 @@
public void compute(Iterator<MessageWritable> msgIterator) {
initVertex();
if(getSuperstep() == 1){
- if(getVertexId().getReadID() == 1 && getVertexId().getPosInRead() == 2){
- getVertexValue().getFFList().append(3, (byte)1);
+ if(getVertexId().toString().equals("ATA")){
+ KmerBytesWritable vertexId = new KmerBytesWritable(kmerSize);
+ vertexId.setByRead("GTA".getBytes(), 0);
+ getVertexValue().getFRList().append(vertexId);
//add bridge vertex
@SuppressWarnings("rawtypes")
Vertex vertex = (Vertex) BspUtils.createVertex(getContext().getConfiguration());
vertex.getMsgList().clear();
vertex.getEdges().clear();
- PositionWritable vertexId = new PositionWritable();
- VertexValueWritable vertexValue = new VertexValueWritable();
+ VertexValueWritable vertexValue = new VertexValueWritable(kmerSize);
/**
* set the src vertex id
*/
- vertexId.set(3, (byte)1);
vertex.setVertexId(vertexId);
/**
* set the vertex value
*/
- byte[] array = { 'T', 'A', 'G', 'C', 'C'};
- VKmerBytesWritable kmer = new VKmerBytesWritable(array.length);
- kmer.setByRead(array, 0);
- vertexValue.setKmer(kmer);
- PositionListWritable plist = new PositionListWritable();
- plist.append(new PositionWritable(1, (byte)2));
- vertexValue.setRRList(plist);
- PositionListWritable plist2 = new PositionListWritable();
- plist2.append(new PositionWritable(2, (byte)2));
- vertexValue.setFFList(plist2);
+ KmerListWritable kmerFRList = new KmerListWritable(kmerSize);
+ kmerFRList.append(getVertexId());
+ vertexValue.setFRList(kmerFRList);
+ KmerBytesWritable otherVertexId = new KmerBytesWritable(kmerSize);
+ otherVertexId.setByRead("ACG".getBytes(), 0);
+ KmerListWritable kmerRFList = new KmerListWritable(kmerSize);
+ kmerRFList.append(otherVertexId);
+ vertexValue.setRFList(kmerRFList);
+ vertexValue.setKmer(vertexId);
vertex.setVertexValue(vertexValue);
addVertex(vertexId, vertex);
+ }
+ else if(getVertexId().toString().equals("ACG")){
+ KmerBytesWritable brdgeVertexId = new KmerBytesWritable(kmerSize);
+ brdgeVertexId.setByRead("GTA".getBytes(), 0);
+ getVertexValue().getRFList().append(brdgeVertexId);
}
- if(getVertexId().getReadID() == 2 && getVertexId().getPosInRead() == 2)
- getVertexValue().getRRList().append(3, (byte)1);
}
voteToHalt();
}
@@ -116,7 +117,7 @@
job.setVertexInputFormatClass(GraphCleanInputFormat.class);
job.setVertexOutputFormatClass(GraphCleanOutputFormat.class);
job.setDynamicVertexValueSize(true);
- job.setOutputKeyClass(PositionWritable.class);
+ job.setOutputKeyClass(KmerBytesWritable.class);
job.setOutputValueClass(VertexValueWritable.class);
Client.run(args, job);
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bridgeremove/BridgeRemoveVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bridgeremove/BridgeRemoveVertex.java
index 7f85fc7..caf7a36 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bridgeremove/BridgeRemoveVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bridgeremove/BridgeRemoveVertex.java
@@ -11,8 +11,8 @@
import edu.uci.ics.genomix.pregelix.io.MessageWritable;
import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
import edu.uci.ics.genomix.pregelix.operator.pathmerge.BasicGraphCleanVertex;
-import edu.uci.ics.genomix.pregelix.type.AdjMessage;
import edu.uci.ics.genomix.pregelix.util.VertexUtil;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
/*
* vertexId: BytesWritable
@@ -59,76 +59,17 @@
if (kmerSize == -1)
kmerSize = getContext().getConfiguration().getInt(KMER_SIZE, 5);
if(length == -1)
- length = getContext().getConfiguration().getInt(LENGTH, kmerSize + 5);
- outgoingMsg.reset();
+ length = getContext().getConfiguration().getInt(LENGTH, kmerSize);
+ if(incomingMsg == null)
+ incomingMsg = new MessageWritable(kmerSize);
+ if(outgoingMsg == null)
+ outgoingMsg = new MessageWritable(kmerSize);
+ else
+ outgoingMsg.reset(kmerSize);
+ if(destVertexId == null)
+ destVertexId = new KmerBytesWritable(kmerSize);
receivedMsgList.clear();
}
-
- /**
- * broadcast kill self to all neighbers
- */
- public void broadcaseKillself(){
- outgoingMsg.setSourceVertexId(getVertexId());
- if(receivedMsgList.get(0).getFlag() == AdjMessage.FROMFF
- && receivedMsgList.get(1).getFlag() == AdjMessage.FROMRR){
- outgoingMsg.setFlag(AdjMessage.FROMRR);
- sendMsg(receivedMsgList.get(0).getSourceVertexId(), outgoingMsg);
- outgoingMsg.setFlag(AdjMessage.FROMFF);
- sendMsg(receivedMsgList.get(1).getSourceVertexId(), outgoingMsg);
- deleteVertex(getVertexId());
- } else if (receivedMsgList.get(0).getFlag() == AdjMessage.FROMFF
- && receivedMsgList.get(1).getFlag() == AdjMessage.FROMRF) {
- outgoingMsg.setFlag(AdjMessage.FROMRR);
- sendMsg(receivedMsgList.get(0).getSourceVertexId(), outgoingMsg);
- outgoingMsg.setFlag(AdjMessage.FROMRF);
- sendMsg(receivedMsgList.get(1).getSourceVertexId(), outgoingMsg);
- deleteVertex(getVertexId());
- } else if (receivedMsgList.get(0).getFlag() == AdjMessage.FROMFR
- && receivedMsgList.get(1).getFlag() == AdjMessage.FROMRR) {
- outgoingMsg.setFlag(AdjMessage.FROMFR);
- sendMsg(receivedMsgList.get(0).getSourceVertexId(), outgoingMsg);
- outgoingMsg.setFlag(AdjMessage.FROMFF);
- sendMsg(receivedMsgList.get(1).getSourceVertexId(), outgoingMsg);
- deleteVertex(getVertexId());
- } else if (receivedMsgList.get(0).getFlag() == AdjMessage.FROMFR
- && receivedMsgList.get(1).getFlag() == AdjMessage.FROMRF) {
- outgoingMsg.setFlag(AdjMessage.FROMFR);
- sendMsg(receivedMsgList.get(0).getSourceVertexId(), outgoingMsg);
- outgoingMsg.setFlag(AdjMessage.FROMRF);
- sendMsg(receivedMsgList.get(1).getSourceVertexId(), outgoingMsg);
- deleteVertex(getVertexId());
- } // RR
- else if(receivedMsgList.get(1).getFlag() == AdjMessage.FROMFF
- && receivedMsgList.get(0).getFlag() == AdjMessage.FROMRR){
- outgoingMsg.setFlag(AdjMessage.FROMRR);
- sendMsg(receivedMsgList.get(1).getSourceVertexId(), outgoingMsg);
- outgoingMsg.setFlag(AdjMessage.FROMFF);
- sendMsg(receivedMsgList.get(0).getSourceVertexId(), outgoingMsg);
- deleteVertex(getVertexId());
- } else if (receivedMsgList.get(1).getFlag() == AdjMessage.FROMFF
- && receivedMsgList.get(0).getFlag() == AdjMessage.FROMRF) {
- outgoingMsg.setFlag(AdjMessage.FROMRR);
- sendMsg(receivedMsgList.get(1).getSourceVertexId(), outgoingMsg);
- outgoingMsg.setFlag(AdjMessage.FROMRF);
- sendMsg(receivedMsgList.get(0).getSourceVertexId(), outgoingMsg);
- deleteVertex(getVertexId());
- } else if (receivedMsgList.get(1).getFlag() == AdjMessage.FROMFR
- && receivedMsgList.get(0).getFlag() == AdjMessage.FROMRR) {
- outgoingMsg.setFlag(AdjMessage.FROMFR);
- sendMsg(receivedMsgList.get(1).getSourceVertexId(), outgoingMsg);
- outgoingMsg.setFlag(AdjMessage.FROMFF);
- sendMsg(receivedMsgList.get(0).getSourceVertexId(), outgoingMsg);
- deleteVertex(getVertexId());
- } else if (receivedMsgList.get(1).getFlag() == AdjMessage.FROMFR
- && receivedMsgList.get(0).getFlag() == AdjMessage.FROMRF) {
- outgoingMsg.setFlag(AdjMessage.FROMFR);
- sendMsg(receivedMsgList.get(1).getSourceVertexId(), outgoingMsg);
- outgoingMsg.setFlag(AdjMessage.FROMRF);
- sendMsg(receivedMsgList.get(0).getSourceVertexId(), outgoingMsg);
- deleteVertex(getVertexId());
- } else
- voteToHalt();
- }
@Override
public void compute(Iterator<MessageWritable> msgIterator) {
@@ -150,13 +91,17 @@
i++;
}
if(receivedMsgList.size() == 2){
- if(getVertexValue().getLengthOfKmer() <= length){
+ if(getVertexValue().getLengthOfKmer() <= length
+ && getVertexValue().getDegree() == 2){
broadcaseKillself();
}
}
}
else if(getSuperstep() == 3){
- responseToDeadVertex(msgIterator);
+ while(msgIterator.hasNext()){
+ incomingMsg = msgIterator.next();
+ responseToDeadVertex();
+ }
}
voteToHalt();
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleAddVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleAddVertex.java
index c0ba1a9..f5aa3a1 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleAddVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleAddVertex.java
@@ -4,12 +4,10 @@
import org.apache.hadoop.io.NullWritable;
import edu.uci.ics.genomix.type.KmerBytesWritable;
-import edu.uci.ics.genomix.type.VKmerBytesWritable;
+import edu.uci.ics.genomix.type.KmerListWritable;
import edu.uci.ics.pregelix.api.graph.Vertex;
import edu.uci.ics.pregelix.api.job.PregelixJob;
import edu.uci.ics.pregelix.api.util.BspUtils;
-import edu.uci.ics.genomix.oldtype.PositionListWritable;
-import edu.uci.ics.genomix.oldtype.PositionWritable;
import edu.uci.ics.genomix.pregelix.client.Client;
import edu.uci.ics.genomix.pregelix.format.GraphCleanInputFormat;
import edu.uci.ics.genomix.pregelix.format.GraphCleanOutputFormat;
@@ -48,7 +46,7 @@
* Remove tip or single node when l > constant
*/
public class BubbleAddVertex extends
- Vertex<PositionWritable, VertexValueWritable, NullWritable, MessageWritable> {
+ Vertex<KmerBytesWritable, VertexValueWritable, NullWritable, MessageWritable> {
public static final String KMER_SIZE = "BubbleAddVertex.kmerSize";
public static int kmerSize = -1;
@@ -65,40 +63,42 @@
public void compute(Iterator<MessageWritable> msgIterator) {
initVertex();
if(getSuperstep() == 1){
- if(getVertexId().getReadID() == 1 && getVertexId().getPosInRead() == 2){
- getVertexValue().getFFList().append(2, (byte)1);
+ if(getVertexId().toString().equals("ATA")){
+ KmerBytesWritable vertexId = new KmerBytesWritable(kmerSize);
+ vertexId.setByRead("GTA".getBytes(), 0);
+ getVertexValue().getFRList().append(vertexId);
- //add tip vertex
+ //add bridge vertex
@SuppressWarnings("rawtypes")
Vertex vertex = (Vertex) BspUtils.createVertex(getContext().getConfiguration());
vertex.getMsgList().clear();
vertex.getEdges().clear();
- PositionWritable vertexId = new PositionWritable();
- VertexValueWritable vertexValue = new VertexValueWritable();
+ VertexValueWritable vertexValue = new VertexValueWritable(kmerSize);
/**
* set the src vertex id
*/
- vertexId.set(2, (byte)1);
vertex.setVertexId(vertexId);
/**
* set the vertex value
*/
- byte[] array = { 'T', 'A', 'G', 'C', 'C', 'A', 'G'}; //TAGCCAG
- VKmerBytesWritable kmer = new VKmerBytesWritable(array.length);
- kmer.setByRead(array, 0);
- vertexValue.setKmer(kmer);
- PositionListWritable plist = new PositionListWritable();
- plist.append(new PositionWritable(1, (byte)2));
- vertexValue.setRRList(plist);
- PositionListWritable plist2 = new PositionListWritable();
- plist2.append(new PositionWritable(1, (byte)4));
- vertexValue.setFFList(plist2);
+ KmerListWritable kmerFRList = new KmerListWritable(kmerSize);
+ kmerFRList.append(getVertexId());
+ vertexValue.setFRList(kmerFRList);
+ KmerBytesWritable otherVertexId = new KmerBytesWritable(kmerSize);
+ otherVertexId.setByRead("AGA".getBytes(), 0);
+ KmerListWritable kmerRFList = new KmerListWritable(kmerSize);
+ kmerRFList.append(otherVertexId);
+ vertexValue.setRFList(kmerRFList);
+ vertexValue.setKmer(vertexId);
vertex.setVertexValue(vertexValue);
addVertex(vertexId, vertex);
+ }
+ else if(getVertexId().toString().equals("AGA")){
+ KmerBytesWritable brdgeVertexId = new KmerBytesWritable(kmerSize);
+ brdgeVertexId.setByRead("GTA".getBytes(), 0);
+ getVertexValue().getRFList().append(brdgeVertexId);
}
- if(getVertexId().getReadID() == 1 && getVertexId().getPosInRead() == 4)
- getVertexValue().getRRList().append(2, (byte)1);
}
voteToHalt();
}
@@ -112,7 +112,7 @@
job.setVertexInputFormatClass(GraphCleanInputFormat.class);
job.setVertexOutputFormatClass(GraphCleanOutputFormat.class);
job.setDynamicVertexValueSize(true);
- job.setOutputKeyClass(PositionWritable.class);
+ job.setOutputKeyClass(KmerBytesWritable.class);
job.setOutputValueClass(VertexValueWritable.class);
Client.run(args, job);
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleMergeVertex.java
index b782294..d630b5f 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleMergeVertex.java
@@ -7,6 +7,7 @@
import org.apache.hadoop.io.NullWritable;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
import edu.uci.ics.genomix.type.KmerBytesWritableFactory;
import edu.uci.ics.pregelix.api.graph.Vertex;
import edu.uci.ics.pregelix.api.job.PregelixJob;
@@ -51,7 +52,7 @@
* Naive Algorithm for path merge graph
*/
public class BubbleMergeVertex extends
- Vertex<PositionWritable, VertexValueWritable, NullWritable, MergeBubbleMessageWritable> {
+ Vertex<KmerBytesWritable, VertexValueWritable, NullWritable, MergeBubbleMessageWritable> {
public static final String KMER_SIZE = "BubbleMergeVertex.kmerSize";
public static final String ITERATIONS = "BubbleMergeVertex.iteration";
public static int kmerSize = -1;
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicGraphCleanVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicGraphCleanVertex.java
index 64965e3..4ae7231 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicGraphCleanVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicGraphCleanVertex.java
@@ -5,12 +5,10 @@
import org.apache.hadoop.io.NullWritable;
-
import edu.uci.ics.pregelix.api.graph.Vertex;
import edu.uci.ics.genomix.pregelix.io.MessageWritable;
import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
import edu.uci.ics.genomix.pregelix.io.VertexValueWritable.State;
-import edu.uci.ics.genomix.pregelix.type.AdjMessage;
import edu.uci.ics.genomix.pregelix.type.MessageFlag;
import edu.uci.ics.genomix.pregelix.util.VertexUtil;
import edu.uci.ics.genomix.type.GeneCode;
@@ -29,9 +27,10 @@
protected MessageWritable incomingMsg = null;
protected MessageWritable outgoingMsg = null;
- protected VKmerBytesWritable destVertexId = new VKmerBytesWritable();
- protected Iterator<VKmerBytesWritable> posIterator;
- protected VKmerBytesWritable tmpKmer = new VKmerBytesWritable();
+
+ protected KmerBytesWritable destVertexId = null;
+ protected Iterator<KmerBytesWritable> kmerIterator;
+ protected KmerBytesWritable tmpKmer = new KmerBytesWritable(kmerSize);
byte headFlag;
protected byte outFlag;
protected byte inFlag;
@@ -70,11 +69,11 @@
*/
public VKmerBytesWritable getNextDestVertexId(VertexValueWritable value) {
if (value.getFFList().getCountOfPosition() > 0){ // #FFList() > 0
- posIterator = value.getFFList().iterator();
- return posIterator.next();
+ kmerIterator = value.getFFList().iterator();
+ return kmerIterator.next();
} else if (value.getFRList().getCountOfPosition() > 0){ // #FRList() > 0
- posIterator = value.getFRList().iterator();
- return posIterator.next();
+ kmerIterator = value.getFRList().iterator();
+ return kmerIterator.next();
} else {
return null;
}
@@ -82,11 +81,11 @@
public VKmerBytesWritable getPreDestVertexId(VertexValueWritable value) {
if (value.getRFList().getCountOfPosition() > 0){ // #RFList() > 0
- posIterator = value.getRFList().iterator();
- return posIterator.next();
+ kmerIterator = value.getRFList().iterator();
+ return kmerIterator.next();
} else if (value.getRRList().getCountOfPosition() > 0){ // #RRList() > 0
- posIterator = value.getRRList().iterator();
- return posIterator.next();
+ kmerIterator = value.getRRList().iterator();
+ return kmerIterator.next();
} else {
return null;
}
@@ -95,67 +94,67 @@
/**
* get destination vertex
*/
- public KmerBytesWritable getNextDestVertexIdAndSetFlag(VertexValueWritable value) {
+ public VKmerBytesWritable getNextDestVertexIdAndSetFlag(VertexValueWritable value) {
if (value.getFFList().getCountOfPosition() > 0){ // #FFList() > 0
- posIterator = value.getFFList().iterator();
+ kmerIterator = value.getFFList().iterator();
outFlag &= MessageFlag.DIR_CLEAR;
outFlag |= MessageFlag.DIR_FF;
- return posIterator.next();
+ return kmerIterator.next();
} else if (value.getFRList().getCountOfPosition() > 0){ // #FRList() > 0
- posIterator = value.getFRList().iterator();
+ kmerIterator = value.getFRList().iterator();
outFlag &= MessageFlag.DIR_CLEAR;
outFlag |= MessageFlag.DIR_FR;
- return posIterator.next();
+ return kmerIterator.next();
} else {
return null;
}
}
- public KmerBytesWritable getPreDestVertexIdAndSetFlag(VertexValueWritable value) {
+ public VKmerBytesWritable getPreDestVertexIdAndSetFlag(VertexValueWritable value) {
if (value.getRFList().getCountOfPosition() > 0){ // #RFList() > 0
- posIterator = value.getRFList().iterator();
+ kmerIterator = value.getRFList().iterator();
outFlag &= MessageFlag.DIR_CLEAR;
outFlag |= MessageFlag.DIR_RF;
- return posIterator.next();
+ return kmerIterator.next();
} else if (value.getRRList().getCountOfPosition() > 0){ // #RRList() > 0
- posIterator = value.getRRList().iterator();
+ kmerIterator = value.getRRList().iterator();
outFlag &= MessageFlag.DIR_CLEAR;
outFlag |= MessageFlag.DIR_RR;
- return posIterator.next();
+ return kmerIterator.next();
} else {
return null;
}
}
-
- /**
- * head send message to all next nodes
- */
- public void sendMsgToAllNextNodes(VertexValueWritable value) {
- posIterator = value.getFFList().iterator(); // FFList
- while(posIterator.hasNext()){
- destVertexId.setAsCopy(posIterator.next());
- sendMsg(destVertexId, outgoingMsg);
- }
- posIterator = value.getFRList().iterator(); // FRList
- while(posIterator.hasNext()){
- destVertexId.setAsCopy(posIterator.next());
- sendMsg(destVertexId, outgoingMsg);
- }
- }
-
+
/**
* head send message to all previous nodes
*/
public void sendMsgToAllPreviousNodes(VertexValueWritable value) {
- posIterator = value.getRFList().iterator(); // RFList
- while(posIterator.hasNext()){
- destVertexId.setAsCopy(posIterator.next());
+ kmerIterator = value.getRFList().iterator(); // RFList
+ while(kmerIterator.hasNext()){
+ destVertexId.set(kmerIterator.next());
sendMsg(destVertexId, outgoingMsg);
}
- posIterator = value.getRRList().iterator(); // RRList
- while(posIterator.hasNext()){
- destVertexId.setAsCopy(posIterator.next());
+ kmerIterator = value.getRRList().iterator(); // RRList
+ while(kmerIterator.hasNext()){
+ destVertexId.set(kmerIterator.next());
+ sendMsg(destVertexId, outgoingMsg);
+ }
+ }
+
+ /**
+ * head send message to all next nodes
+ */
+ public void sendMsgToAllNextNodes(VertexValueWritable value) {
+ kmerIterator = value.getFFList().iterator(); // FFList
+ while(kmerIterator.hasNext()){
+ destVertexId.set(kmerIterator.next());
+ sendMsg(destVertexId, outgoingMsg);
+ }
+ kmerIterator = value.getFRList().iterator(); // FRList
+ while(kmerIterator.hasNext()){
+ destVertexId.set(kmerIterator.next());
sendMsg(destVertexId, outgoingMsg);
}
}
@@ -169,21 +168,49 @@
}
/**
+ * tip send message with sourceId and dir to previous node
+ * tip only has one incoming
+ */
+ public void sendSettledMsgToPreviousNode(){
+ if(getVertexValue().getFFList().getCountOfPosition() > 0)
+ outgoingMsg.setFlag(MessageFlag.DIR_FF);
+ else if(getVertexValue().getFRList().getCountOfPosition() > 0)
+ outgoingMsg.setFlag(MessageFlag.DIR_FR);
+ outgoingMsg.setSourceVertexId(getVertexId());
+ destVertexId.set(getNextDestVertexId(getVertexValue()));
+ sendMsg(destVertexId, outgoingMsg);
+ }
+
+ /**
+ * tip send message with sourceId and dir to next node
+ * tip only has one outgoing
+ */
+ public void sendSettledMsgToNextNode(){
+ if(getVertexValue().getRFList().getCountOfPosition() > 0)
+ outgoingMsg.setFlag(MessageFlag.DIR_RF);
+ else if(getVertexValue().getRRList().getCountOfPosition() > 0)
+ outgoingMsg.setFlag(MessageFlag.DIR_RR);
+ outgoingMsg.setSourceVertexId(getVertexId());
+ destVertexId.set(getPreDestVertexId(getVertexValue()));
+ sendMsg(destVertexId, outgoingMsg);
+ }
+
+ /**
* head send message to all previous nodes
*/
public void sendSettledMsgToAllPreviousNodes(VertexValueWritable value) {
- posIterator = value.getRFList().iterator(); // RFList
- while(posIterator.hasNext()){
- outgoingMsg.setFlag(AdjMessage.FROMRF);
+ kmerIterator = value.getRFList().iterator(); // RFList
+ while(kmerIterator.hasNext()){
+ outgoingMsg.setFlag(MessageFlag.DIR_RF);
outgoingMsg.setSourceVertexId(getVertexId());
- destVertexId.setAsCopy(posIterator.next());
+ destVertexId.set(kmerIterator.next());
sendMsg(destVertexId, outgoingMsg);
}
- posIterator = value.getRRList().iterator(); // RRList
- while(posIterator.hasNext()){
- outgoingMsg.setFlag(AdjMessage.FROMRR);
+ kmerIterator = value.getRRList().iterator(); // RRList
+ while(kmerIterator.hasNext()){
+ outgoingMsg.setFlag(MessageFlag.DIR_RR);
outgoingMsg.setSourceVertexId(getVertexId());
- destVertexId.setAsCopy(posIterator.next());
+ destVertexId.set(kmerIterator.next());
sendMsg(destVertexId, outgoingMsg);
}
}
@@ -192,18 +219,18 @@
* head send message to all next nodes
*/
public void sendSettledMsgToAllNextNodes(VertexValueWritable value) {
- posIterator = value.getFFList().iterator(); // FFList
- while(posIterator.hasNext()){
- outgoingMsg.setFlag(AdjMessage.FROMFF);
+ kmerIterator = value.getFFList().iterator(); // FFList
+ while(kmerIterator.hasNext()){
+ outgoingMsg.setFlag(MessageFlag.DIR_FF);
outgoingMsg.setSourceVertexId(getVertexId());
- destVertexId.setAsCopy(posIterator.next());
+ destVertexId.set(kmerIterator.next());
sendMsg(destVertexId, outgoingMsg);
}
- posIterator = value.getFRList().iterator(); // FRList
- while(posIterator.hasNext()){
- outgoingMsg.setFlag(AdjMessage.FROMFR);
+ kmerIterator = value.getFRList().iterator(); // FRList
+ while(kmerIterator.hasNext()){
+ outgoingMsg.setFlag(MessageFlag.DIR_FR);
outgoingMsg.setSourceVertexId(getVertexId());
- destVertexId.setAsCopy(posIterator.next());
+ destVertexId.set(kmerIterator.next());
sendMsg(destVertexId, outgoingMsg);
}
}
@@ -748,44 +775,44 @@
switch(incomingMsg.getFlag() & MessageFlag.DIR_MASK){
case MessageFlag.DIR_FF:
//remove incomingMsg.getSourceId from RR positionList
- posIterator = getVertexValue().getRRList().iterator();
- while(posIterator.hasNext()){
- tmpKmer = posIterator.next();
+ kmerIterator = getVertexValue().getRRList().iterator();
+ while(kmerIterator.hasNext()){
+ tmpKmer = kmerIterator.next();
if(tmpKmer.equals(incomingMsg.getSourceVertexId())){
- posIterator.remove();
+ kmerIterator.remove();
break;
}
}
break;
case MessageFlag.DIR_FR:
//remove incomingMsg.getSourceId from FR positionList
- posIterator = getVertexValue().getFRList().iterator();
- while(posIterator.hasNext()){
- tmpKmer = posIterator.next();
+ kmerIterator = getVertexValue().getFRList().iterator();
+ while(kmerIterator.hasNext()){
+ tmpKmer = kmerIterator.next();
if(tmpKmer.equals(incomingMsg.getSourceVertexId())){
- posIterator.remove();
+ kmerIterator.remove();
break;
}
}
break;
case MessageFlag.DIR_RF:
//remove incomingMsg.getSourceId from RF positionList
- posIterator = getVertexValue().getRFList().iterator();
- while(posIterator.hasNext()){
- tmpKmer = posIterator.next();
+ kmerIterator = getVertexValue().getRFList().iterator();
+ while(kmerIterator.hasNext()){
+ tmpKmer = kmerIterator.next();
if(tmpKmer.equals(incomingMsg.getSourceVertexId())){
- posIterator.remove();
+ kmerIterator.remove();
break;
}
}
break;
case MessageFlag.DIR_RR:
//remove incomingMsg.getSourceId from FF positionList
- posIterator = getVertexValue().getFFList().iterator();
- while(posIterator.hasNext()){
- tmpKmer = posIterator.next();
+ kmerIterator = getVertexValue().getFFList().iterator();
+ while(kmerIterator.hasNext()){
+ tmpKmer = kmerIterator.next();
if(tmpKmer.equals(incomingMsg.getSourceVertexId())){
- posIterator.remove();
+ kmerIterator.remove();
break;
}
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/MapReduceVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/MapReduceVertex.java
index 99cd3c2..51100e5 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/MapReduceVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/MapReduceVertex.java
@@ -51,10 +51,12 @@
kmerList.reset();
if(fakeVertex == null){
// fakeVertex = new KmerBytesWritable(kmerSize + 1); // TODO check if merge is correct
- fakeVertex = new KmerBytesWritable();
+ fakeVertex = new VKmerBytesWritable();
String random = generaterRandomString(kmerSize + 1);
fakeVertex.setByRead(random.getBytes(), 0);
}
+ if(destVertexId == null)
+ destVertexId = new KmerBytesWritable(kmerSize);
}
/**
@@ -187,7 +189,7 @@
job.setVertexInputFormatClass(GraphCleanInputFormat.class);
job.setVertexOutputFormatClass(P2PathMergeOutputFormat.class);
job.setDynamicVertexValueSize(true);
- job.setOutputKeyClass(KmerBytesWritable.class);
+ job.setOutputKeyClass(VKmerBytesWritable.class);
job.setOutputValueClass(VertexValueWritable.class);
Client.run(args, job);
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P2ForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P2ForPathMergeVertex.java
index c9942a2..d3f21f3 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P2ForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P2ForPathMergeVertex.java
@@ -80,6 +80,8 @@
fakeVertex.setByRead(random.getBytes(), 0);
}
isFakeVertex = ((byte)getVertexValue().getState() & State.FAKEFLAG_MASK) > 0 ? true : false;
+ if(destVertexId == null)
+ destVertexId = new KmerBytesWritable(kmerSize);
}
/**
@@ -266,7 +268,7 @@
mapKeyByActualKmer(msgIterator);
/** Reducer **/
reduceKeyByActualKmer();
- voteToHalt();
+ voteToHalt();
}
} else if (getSuperstep() % 3 == 1 && getSuperstep() <= maxIteration) {
if(!isFakeVertex){
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java
index ecfafa7..02151fa 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java
@@ -80,6 +80,8 @@
outgoingMsg = new MessageWritable(kmerSize);
else
outgoingMsg.reset(kmerSize);
+ if(destVertexId == null)
+ destVertexId = new KmerBytesWritable(kmerSize);
randSeed = getSuperstep();
randGenerator = new Random(randSeed);
if (probBeingRandomHead < 0)
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P5ForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P5ForPathMergeVertex.java
deleted file mode 100644
index 3f91ac1..0000000
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P5ForPathMergeVertex.java
+++ /dev/null
@@ -1,501 +0,0 @@
-package edu.uci.ics.genomix.pregelix.operator.pathmerge;
-
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.Random;
-
-import org.apache.hadoop.io.NullWritable;
-
-
-import edu.uci.ics.pregelix.api.graph.Vertex;
-import edu.uci.ics.pregelix.api.job.PregelixJob;
-import edu.uci.ics.genomix.oldtype.PositionWritable;
-import edu.uci.ics.genomix.pregelix.client.Client;
-import edu.uci.ics.genomix.pregelix.format.NaiveAlgorithmForPathMergeInputFormat;
-import edu.uci.ics.genomix.pregelix.format.NaiveAlgorithmForPathMergeOutputFormat;
-import edu.uci.ics.genomix.pregelix.io.MessageWritable;
-import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
-import edu.uci.ics.genomix.pregelix.io.VertexValueWritable.State;
-import edu.uci.ics.genomix.pregelix.type.Message;
-import edu.uci.ics.genomix.pregelix.type.MessageFlag;
-import edu.uci.ics.genomix.pregelix.util.VertexUtil;
-
-/*
- * vertexId: BytesWritable
- * vertexValue: ByteWritable
- * edgeValue: NullWritable
- * message: MessageWritable
- *
- * DNA:
- * A: 00
- * C: 01
- * G: 10
- * T: 11
- *
- * succeed node
- * A 00000001 1
- * G 00000010 2
- * C 00000100 4
- * T 00001000 8
- * precursor node
- * A 00010000 16
- * G 00100000 32
- * C 01000000 64
- * T 10000000 128
- *
- * For example, ONE LINE in input file: 00,01,10 0001,0010,
- * That means that vertexId is ACG, its succeed node is A and its precursor node is C.
- * The succeed node and precursor node will be stored in vertexValue and we don't use edgeValue.
- * The details about message are in edu.uci.ics.pregelix.example.io.MessageWritable.
- */
-/**
- * Naive Algorithm for path merge graph
- */
-public class P5ForPathMergeVertex extends
- Vertex<PositionWritable, VertexValueWritable, NullWritable, MessageWritable> {
- public static final String KMER_SIZE = "P5ForPathMergeVertex.kmerSize";
- public static final String ITERATIONS = "P5ForPathMergeVertex.iteration";
- public static final String RANDSEED = "P5ForPathMergeVertex.randSeed";
- public static final String PROBBEINGRANDOMHEAD = "P4ForPathMergeVertex.probBeingRandomHead";
- public static int kmerSize = -1;
- private int maxIteration = -1;
-
- private static long randSeed = -1;
- private float probBeingRandomHead = -1;
- private Random randGenerator;
-
- private PositionWritable curID = new PositionWritable();
- private PositionWritable nextID = new PositionWritable();
- private PositionWritable prevID = new PositionWritable();
- private boolean hasNext;
- private boolean hasPrev;
- private boolean curHead;
- private boolean nextHead;
- private boolean prevHead;
- private byte headFlag;
- private byte tailFlag;
- private byte outFlag;
-
- private MessageWritable incomingMsg = new MessageWritable();
- private MessageWritable outgoingMsg = new MessageWritable();
- private PositionWritable destVertexId = new PositionWritable();
- private Iterator<PositionWritable> posIterator;
-
- /**
- * initiate kmerSize, maxIteration
- */
- public void initVertex() {
- if (kmerSize == -1)
- kmerSize = getContext().getConfiguration().getInt(KMER_SIZE, 5);
- if (maxIteration < 0)
- maxIteration = getContext().getConfiguration().getInt(ITERATIONS, 1000000);
- if (randSeed < 0)
- randSeed = getContext().getConfiguration().getLong("randomSeed", 0);
- randGenerator = new Random(randSeed);
- if (probBeingRandomHead < 0)
- probBeingRandomHead = getContext().getConfiguration().getFloat("probBeingRandomHead", 0.5f);
- hasNext = false;
- hasPrev = false;
- curHead = false;
- nextHead = false;
- prevHead = false;
- outgoingMsg.reset();
- }
-
- protected boolean isNodeRandomHead(PositionWritable nodeID) {
- // "deterministically random", based on node id
- randGenerator.setSeed(randSeed ^ nodeID.hashCode());
- return randGenerator.nextFloat() < probBeingRandomHead;
- }
-
- /**
- * set nextID to the element that's next (in the node's FF or FR list), returning true when there is a next neighbor
- */
- protected boolean setNextInfo(VertexValueWritable value) {
- if (value.getFFList().getCountOfPosition() > 0) {
- nextID.set(value.getFFList().getPosition(0));
- nextHead = isNodeRandomHead(nextID);
- return true;
- }
- if (value.getFRList().getCountOfPosition() > 0) {
- nextID.setAsCopy(value.getFRList().getPosition(0));
- nextHead = isNodeRandomHead(nextID);
- return true;
- }
- return false;
- }
-
- /**
- * set prevID to the element that's previous (in the node's RR or RF list), returning true when there is a previous neighbor
- */
- protected boolean setPrevInfo(VertexValueWritable value) {
- if (value.getRRList().getCountOfPosition() > 0) {
- prevID.setAsCopy(value.getRRList().getPosition(0));
- prevHead = isNodeRandomHead(prevID);
- return true;
- }
- if (value.getRFList().getCountOfPosition() > 0) {
- prevID.setAsCopy(value.getRFList().getPosition(0));
- prevHead = isNodeRandomHead(prevID);
- return true;
- }
- return false;
- }
-
- /**
- * get destination vertex
- */
- public PositionWritable getNextDestVertexId(VertexValueWritable value) {
- if(value.getFFList().getCountOfPosition() > 0) // #FFList() > 0
- posIterator = value.getFFList().iterator();
- else // #FRList() > 0
- posIterator = value.getFRList().iterator();
- return posIterator.next();
- }
-
- public PositionWritable getPreDestVertexId(VertexValueWritable value) {
- if(value.getRFList().getCountOfPosition() > 0) // #RFList() > 0
- posIterator = value.getRFList().iterator();
- else // #RRList() > 0
- posIterator = value.getRRList().iterator();
- return posIterator.next();
- }
-
- /**
- * head send message to all next nodes
- */
- public void sendMsgToAllNextNodes(VertexValueWritable value) {
- posIterator = value.getFFList().iterator(); // FFList
- while(posIterator.hasNext()){
- destVertexId.set(posIterator.next());
- sendMsg(destVertexId, outgoingMsg);
- }
- posIterator = value.getFRList().iterator(); // FRList
- while(posIterator.hasNext()){
- destVertexId.set(posIterator.next());
- sendMsg(destVertexId, outgoingMsg);
- }
- }
-
- /**
- * head send message to all previous nodes
- */
- public void sendMsgToAllPreviousNodes(VertexValueWritable value) {
- posIterator = value.getRFList().iterator(); // RFList
- while(posIterator.hasNext()){
- destVertexId.set(posIterator.next());
- sendMsg(destVertexId, outgoingMsg);
- }
- posIterator = value.getRRList().iterator(); // RRList
- while(posIterator.hasNext()){
- destVertexId.set(posIterator.next());
- sendMsg(destVertexId, outgoingMsg);
- }
- }
-
- /**
- * start sending message
- */
- public void startSendMsg() {
- if (VertexUtil.isHeadVertexWithIndegree(getVertexValue())) {
- outgoingMsg.setFlag(Message.START);
- sendMsgToAllNextNodes(getVertexValue());
- voteToHalt();
- }
- if (VertexUtil.isRearVertexWithOutdegree(getVertexValue())) {
- outgoingMsg.setFlag(Message.END);
- sendMsgToAllPreviousNodes(getVertexValue());
- voteToHalt();
- }
- if (VertexUtil.isHeadWithoutIndegree(getVertexValue())){
- outgoingMsg.setFlag(Message.START);
- sendMsg(getVertexId(), outgoingMsg); //send to itself
- voteToHalt();
- }
- if (VertexUtil.isRearWithoutOutdegree(getVertexValue())){
- outgoingMsg.setFlag(Message.END);
- sendMsg(getVertexId(), outgoingMsg); //send to itself
- voteToHalt();
- }
- }
-
- /**
- * initiate head, rear and path node
- */
- public void initState(Iterator<MessageWritable> msgIterator) {
- while (msgIterator.hasNext()) {
- if (!VertexUtil.isPathVertex(getVertexValue())
- && !VertexUtil.isHeadWithoutIndegree(getVertexValue())
- && !VertexUtil.isRearWithoutOutdegree(getVertexValue())) {
- msgIterator.next();
- voteToHalt();
- } else {
- incomingMsg = msgIterator.next();
- setState();
- }
- }
- }
-
- /**
- * set vertex state
- */
- public void setState() {
- if (incomingMsg.getFlag() == Message.START) {
- getVertexValue().setState(MessageFlag.IS_HEAD); //State.START_VERTEX
- } else if (incomingMsg.getFlag() == Message.END && getVertexValue().getState() != State.IS_HEAD) {
- getVertexValue().setState(MessageFlag.IS_HEAD);
- getVertexValue().setKmer(getVertexValue().getKmer());
- //voteToHalt();
- } //else
- //voteToHalt();
- }
-
- /**
- * check if A need to be flipped with successor
- */
- public boolean ifFilpWithSuccessor(){
- if(getVertexValue().getFRList().getLength() > 0)
- return true;
- else
- return false;
- }
-
- /**
- * check if A need to be filpped with predecessor
- */
- public boolean ifFlipWithPredecessor(){
- if(getVertexValue().getRFList().getLength() > 0)
- return true;
- else
- return false;
- }
-
- /**
- * set adjMessage to successor(from predecessor)
- */
- public void setSuccessorAdjMsg(){
- if(getVertexValue().getFFList().getLength() > 0)
- outFlag |= MessageFlag.DIR_FF;
- else
- outFlag |= MessageFlag.DIR_FR;
- }
-
- /**
- * set adjMessage to predecessor(from successor)
- */
- public void setPredecessorAdjMsg(){
- if(getVertexValue().getRFList().getLength() > 0)
- outFlag |= MessageFlag.DIR_RF;
- else
- outFlag |= MessageFlag.DIR_RF;
- }
-
- /**
- * send update message to neighber
- * @throws IOException
- */
- public void broadcastUpdateMsg(){
- /* switch(getVertexValue().getState() & 0b0001){
- case MessageFlag.SHOULD_MERGEWITHPREV:
- setSuccessorAdjMsg();
- if(ifFlipWithPredecessor())
- outFlag |= MessageFlag.FLIP;
- outgoingMsg.setFlag(outFlag);
- outgoingMsg.setNeighberNode(getVertexValue().getIncomingList());
- outgoingMsg.setSourceVertexId(getVertexId());
- sendMsg(getNextDestVertexId(getVertexValue()), outgoingMsg);
- break;
- case MessageFlag.SHOULD_MERGEWITHNEXT:
- setPredecessorAdjMsg();
- if(ifFilpWithSuccessor())
- outFlag |= MessageFlag.FLIP;
- outgoingMsg.setFlag(outFlag);
- outgoingMsg.setNeighberNode(getVertexValue().getOutgoingList());
- outgoingMsg.setSourceVertexId(getVertexId());
- sendMsg(getPreDestVertexId(getVertexValue()), outgoingMsg);
- break;
- }*/
- }
-
- /**
- * This vertex tries to merge with next vertex and send update msg to neighber
- * @throws IOException
- */
- public void sendUpMsgFromPredecessor(){
- byte state = getVertexValue().getState();
- state |= MessageFlag.SHOULD_MERGEWITHNEXT;
- getVertexValue().setState(state);
- if(getVertexValue().getFFList().getLength() > 0)
- getVertexValue().setMergeDest(getVertexValue().getFFList().getPosition(0));
- else
- getVertexValue().setMergeDest(getVertexValue().getFRList().getPosition(0));
- broadcastUpdateMsg();
- }
-
- /**
- * This vertex tries to merge with next vertex and send update msg to neighber
- * @throws IOException
- */
- public void sendUpMsgFromSuccessor(){
- byte state = getVertexValue().getState();
- state |= MessageFlag.SHOULD_MERGEWITHPREV;
- getVertexValue().setState(state);
- if(getVertexValue().getRFList().getLength() > 0)
- getVertexValue().setMergeDest(getVertexValue().getRFList().getPosition(0));
- else
- getVertexValue().setMergeDest(getVertexValue().getRRList().getPosition(0));
- broadcastUpdateMsg();
- }
-
- /**
- * Returns the edge dir for B->A when the A->B edge is type @dir
- */
- public byte mirrorDirection(byte dir) {
- switch (dir) {
- case MessageFlag.DIR_FF:
- return MessageFlag.DIR_RR;
- case MessageFlag.DIR_FR:
- return MessageFlag.DIR_FR;
- case MessageFlag.DIR_RF:
- return MessageFlag.DIR_RF;
- case MessageFlag.DIR_RR:
- return MessageFlag.DIR_FF;
- default:
- throw new RuntimeException("Unrecognized direction in flipDirection: " + dir);
- }
- }
-
- /**
- * check if need filp
- */
- public byte flipDirection(byte neighborDir, boolean flip){
- if(flip){
- switch (neighborDir) {
- case MessageFlag.DIR_FF:
- return MessageFlag.DIR_FR;
- case MessageFlag.DIR_FR:
- return MessageFlag.DIR_FF;
- case MessageFlag.DIR_RF:
- return MessageFlag.DIR_RR;
- case MessageFlag.DIR_RR:
- return MessageFlag.DIR_RF;
- default:
- throw new RuntimeException("Unrecognized direction for neighborDir: " + neighborDir);
- }
- } else
- return neighborDir;
- }
-
- /**
- * updateAdjList
- */
- public void processUpdate(){
- /*byte meToNeighborDir = (byte) (incomingMsg.getFlag() & MessageFlag.DIR_MASK);
- byte neighborToMeDir = mirrorDirection(meToNeighborDir);
-
- boolean flip;
- if((outFlag & MessageFlag.FLIP) > 0)
- flip = true;
- else
- flip = false;
- byte neighborToMergeDir = flipDirection(neighborToMeDir, flip);
-
- getVertexValue().processUpdates(neighborToMeDir, incomingMsg.getSourceVertexId(),
- neighborToMergeDir, VertexUtil.getNodeIdFromAdjacencyList(incomingMsg.getNeighberNode()));*/
- }
-
- /**
- * merge and updateAdjList
- */
- public void processMerge(){
- /*byte meToNeighborDir = (byte) (incomingMsg.getFlag() & MessageFlag.DIR_MASK);
- byte neighborToMeDir = mirrorDirection(meToNeighborDir);
-
- boolean flip;
- if((outFlag & MessageFlag.FLIP) > 0)
- flip = true;
- else
- flip = false;
- byte neighborToMergeDir = flipDirection(neighborToMeDir, flip);
-
- getVertexValue().processMerges(neighborToMeDir, incomingMsg.getSourceVertexId(),
- neighborToMergeDir, VertexUtil.getNodeIdFromAdjacencyList(incomingMsg.getNeighberNode()),
- kmerSize, incomingMsg.getKmer());*/
- }
-
- @Override
- public void compute(Iterator<MessageWritable> msgIterator) {
- initVertex();
- if (getSuperstep() == 1)
- startSendMsg();
- else if (getSuperstep() == 2)
- initState(msgIterator);
- else if (getSuperstep() % 4 == 3){
- // Node may be marked as head b/c it's a real head or a real tail
- headFlag = (byte) (State.IS_HEAD & getVertexValue().getState());
- tailFlag = (byte) (State.IS_HEAD & getVertexValue().getState()); //is_tail
- outFlag = (byte) (headFlag | tailFlag);
-
- // only PATH vertices are present. Find the ID's for my neighbors
- curID.set(getVertexId());
-
- curHead = isNodeRandomHead(curID);
-
- // the headFlag and tailFlag's indicate if the node is at the beginning or end of a simple path.
- // We prevent merging towards non-path nodes
- hasNext = setNextInfo(getVertexValue()) && tailFlag == 0;
- hasPrev = setPrevInfo(getVertexValue()) && headFlag == 0;
- if ((outFlag & MessageFlag.IS_HEAD) > 0 && (outFlag & MessageFlag.IS_HEAD) > 0) {
- getVertexValue().setState(outFlag);
- voteToHalt();
- }
- if (hasNext || hasPrev) {
- if (curHead) {
- if (hasNext && !nextHead) {
- // compress this head to the forward tail
- sendUpMsgFromPredecessor();
- } else if (hasPrev && !prevHead) {
- // compress this head to the reverse tail
- sendUpMsgFromSuccessor();
- }
- } else {
- // I'm a tail
- if (hasNext && hasPrev) {
- if ((!nextHead && !prevHead) && (curID.compareTo(nextID) < 0 && curID.compareTo(prevID) < 0)) {
- // tails on both sides, and I'm the "local minimum"
- // compress me towards the tail in forward dir
- sendUpMsgFromPredecessor();
- }
- } else if (!hasPrev) {
- // no previous node
- if (!nextHead && curID.compareTo(nextID) < 0) {
- // merge towards tail in forward dir
- sendUpMsgFromPredecessor();
- }
- } else if (!hasNext) {
- // no next node
- if (!prevHead && curID.compareTo(prevID) < 0) {
- // merge towards tail in reverse dir
- sendUpMsgFromSuccessor();
- }
- }
- }
- }
- }
-
- }
-
- public static void main(String[] args) throws Exception {
- PregelixJob job = new PregelixJob(P5ForPathMergeVertex.class.getSimpleName());
- job.setVertexClass(P5ForPathMergeVertex.class);
- /**
- * BinaryInput and BinaryOutput
- */
- job.setVertexInputFormatClass(NaiveAlgorithmForPathMergeInputFormat.class);
- job.setVertexOutputFormatClass(NaiveAlgorithmForPathMergeOutputFormat.class);
- job.setDynamicVertexValueSize(true);
- job.setOutputKeyClass(PositionWritable.class);
- job.setOutputValueClass(VertexValueWritable.class);
- Client.run(args, job);
- }
-}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/splitrepeat/SplitRepeatVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/splitrepeat/SplitRepeatVertex.java
index 33b62c3..fa50e66 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/splitrepeat/SplitRepeatVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/splitrepeat/SplitRepeatVertex.java
@@ -7,6 +7,9 @@
import java.util.Random;
import java.util.Set;
+import edu.uci.ics.genomix.pregelix.client.Client;
+import edu.uci.ics.genomix.pregelix.format.GraphCleanOutputFormat;
+import edu.uci.ics.genomix.pregelix.format.InitialGraphCleanInputFormat;
import edu.uci.ics.genomix.pregelix.io.MessageWritable;
import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
import edu.uci.ics.genomix.pregelix.operator.pathmerge.BasicGraphCleanVertex;
@@ -16,88 +19,57 @@
import edu.uci.ics.genomix.type.PositionWritable;
import edu.uci.ics.genomix.type.VKmerBytesWritable;
import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.job.PregelixJob;
import edu.uci.ics.pregelix.api.util.BspUtils;
public class SplitRepeatVertex extends
BasicGraphCleanVertex{
- public class CreatedVertex{
- VKmerBytesWritable createdVertexId;
- String incomingDir;
- String outgoingDir;
- VKmerBytesWritable incomingEdge;
- VKmerBytesWritable outgoingEdge;
+
+ public class EdgeDir{
+ public static final byte DIR_FF = 0 << 0;
+ public static final byte DIR_FR = 1 << 0;
+ public static final byte DIR_RF = 2 << 0;
+ public static final byte DIR_RR = 3 << 0;
+ }
+
+ public class DeletedEdge{
+ private byte dir;
+ private KmerBytesWritable edge;
- public CreatedVertex(){
- createdVertexId = new VKmerBytesWritable(kmerSize);
- incomingDir = "";
- outgoingDir = "";
- incomingEdge = new VKmerBytesWritable(kmerSize);
- outgoingEdge = new VKmerBytesWritable(kmerSize);
- }
-
- public void clear(){
- createdVertexId.reset(kmerSize);
- incomingDir = "";
- outgoingDir = "";
- incomingEdge.reset(kmerSize);
- outgoingEdge.reset(kmerSize);
- }
-
- public VKmerBytesWritable getCreatedVertexId() {
- return createdVertexId;
+ public DeletedEdge(){
+ dir = 0;
+ edge = new KmerBytesWritable(kmerSize);
}
- public void setCreatedVertexId(KmerBytesWritable createdVertexId) {
- this.createdVertexId = createdVertexId;
+ public byte getDir() {
+ return dir;
}
- public String getIncomingDir() {
- return incomingDir;
+ public void setDir(byte dir) {
+ this.dir = dir;
}
- public void setIncomingDir(String incomingDir) {
- this.incomingDir = incomingDir;
+ public KmerBytesWritable getEdge() {
+ return edge;
}
- public String getOutgoingDir() {
- return outgoingDir;
- }
-
- public void setOutgoingDir(String outgoingDir) {
- this.outgoingDir = outgoingDir;
- }
-
- public VKmerBytesWritable getIncomingEdge() {
- return incomingEdge;
- }
-
- public void setIncomingEdge(KmerBytesWritable incomingEdge) {
- this.incomingEdge.set(incomingEdge);
- }
-
- public VKmerBytesWritable getOutgoingEdge() {
- return outgoingEdge;
- }
-
- public void setOutgoingEdge(KmerBytesWritable outgoingEdge) {
- this.outgoingEdge.set(outgoingEdge);
+ public void setEdge(KmerBytesWritable edge) {
+ this.edge.set(edge);
}
}
- private String[][] connectedTable = new String[][]{
- {"FF", "RF"},
- {"FF", "RR"},
- {"FR", "RF"},
- {"FR", "RR"}
+ private byte[][] connectedTable = new byte[][]{
+ {EdgeDir.DIR_RF, EdgeDir.DIR_FF},
+ {EdgeDir.DIR_RF, EdgeDir.DIR_FR},
+ {EdgeDir.DIR_RR, EdgeDir.DIR_FF},
+ {EdgeDir.DIR_RR, EdgeDir.DIR_FR}
};
public static Set<String> existKmerString = new HashSet<String>();
private Set<Long> readIdSet;
private Set<Long> incomingReadIdSet = new HashSet<Long>();
private Set<Long> outgoingReadIdSet = new HashSet<Long>();
private Set<Long> selfReadIdSet = new HashSet<Long>();
- private Set<Long> incomingEdgeIntersection = new HashSet<Long>();
- private Set<Long> outgoingEdgeIntersection = new HashSet<Long>();
private Set<Long> neighborEdgeIntersection = new HashSet<Long>();
private Map<KmerBytesWritable, Set<Long>> kmerMap = new HashMap<KmerBytesWritable, Set<Long>>();
private VKmerListWritable incomingEdgeList = null;
@@ -106,8 +78,6 @@
private byte outgoingEdgeDir = 0;
protected KmerBytesWritable createdVertexId = null;
- private CreatedVertex createdVertex = new CreatedVertex();
- public static Set<CreatedVertex> createdVertexSet = new HashSet<CreatedVertex>();
/**
* initiate kmerSize, maxIteration
@@ -128,7 +98,9 @@
if(outgoingEdgeList == null)
outgoingEdgeList = new VKmerListWritable(kmerSize);
if(createdVertexId == null)
- createdVertexId = new VKmerBytesWritable(kmerSize + 1);
+ createdVertexId = new KmerBytesWritable(kmerSize);//kmerSize + 1
+ if(destVertexId == null)
+ destVertexId = new KmerBytesWritable(kmerSize);
}
/**
@@ -150,7 +122,184 @@
return sb.toString();
}
+ /**
+ * GenerateString only for test
+ */
+ public String generateString(){
+ if(existKmerString.isEmpty()){
+ existKmerString.add("AAA");
+ return "AAA";
+ }
+ else
+ return "GGG";
+ }
+
+ public void generateKmerMap(Iterator<MessageWritable> msgIterator){
+ kmerMap.clear();
+ while(msgIterator.hasNext()){
+ incomingMsg = msgIterator.next();
+ readIdSet = new HashSet<Long>();
+ for(PositionWritable nodeId : incomingMsg.getNodeIdList()){
+ readIdSet.add(nodeId.getReadId());
+ }
+ kmerMap.put(incomingMsg.getSourceVertexId(), readIdSet);
+ }
+ }
+
+ public void setSelfReadIdSet(){
+ selfReadIdSet.clear();
+ for(PositionWritable nodeId : getVertexValue().getNodeIdList()){
+ selfReadIdSet.add(nodeId.getReadId());
+ }
+ }
+
@SuppressWarnings({ "rawtypes", "unchecked" })
+ public void createNewVertex(int i, KmerBytesWritable incomingEdge, KmerBytesWritable outgoingEdge){
+ Vertex vertex = (Vertex) BspUtils.createVertex(getContext().getConfiguration());
+ vertex.getMsgList().clear();
+ vertex.getEdges().clear();
+ KmerBytesWritable vertexId = new KmerBytesWritable(kmerSize);
+ VertexValueWritable vertexValue = new VertexValueWritable(kmerSize);
+ //add the corresponding edge to new vertex
+ switch(connectedTable[i][0]){
+ case EdgeDir.DIR_RF:
+ vertexValue.getRFList().append(incomingEdge);
+ break;
+ case EdgeDir.DIR_RR:
+ vertexValue.getRRList().append(incomingEdge);
+ break;
+ }
+ switch(connectedTable[i][1]){
+ case EdgeDir.DIR_FF:
+ vertexValue.getFFList().append(outgoingEdge);
+ break;
+ case EdgeDir.DIR_FR:
+ vertexValue.getFRList().append(outgoingEdge);
+ break;
+ }
+ vertexId.set(createdVertexId);
+ vertex.setVertexId(vertexId);
+ vertex.setVertexValue(vertexValue);
+
+ addVertex(vertexId, vertex);
+ }
+
+ public void sendMsgToUpdateEdge(KmerBytesWritable incomingEdge, KmerBytesWritable outgoingEdge){
+ outgoingMsg.setCreatedVertexId(createdVertexId);
+ outgoingMsg.setSourceVertexId(getVertexId());
+
+ outgoingMsg.setFlag(incomingEdgeDir);
+ destVertexId.set(incomingEdge);
+ sendMsg(destVertexId, outgoingMsg);
+
+ outgoingMsg.setFlag(outgoingEdgeDir);
+ destVertexId.set(outgoingEdge);
+ sendMsg(destVertexId, outgoingMsg);
+ }
+
+ public void storeDeletedEdge(Set<DeletedEdge> deletedEdges, int i, KmerBytesWritable incomingEdge, KmerBytesWritable outgoingEdge){
+ DeletedEdge deletedIncomingEdge = new DeletedEdge();
+ DeletedEdge deletedOutgoingEdge = new DeletedEdge();
+ switch(connectedTable[i][0]){
+ case EdgeDir.DIR_RF:
+ deletedIncomingEdge.setDir(EdgeDir.DIR_RF);
+ deletedIncomingEdge.setEdge(incomingEdge);
+ break;
+ case EdgeDir.DIR_RR:
+ deletedIncomingEdge.setDir(EdgeDir.DIR_RR);
+ deletedIncomingEdge.setEdge(incomingEdge);
+ break;
+ }
+ switch(connectedTable[i][1]){
+ case EdgeDir.DIR_FF:
+ deletedOutgoingEdge.setDir(EdgeDir.DIR_FF);
+ deletedOutgoingEdge.setEdge(outgoingEdge);
+ break;
+ case EdgeDir.DIR_FR:
+ deletedOutgoingEdge.setDir(EdgeDir.DIR_FR);
+ deletedOutgoingEdge.setEdge(outgoingEdge);
+ break;
+ }
+ deletedEdges.add(deletedIncomingEdge);
+ deletedEdges.add(deletedOutgoingEdge);
+ }
+ public void deleteEdgeFromOldVertex(DeletedEdge deleteEdge){
+ switch(deleteEdge.dir){
+ case EdgeDir.DIR_RF:
+ getVertexValue().getRFList().remove(deleteEdge.getEdge());
+ break;
+ case EdgeDir.DIR_RR:
+ getVertexValue().getRRList().remove(deleteEdge.getEdge());
+ break;
+ case EdgeDir.DIR_FF:
+ getVertexValue().getFFList().remove(deleteEdge.getEdge());
+ break;
+ case EdgeDir.DIR_FR:
+ getVertexValue().getFRList().remove(deleteEdge.getEdge());
+ break;
+ }
+ }
+
+ public void setEdgeListAndEdgeDir(int i){
+ switch(connectedTable[i][0]){
+ case EdgeDir.DIR_RF:
+ incomingEdgeList.set(getVertexValue().getRFList());
+ incomingEdgeDir = MessageFlag.DIR_RF;
+ break;
+ case EdgeDir.DIR_RR:
+ incomingEdgeList.set(getVertexValue().getRRList());
+ incomingEdgeDir = MessageFlag.DIR_RR;
+ break;
+ }
+ switch(connectedTable[i][1]){
+ case EdgeDir.DIR_FF:
+ outgoingEdgeList.set(getVertexValue().getFFList());
+ outgoingEdgeDir = MessageFlag.DIR_FF;
+ break;
+ case EdgeDir.DIR_FR:
+ outgoingEdgeList.set(getVertexValue().getFRList());
+ outgoingEdgeDir = MessageFlag.DIR_FR;
+ break;
+ }
+ }
+
+ public void setNeighborEdgeIntersection(KmerBytesWritable incomingEdge, KmerBytesWritable outgoingEdge){
+ outgoingReadIdSet.clear();
+ incomingReadIdSet.clear();
+ tmpKmer.set(incomingEdge);
+ incomingReadIdSet.addAll(kmerMap.get(tmpKmer));
+ tmpKmer.set(outgoingEdge);
+ outgoingReadIdSet.addAll(kmerMap.get(tmpKmer));
+
+ //set all neighberEdge readId intersection
+ neighborEdgeIntersection.addAll(selfReadIdSet);
+ neighborEdgeIntersection.retainAll(incomingReadIdSet);
+ neighborEdgeIntersection.retainAll(outgoingReadIdSet);
+ }
+
+ public void updateEdgeListPointToNewVertex(){
+ byte meToNeighborDir = incomingMsg.getFlag();
+ byte neighborToMeDir = mirrorDirection(meToNeighborDir);
+ switch(neighborToMeDir){
+ case MessageFlag.DIR_FF:
+ getVertexValue().getFFList().remove(incomingMsg.getSourceVertexId());
+ getVertexValue().getFFList().append(incomingMsg.getCreatedVertexId());
+ break;
+ case MessageFlag.DIR_FR:
+ getVertexValue().getFRList().remove(incomingMsg.getSourceVertexId());
+ getVertexValue().getFRList().append(incomingMsg.getCreatedVertexId());
+ break;
+ case MessageFlag.DIR_RF:
+ getVertexValue().getRFList().remove(incomingMsg.getSourceVertexId());
+ getVertexValue().getRFList().append(incomingMsg.getCreatedVertexId());
+ break;
+ case MessageFlag.DIR_RR:
+ getVertexValue().getRRList().remove(incomingMsg.getSourceVertexId());
+ getVertexValue().getRRList().append(incomingMsg.getCreatedVertexId());
+ break;
+ }
+ }
+
@Override
public void compute(Iterator<MessageWritable> msgIterator) {
initVertex();
@@ -169,161 +318,103 @@
}
voteToHalt();
} else if(getSuperstep() == 3){
- kmerMap.clear();
- createdVertexSet.clear();
- while(msgIterator.hasNext()){
- incomingMsg = msgIterator.next();
- readIdSet = new HashSet<Long>();
- for(PositionWritable nodeId : incomingMsg.getNodeIdList()){
- readIdSet.add(nodeId.getReadId());
- }
- kmerMap.put(incomingMsg.getSourceVertexId(), readIdSet);
- }
+ /** generate KmerMap map kmer(key) to readIdSet(value) **/
+ generateKmerMap(msgIterator);
+
+ /** set self readId set **/
+ setSelfReadIdSet();
+
+ int count = 0;
+ //A set storing deleted edges
+ Set<DeletedEdge> deletedEdges = new HashSet<DeletedEdge>();
/** process connectedTable **/
for(int i = 0; i < 4; i++){
- switch(connectedTable[i][0]){
- case "FF":
- outgoingEdgeList.set(getVertexValue().getFFList());
- outgoingEdgeDir = MessageFlag.DIR_FF;
- break;
- case "FR":
- outgoingEdgeList.set(getVertexValue().getFRList());
- outgoingEdgeDir = MessageFlag.DIR_FR;
- break;
- }
- switch(connectedTable[i][1]){
- case "RF":
- incomingEdgeList.set(getVertexValue().getRFList());
- incomingEdgeDir = MessageFlag.DIR_RF;
- break;
- case "RR":
- incomingEdgeList.set(getVertexValue().getRRList());
- incomingEdgeDir = MessageFlag.DIR_RR;
- break;
- }
- selfReadIdSet.clear();
- for(PositionWritable nodeId : getVertexValue().getNodeIdList()){
- selfReadIdSet.add(nodeId.getReadId());
- }
- for(KmerBytesWritable outgoingEdge : outgoingEdgeList){
- for(KmerBytesWritable incomingEdge : incomingEdgeList){
- outgoingReadIdSet.clear();
- incomingReadIdSet.clear();
- outgoingReadIdSet.addAll(kmerMap.get(outgoingEdge));
- incomingReadIdSet.addAll(kmerMap.get(incomingEdge));
-
- //set all neighberEdge readId intersection
- neighborEdgeIntersection.addAll(selfReadIdSet);
- neighborEdgeIntersection.retainAll(outgoingReadIdSet);
- neighborEdgeIntersection.retainAll(incomingReadIdSet);
- //set outgoingEdge readId intersection
- outgoingEdgeIntersection.addAll(selfReadIdSet);
- outgoingEdgeIntersection.retainAll(outgoingReadIdSet);
- outgoingEdgeIntersection.removeAll(neighborEdgeIntersection);
- //set incomingEdge readId intersection
- incomingEdgeIntersection.addAll(selfReadIdSet);
- incomingEdgeIntersection.retainAll(incomingReadIdSet);
- incomingEdgeIntersection.removeAll(neighborEdgeIntersection);
+ /** set edgeList and edgeDir based on connectedTable **/
+ setEdgeListAndEdgeDir(i);
+
+ KmerBytesWritable incomingEdge = new KmerBytesWritable(kmerSize);
+ KmerBytesWritable outgoingEdge = new KmerBytesWritable(kmerSize);
+ for(int x = 0; x < incomingEdgeList.getCountOfPosition(); x++){
+ for(int y = 0; y < outgoingEdgeList.getCountOfPosition(); y++){
+ incomingEdge.set(incomingEdgeList.getPosition(x));
+ outgoingEdge.set(outgoingEdgeList.getPosition(y));
+ /** set neighborEdge readId intersection **/
+ setNeighborEdgeIntersection(incomingEdge, outgoingEdge);
if(!neighborEdgeIntersection.isEmpty()){
- createdVertex.clear();
- createdVertexId.setByRead(generaterRandomString(kmerSize + 1).getBytes(), 0);
- createdVertex.setCreatedVertexId(createdVertexId);
- createdVertex.setIncomingDir(connectedTable[i][1]);
- createdVertex.setOutgoingDir(connectedTable[i][0]);
- createdVertex.setIncomingEdge(incomingEdge);
- createdVertex.setOutgoingEdge(outgoingEdge);
- createdVertexSet.add(createdVertex);
+ if(count == 0)
+ createdVertexId.setByRead("AAA".getBytes(), 0);//kmerSize + 1 generaterRandomString(kmerSize).getBytes()
+ else
+ createdVertexId.setByRead("GGG".getBytes(), 0);
+ count++;
- outgoingMsg.setCreatedVertexId(createdVertex.getCreatedVertexId());
- outgoingMsg.setSourceVertexId(getVertexId());
- outgoingMsg.setFlag(incomingEdgeDir);
- sendMsg(incomingEdge, outgoingMsg);
- outgoingMsg.setFlag(outgoingEdgeDir);
- sendMsg(outgoingEdge, outgoingMsg);
- }
-
- if(!incomingEdgeIntersection.isEmpty()){
- createdVertex.clear();
- createdVertexId.setByRead(generaterRandomString(kmerSize + 1).getBytes(), 0);
- createdVertex.setCreatedVertexId(createdVertexId);
- createdVertex.setIncomingDir(connectedTable[i][1]);
- createdVertex.setIncomingEdge(incomingEdge);
- createdVertexSet.add(createdVertex);
+ /** create new/created vertex **/
+ createNewVertex(i, incomingEdge, outgoingEdge);
- outgoingMsg.setCreatedVertexId(createdVertex.getCreatedVertexId());
- outgoingMsg.setSourceVertexId(getVertexId());
- outgoingMsg.setFlag(incomingEdgeDir);
- sendMsg(incomingEdge, outgoingMsg);
- }
-
- if(!outgoingEdgeIntersection.isEmpty()){
- createdVertex.clear();
- createdVertexId.setByRead(generaterRandomString(kmerSize + 1).getBytes(), 0);
- createdVertex.setCreatedVertexId(createdVertexId);
- createdVertex.setOutgoingDir(connectedTable[i][0]);
- createdVertex.setOutgoingEdge(outgoingEdge);
- createdVertexSet.add(createdVertex);
+ /** send msg to neighbors to update their edges to new vertex **/
+ sendMsgToUpdateEdge(incomingEdge, outgoingEdge);
- outgoingMsg.setCreatedVertexId(createdVertex.getCreatedVertexId());
- outgoingMsg.setSourceVertexId(getVertexId());
- outgoingMsg.setFlag(outgoingEdgeDir);
- sendMsg(outgoingEdge, outgoingMsg);
+ /** store deleted edge **/
+ storeDeletedEdge(deletedEdges, i, incomingEdge, outgoingEdge);
}
}
}
+
+// for(KmerBytesWritable incomingEdge : incomingEdgeList){
+// for(KmerBytesWritable outgoingEdge : outgoingEdgeList){
+// /** set neighborEdge readId intersection **/
+// setNeighborEdgeIntersection(incomingEdge, outgoingEdge);
+//
+// if(!neighborEdgeIntersection.isEmpty()){
+// if(count == 0)
+// createdVertexId.setByRead("AAA".getBytes(), 0);//kmerSize + 1 generaterRandomString(kmerSize).getBytes()
+// else
+// createdVertexId.setByRead("GGG".getBytes(), 0);
+// count++;
+//
+// /** create new/created vertex **/
+// createNewVertex(i, incomingEdge, outgoingEdge);
+//
+// /** send msg to neighbors to update their edges to new vertex **/
+// sendMsgToUpdateEdge(incomingEdge, outgoingEdge);
+//
+// /** store deleted edge **/
+// storeDeletedEdge(deletedEdges, i, incomingEdge, outgoingEdge);
+// }
+// }
+// }
}
+ /** delete extra edges from old vertex **/
+ for(DeletedEdge deletedEdge : deletedEdges){
+ deleteEdgeFromOldVertex(deletedEdge);
+ }
+
+ /** Old vertex delete or voteToHalt **/
+ if(getVertexValue().getDegree() == 0)//if no any edge, delete
+ deleteVertex(getVertexId());
+ else
+ voteToHalt();
} else if(getSuperstep() == 4){
while(msgIterator.hasNext()){
incomingMsg = msgIterator.next();
/** update edgelist to new/created vertex **/
- byte meToNeighborDir = incomingMsg.getFlag();
- byte neighborToMeDir = mirrorDirection(meToNeighborDir);
- switch(neighborToMeDir){
- case MessageFlag.DIR_FF:
- getVertexValue().getFFList().remove(incomingMsg.getSourceVertexId());
- getVertexValue().getFFList().append(incomingMsg.getCreatedVertexId());
- break;
- case MessageFlag.DIR_FR:
- getVertexValue().getFRList().remove(incomingMsg.getSourceVertexId());
- getVertexValue().getFRList().append(incomingMsg.getCreatedVertexId());
- break;
- case MessageFlag.DIR_RF:
- getVertexValue().getRFList().remove(incomingMsg.getSourceVertexId());
- getVertexValue().getRFList().append(incomingMsg.getCreatedVertexId());
- break;
- case MessageFlag.DIR_RR:
- getVertexValue().getRRList().remove(incomingMsg.getSourceVertexId());
- getVertexValue().getRRList().append(incomingMsg.getCreatedVertexId());
- break;
- }
- /** add new/created vertex **/
- for(CreatedVertex v : createdVertexSet){
- Vertex vertex = (Vertex) BspUtils.createVertex(getContext().getConfiguration());
- vertex.getMsgList().clear();
- vertex.getEdges().clear();
- VertexValueWritable vertexValue = new VertexValueWritable();
- switch(v.incomingDir){
- case "RF":
- vertexValue.getRFList().append(v.incomingEdge);
- break;
- case "RR":
- vertexValue.getRRList().append(v.incomingEdge);
- break;
- }
- switch(v.outgoingDir){
- case "FF":
- vertexValue.getFFList().append(v.outgoingEdge);
- break;
- case "FR":
- vertexValue.getFRList().append(v.outgoingEdge);
- break;
- }
- vertex.setVertexId(v.getCreatedVertexId());
- vertex.setVertexValue(vertexValue);
- }
- createdVertexSet.clear();
+ updateEdgeListPointToNewVertex();
}
+ voteToHalt();
}
}
+
+ public static void main(String[] args) throws Exception {
+ PregelixJob job = new PregelixJob(SplitRepeatVertex.class.getSimpleName());
+ job.setVertexClass(SplitRepeatVertex.class);
+ /**
+ * BinaryInput and BinaryOutput
+ */
+ job.setVertexInputFormatClass(InitialGraphCleanInputFormat.class);
+ job.setVertexOutputFormatClass(GraphCleanOutputFormat.class);
+ job.setDynamicVertexValueSize(true);
+ job.setOutputKeyClass(KmerBytesWritable.class);
+ job.setOutputValueClass(VertexValueWritable.class);
+ Client.run(args, job);
+ }
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/tipremove/TipAddVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/tipremove/TipAddVertex.java
index 519ffb9..7b695dc 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/tipremove/TipAddVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/tipremove/TipAddVertex.java
@@ -4,11 +4,10 @@
import org.apache.hadoop.io.NullWritable;
import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.KmerListWritable;
import edu.uci.ics.pregelix.api.graph.Vertex;
import edu.uci.ics.pregelix.api.job.PregelixJob;
import edu.uci.ics.pregelix.api.util.BspUtils;
-import edu.uci.ics.genomix.oldtype.PositionListWritable;
-import edu.uci.ics.genomix.oldtype.PositionWritable;
import edu.uci.ics.genomix.pregelix.client.Client;
import edu.uci.ics.genomix.pregelix.format.GraphCleanInputFormat;
import edu.uci.ics.genomix.pregelix.format.GraphCleanOutputFormat;
@@ -47,7 +46,7 @@
* Remove tip or single node when l > constant
*/
public class TipAddVertex extends
- Vertex<PositionWritable, VertexValueWritable, NullWritable, MessageWritable> {
+ Vertex<KmerBytesWritable, VertexValueWritable, NullWritable, MessageWritable> {
public static final String KMER_SIZE = "TipAddVertex.kmerSize";
public static int kmerSize = -1;
@@ -58,37 +57,39 @@
if (kmerSize == -1)
kmerSize = getContext().getConfiguration().getInt(KMER_SIZE, 5);
}
-
+
+ /**
+ * create a new vertex point to split node
+ */
+
@SuppressWarnings("unchecked")
@Override
public void compute(Iterator<MessageWritable> msgIterator) {
initVertex();
if(getSuperstep() == 1){
- if(getVertexId().getReadID() == 1 && getVertexId().getPosInRead() == 4){
- getVertexValue().getFFList().append(2, (byte)1);
+ if(getVertexId().toString().equals("CTA")){
+ KmerBytesWritable vertexId = new KmerBytesWritable(kmerSize);
+ vertexId.setByRead("AGC".getBytes(), 0);
+ getVertexValue().getRFList().append(vertexId);
//add tip vertex
@SuppressWarnings("rawtypes")
Vertex vertex = (Vertex) BspUtils.createVertex(getContext().getConfiguration());
vertex.getMsgList().clear();
vertex.getEdges().clear();
- PositionWritable vertexId = new PositionWritable();
- VertexValueWritable vertexValue = new VertexValueWritable();
+
+ VertexValueWritable vertexValue = new VertexValueWritable(kmerSize);
/**
* set the src vertex id
*/
- vertexId.set(2, (byte)1);
vertex.setVertexId(vertexId);
/**
* set the vertex value
*/
- byte[] array = { 'G', 'A', 'A'};
- KmerBytesWritable kmer = new KmerBytesWritable(array.length);
- kmer.setByRead(array, 0);
- vertexValue.setKmer(kmer);
- PositionListWritable plist = new PositionListWritable();
- plist.append(new PositionWritable(1, (byte)4));
- vertexValue.setRRList(plist);
+ KmerListWritable kmerList = new KmerListWritable(kmerSize);
+ kmerList.append(getVertexId());
+ vertexValue.setRFList(kmerList);
+ vertexValue.setKmer(vertexId);
vertex.setVertexValue(vertexValue);
addVertex(vertexId, vertex);
@@ -106,7 +107,7 @@
job.setVertexInputFormatClass(GraphCleanInputFormat.class);
job.setVertexOutputFormatClass(GraphCleanOutputFormat.class);
job.setDynamicVertexValueSize(true);
- job.setOutputKeyClass(PositionWritable.class);
+ job.setOutputKeyClass(KmerBytesWritable.class);
job.setOutputValueClass(VertexValueWritable.class);
Client.run(args, job);
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/tipremove/TipRemoveVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/tipremove/TipRemoveVertex.java
index b4f2407..c8d3e2d 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/tipremove/TipRemoveVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/tipremove/TipRemoveVertex.java
@@ -10,8 +10,8 @@
import edu.uci.ics.genomix.pregelix.io.MessageWritable;
import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
import edu.uci.ics.genomix.pregelix.operator.pathmerge.BasicGraphCleanVertex;
-import edu.uci.ics.genomix.pregelix.type.AdjMessage;
import edu.uci.ics.genomix.pregelix.util.VertexUtil;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
/*
* vertexId: BytesWritable
@@ -57,7 +57,14 @@
kmerSize = getContext().getConfiguration().getInt(KMER_SIZE, 5);
if(length == -1)
length = getContext().getConfiguration().getInt(LENGTH, kmerSize); //kmerSize + 5
- outgoingMsg.reset();
+ if(incomingMsg == null)
+ incomingMsg = new MessageWritable(kmerSize);
+ if(outgoingMsg == null)
+ outgoingMsg = new MessageWritable(kmerSize);
+ else
+ outgoingMsg.reset(kmerSize);
+ if(destVertexId == null)
+ destVertexId = new KmerBytesWritable(kmerSize);
}
@Override
@@ -66,35 +73,27 @@
if(getSuperstep() == 1){
if(VertexUtil.isIncomingTipVertex(getVertexValue())){
if(getVertexValue().getLengthOfKmer() <= length){
- if(getVertexValue().getFFList().getCountOfPosition() > 0)
- outgoingMsg.setFlag(AdjMessage.FROMFF);
- else if(getVertexValue().getFRList().getCountOfPosition() > 0)
- outgoingMsg.setFlag(AdjMessage.FROMFR);
- outgoingMsg.setSourceVertexId(getVertexId());
- destVertexId.setAsCopy(getNextDestVertexId(getVertexValue()));
- sendMsg(destVertexId, outgoingMsg);
+
+ sendSettledMsgToPreviousNode();
deleteVertex(getVertexId());
}
}
else if(VertexUtil.isOutgoingTipVertex(getVertexValue())){
if(getVertexValue().getLengthOfKmer() <= length){
- if(getVertexValue().getRFList().getCountOfPosition() > 0)
- outgoingMsg.setFlag(AdjMessage.FROMRF);
- else if(getVertexValue().getRRList().getCountOfPosition() > 0)
- outgoingMsg.setFlag(AdjMessage.FROMRR);
- outgoingMsg.setSourceVertexId(getVertexId());
- destVertexId.setAsCopy(getPreDestVertexId(getVertexValue()));
- sendMsg(destVertexId, outgoingMsg);
+ sendSettledMsgToNextNode();
deleteVertex(getVertexId());
}
}
else if(VertexUtil.isSingleVertex(getVertexValue())){
- if(getVertexValue().getLengthOfKmer() > length)
+ if(getVertexValue().getLengthOfKmer() <= length)
deleteVertex(getVertexId());
}
}
else if(getSuperstep() == 2){
- responseToDeadVertex(msgIterator);
+ while(msgIterator.hasNext()){
+ incomingMsg = msgIterator.next();
+ responseToDeadVertex();
+ }
}
voteToHalt();
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/ConvertNodeToIdValue.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/ConvertNodeToIdValue.java
deleted file mode 100644
index 13d9d98..0000000
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/ConvertNodeToIdValue.java
+++ /dev/null
@@ -1,58 +0,0 @@
-package edu.uci.ics.genomix.pregelix.sequencefile;
-
-import java.io.File;
-import java.io.IOException;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.SequenceFile.CompressionType;
-
-import edu.uci.ics.genomix.type.NodeWritable;
-import edu.uci.ics.genomix.type.PositionWritable;
-import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
-import edu.uci.ics.genomix.pregelix.io.VertexValueWritable.State;
-
-
-public class ConvertNodeToIdValue {
-
- public static void convert(Path inFile, Path outFile)
- throws IOException {
- Configuration conf = new Configuration();
- FileSystem fileSys = FileSystem.get(conf);
-
- SequenceFile.Reader reader = new SequenceFile.Reader(fileSys, inFile, conf);
- SequenceFile.Writer writer = SequenceFile.createWriter(fileSys, conf, outFile, PositionWritable.class,
- VertexValueWritable.class, CompressionType.NONE);
- NodeWritable node = new NodeWritable();
- NullWritable value = NullWritable.get();
- PositionWritable outputKey = new PositionWritable();
- VertexValueWritable outputValue = new VertexValueWritable();
-
- while(reader.next(node, value)) {
-// System.out.println(node.getNodeID().toString());
-// outputKey.set(node.getNodeID());
- outputValue.setFFList(node.getFFList());
- outputValue.setFRList(node.getFRList());
- outputValue.setRFList(node.getRFList());
- outputValue.setRRList(node.getRRList());
- outputValue.setKmer(node.getKmer());
- outputValue.setState(State.IS_HEAD);
- writer.append(outputKey, outputValue);
- }
- writer.close();
- reader.close();
- }
-
- public static void main(String[] args) throws IOException {
- Path dir = new Path("data/test");
- Path outDir = new Path("data/input");
- FileUtils.cleanDirectory(new File("data/input"));
- Path inFile = new Path(dir, "result.graphbuild.txt.bin");
- Path outFile = new Path(outDir, "out");
- convert(inFile,outFile);
- }
-}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/CheckMessage.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/CheckMessage.java
index 73ab533..4b32a51 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/CheckMessage.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/CheckMessage.java
@@ -3,7 +3,7 @@
public class CheckMessage {
public static final byte SOURCE = 1 << 0;
- public static final byte CHAIN = 1 << 1;
+ public static final byte ACUTUALKMER = 1 << 1;
public static final byte NEIGHBER = 1 << 2;
public static final byte MESSAGE = 1 << 3;
public static final byte NODEIDLIST = 1 << 4;
@@ -18,8 +18,8 @@
case SOURCE:
r = "SOURCE";
break;
- case CHAIN:
- r = "CHAIN";
+ case ACUTUALKMER:
+ r = "ACUTUALKMER";
break;
case NEIGHBER:
r = "NEIGHBER";
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/DirectionFlag.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/DirectionFlag.java
index 2ed36e9..51a73f1 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/DirectionFlag.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/DirectionFlag.java
@@ -11,5 +11,5 @@
public static final byte DIR_MASK = 0b111 << 0;
public static final byte DIR_CLEAR = 0b1111000 << 0;
- public static final byte DIR_FROM_DEADVERTEX = 0b101 << 0;
+// public static final byte DIR_FROM_DEADVERTEX = 0b101 << 0;
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/util/VertexUtil.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/util/VertexUtil.java
index b7f6a4f..5a0591d 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/util/VertexUtil.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/util/VertexUtil.java
@@ -1,6 +1,5 @@
package edu.uci.ics.genomix.pregelix.util;
-import edu.uci.ics.genomix.oldtype.PositionWritable;
import edu.uci.ics.genomix.pregelix.io.AdjacencyListWritable;
import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
import edu.uci.ics.genomix.type.KmerBytesWritable;
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
index 630b5aa..ff5f404 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
@@ -128,90 +128,90 @@
private static void genSplitRepeatGraph() throws IOException {
generateSplitRepeatGraphJob("SplitRepeatGraph", outputBase + "SplitRepeatGraph.xml");
}
-// private static void generateTipAddGraphJob(String jobName, String outputPath) throws IOException {
-// PregelixJob job = new PregelixJob(jobName);
-// job.setVertexClass(TipAddVertex.class);
-// job.setVertexInputFormatClass(GraphCleanOutputFormat.class);
-// job.setVertexOutputFormatClass(GraphCleanOutputFormat.class);
-// job.setDynamicVertexValueSize(true);
-// job.setOutputKeyClass(PositionWritable.class);
-// job.setOutputValueClass(VertexValueWritable.class);
-// job.getConfiguration().setInt(TipAddVertex.KMER_SIZE, 3);
-// job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
-// }
-//
-// private static void genTipAddGraph() throws IOException {
-// generateTipAddGraphJob("TipAddGraph", outputBase
-// + "TipAddGraph.xml");
-// }
-//
-// private static void generateTipRemoveGraphJob(String jobName, String outputPath) throws IOException {
-// PregelixJob job = new PregelixJob(jobName);
-// job.setVertexClass(TipRemoveVertex.class);
-// job.setVertexInputFormatClass(NaiveAlgorithmForPathMergeInputFormat.class);
-// job.setVertexOutputFormatClass(GraphCleanOutputFormat.class);
-// job.setDynamicVertexValueSize(true);
-// job.setOutputKeyClass(PositionWritable.class);
-// job.setOutputValueClass(VertexValueWritable.class);
-// job.getConfiguration().setInt(TipRemoveVertex.KMER_SIZE, 5);
-// job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
-// }
-//
-// private static void genTipRemoveGraph() throws IOException {
-// generateTipRemoveGraphJob("TipRemoveGraph", outputBase
-// + "TipRemoveGraph.xml");
-// }
-//
-// private static void generateBridgeAddGraphJob(String jobName, String outputPath) throws IOException {
-// PregelixJob job = new PregelixJob(jobName);
-// job.setVertexClass(BridgeAddVertex.class);
-// job.setVertexInputFormatClass(NaiveAlgorithmForPathMergeInputFormat.class);
-// job.setVertexOutputFormatClass(GraphCleanOutputFormat.class);
-// job.setDynamicVertexValueSize(true);
-// job.setOutputKeyClass(PositionWritable.class);
-// job.setOutputValueClass(VertexValueWritable.class);
-// job.getConfiguration().setInt(BridgeAddVertex.KMER_SIZE, 3);
-// job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
-// }
-//
-// private static void genBridgeAddGraph() throws IOException {
-// generateBridgeAddGraphJob("BridgeAddGraph", outputBase
-// + "BridgeAddGraph.xml");
-// }
-//
-// private static void generateBridgeRemoveGraphJob(String jobName, String outputPath) throws IOException {
-// PregelixJob job = new PregelixJob(jobName);
-// job.setVertexClass(BridgeRemoveVertex.class);
-// job.setVertexInputFormatClass(GraphCleanInputFormat.class);
-// job.setVertexOutputFormatClass(GraphCleanOutputFormat.class);
-// job.setDynamicVertexValueSize(true);
-// job.setOutputKeyClass(PositionWritable.class);
-// job.setOutputValueClass(VertexValueWritable.class);
-// job.getConfiguration().setInt(TipRemoveVertex.KMER_SIZE, 5);
-// job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
-// }
-//
-// private static void genBridgeRemoveGraph() throws IOException {
-// generateBridgeRemoveGraphJob("BridgeRemoveGraph", outputBase
-// + "BridgeRemoveGraph.xml");
-// }
-//
-// private static void generateBubbleAddGraphJob(String jobName, String outputPath) throws IOException {
-// PregelixJob job = new PregelixJob(jobName);
-// job.setVertexClass(BubbleAddVertex.class);
-// job.setVertexInputFormatClass(NaiveAlgorithmForPathMergeInputFormat.class);
-// job.setVertexOutputFormatClass(GraphCleanOutputFormat.class);
-// job.setDynamicVertexValueSize(true);
-// job.setOutputKeyClass(PositionWritable.class);
-// job.setOutputValueClass(VertexValueWritable.class);
-// job.getConfiguration().setInt(BubbleAddVertex.KMER_SIZE, 3);
-// job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
-// }
-//
-// private static void genBubbleAddGraph() throws IOException {
-// generateBubbleAddGraphJob("BubbleAddGraph", outputBase
-// + "BubbleAddGraph.xml");
-// }
+ private static void generateTipAddGraphJob(String jobName, String outputPath) throws IOException {
+ PregelixJob job = new PregelixJob(jobName);
+ job.setVertexClass(TipAddVertex.class);
+ job.setVertexInputFormatClass(InitialGraphCleanInputFormat.class);
+ job.setVertexOutputFormatClass(GraphCleanOutputFormat.class);
+ job.setDynamicVertexValueSize(true);
+ job.setOutputKeyClass(KmerBytesWritable.class);
+ job.setOutputValueClass(VertexValueWritable.class);
+ job.getConfiguration().setInt(TipAddVertex.KMER_SIZE, 3);
+ job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
+ }
+
+ private static void genTipAddGraph() throws IOException {
+ generateTipAddGraphJob("TipAddGraph", outputBase
+ + "TipAddGraph.xml");
+ }
+
+ private static void generateTipRemoveGraphJob(String jobName, String outputPath) throws IOException {
+ PregelixJob job = new PregelixJob(jobName);
+ job.setVertexClass(TipRemoveVertex.class);
+ job.setVertexInputFormatClass(GraphCleanInputFormat.class);
+ job.setVertexOutputFormatClass(GraphCleanOutputFormat.class);
+ job.setDynamicVertexValueSize(true);
+ job.setOutputKeyClass(KmerBytesWritable.class);
+ job.setOutputValueClass(VertexValueWritable.class);
+ job.getConfiguration().setInt(TipRemoveVertex.KMER_SIZE, 3);
+ job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
+ }
+
+ private static void genTipRemoveGraph() throws IOException {
+ generateTipRemoveGraphJob("TipRemoveGraph", outputBase
+ + "TipRemoveGraph.xml");
+ }
+
+ private static void generateBridgeAddGraphJob(String jobName, String outputPath) throws IOException {
+ PregelixJob job = new PregelixJob(jobName);
+ job.setVertexClass(BridgeAddVertex.class);
+ job.setVertexInputFormatClass(InitialGraphCleanInputFormat.class);
+ job.setVertexOutputFormatClass(GraphCleanOutputFormat.class);
+ job.setDynamicVertexValueSize(true);
+ job.setOutputKeyClass(KmerBytesWritable.class);
+ job.setOutputValueClass(VertexValueWritable.class);
+ job.getConfiguration().setInt(BridgeAddVertex.KMER_SIZE, 3);
+ job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
+ }
+
+ private static void genBridgeAddGraph() throws IOException {
+ generateBridgeAddGraphJob("BridgeAddGraph", outputBase
+ + "BridgeAddGraph.xml");
+ }
+
+ private static void generateBridgeRemoveGraphJob(String jobName, String outputPath) throws IOException {
+ PregelixJob job = new PregelixJob(jobName);
+ job.setVertexClass(BridgeRemoveVertex.class);
+ job.setVertexInputFormatClass(GraphCleanInputFormat.class);
+ job.setVertexOutputFormatClass(GraphCleanOutputFormat.class);
+ job.setDynamicVertexValueSize(true);
+ job.setOutputKeyClass(KmerBytesWritable.class);
+ job.setOutputValueClass(VertexValueWritable.class);
+ job.getConfiguration().setInt(TipRemoveVertex.KMER_SIZE, 3);
+ job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
+ }
+
+ private static void genBridgeRemoveGraph() throws IOException {
+ generateBridgeRemoveGraphJob("BridgeRemoveGraph", outputBase
+ + "BridgeRemoveGraph.xml");
+ }
+
+ private static void generateBubbleAddGraphJob(String jobName, String outputPath) throws IOException {
+ PregelixJob job = new PregelixJob(jobName);
+ job.setVertexClass(BubbleAddVertex.class);
+ job.setVertexInputFormatClass(InitialGraphCleanInputFormat.class);
+ job.setVertexOutputFormatClass(GraphCleanOutputFormat.class);
+ job.setDynamicVertexValueSize(true);
+ job.setOutputKeyClass(KmerBytesWritable.class);
+ job.setOutputValueClass(VertexValueWritable.class);
+ job.getConfiguration().setInt(BubbleAddVertex.KMER_SIZE, 3);
+ job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
+ }
+
+ private static void genBubbleAddGraph() throws IOException {
+ generateBubbleAddGraphJob("BubbleAddGraph", outputBase
+ + "BubbleAddGraph.xml");
+ }
//
// private static void generateBubbleMergeGraphJob(String jobName, String outputPath) throws IOException {
// PregelixJob job = new PregelixJob(jobName);
@@ -243,6 +243,11 @@
// genP4ForMergeGraph();
// genMapReduceGraph();
genSplitRepeatGraph();
+ genTipAddGraph();
+ genBridgeAddGraph();
+ genTipRemoveGraph();
+ genBridgeRemoveGraph();
+ genBubbleAddGraph();
}
}
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/BridgeAddSmallTestSuite.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/BridgeAddSmallTestSuite.java
index 221a76a..c4f0963 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/BridgeAddSmallTestSuite.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/BridgeAddSmallTestSuite.java
@@ -43,20 +43,9 @@
public class BridgeAddSmallTestSuite extends TestSuite {
private static final Logger LOGGER = Logger.getLogger(BridgeAddSmallTestSuite.class.getName());
- public static final String PreFix = "data/input"; //"graphbuildresult";
+ public static final String PreFix = "data/AddBridge";
public static final String[] TestDir = { PreFix + File.separator
- + "tworeads"};/*, PreFix + File.separator
- /*+ "CyclePath"};, PreFix + File.separator
- + "SimplePath", PreFix + File.separator
- + "SinglePath", PreFix + File.separator
- + "TreePath"};*/
- /* + "2", PreFix + File.separator + "3", PreFix + File.separator + "4", PreFix + File.separator + "5",
- PreFix + File.separator + "6", PreFix + File.separator + "7", PreFix + File.separator + "8",
- PreFix + File.separator + "9", PreFix + File.separator + "TwoKmer", PreFix + File.separator + "ThreeKmer",
- PreFix + File.separator + "SinglePath", PreFix + File.separator + "SimplePath",
- PreFix + File.separator + "Path", PreFix + File.separator + "BridgePath",
- PreFix + File.separator + "CyclePath", PreFix + File.separator + "RingPath",
- PreFix + File.separator + "LongPath", PreFix + File.separator + "TreePath" };*/
+ + "SimpleTest"};
private static final String ACTUAL_RESULT_DIR = "data/actual/bridgeadd";
private static final String PATH_TO_HADOOP_CONF = "src/test/resources/hadoop/conf";
private static final String PATH_TO_CLUSTER_STORE = "src/test/resources/cluster/stores.properties";
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/BridgeRemoveSmallTestSuite.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/BridgeRemoveSmallTestSuite.java
index 397c514..171a6ca 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/BridgeRemoveSmallTestSuite.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/BridgeRemoveSmallTestSuite.java
@@ -43,9 +43,9 @@
public class BridgeRemoveSmallTestSuite extends TestSuite {
private static final Logger LOGGER = Logger.getLogger(BridgeRemoveSmallTestSuite.class.getName());
- public static final String PreFix = "data/actual"; //"graphbuildresult";
+ public static final String PreFix = "data/actual/bridgeadd/BridgeAddGraph/bin";
public static final String[] TestDir = { PreFix + File.separator
- + "pathmerge/P4ForMergeGraph/bin/tworeads"};
+ + "SimpleTest"};
private static final String ACTUAL_RESULT_DIR = "data/actual/bridgeremove";
private static final String PATH_TO_HADOOP_CONF = "src/test/resources/hadoop/conf";
private static final String PATH_TO_CLUSTER_STORE = "src/test/resources/cluster/stores.properties";
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/BubbleAddSmallTestSuite.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/BubbleAddSmallTestSuite.java
new file mode 100644
index 0000000..a9c2774
--- /dev/null
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/BubbleAddSmallTestSuite.java
@@ -0,0 +1,199 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.genomix.pregelix.JobRun;
+
+import java.io.BufferedReader;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.logging.Logger;
+
+import junit.framework.Test;
+import junit.framework.TestResult;
+import junit.framework.TestSuite;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.mapred.JobConf;
+
+import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
+import edu.uci.ics.pregelix.core.util.PregelixHyracksIntegrationUtil;
+
+/** Runs each job file in PATH_TO_JOBS (filtered by PATH_TO_ONLY, if non-empty) over every TestDir input. */
+@SuppressWarnings("deprecation")
+public class BubbleAddSmallTestSuite extends TestSuite {
+    private static final Logger LOGGER = Logger.getLogger(BubbleAddSmallTestSuite.class.getName());
+
+    public static final String PreFix = "data/PathMergeTestSet";
+    public static final String[] TestDir = { PreFix + File.separator + "5" };
+    private static final String ACTUAL_RESULT_DIR = "data/actual/bubbleadd";
+    private static final String PATH_TO_HADOOP_CONF = "src/test/resources/hadoop/conf";
+    private static final String PATH_TO_CLUSTER_STORE = "src/test/resources/cluster/stores.properties";
+    private static final String PATH_TO_CLUSTER_PROPERTIES = "src/test/resources/cluster/cluster.properties";
+    private static final String PATH_TO_JOBS = "src/test/resources/jobs/";
+    private static final String PATH_TO_ONLY = "src/test/resources/only_bubbleadd.txt";
+    public static final String HDFS_INPUTPATH = "/PathTestSet";
+    private static final String HADOOP_CONF_PATH = ACTUAL_RESULT_DIR + File.separator + "conf.xml";
+
+    private MiniDFSCluster dfsCluster;
+    private JobConf conf = new JobConf();
+    private int numberOfNC = 2;
+
+    /** Resets the local stores and ACTUAL_RESULT_DIR, and starts the Hyracks mini-cluster and HDFS. */
+    public void setUp() throws Exception {
+        ClusterConfig.setStorePath(PATH_TO_CLUSTER_STORE);
+        ClusterConfig.setClusterPropertiesPath(PATH_TO_CLUSTER_PROPERTIES);
+        cleanupStores();
+        PregelixHyracksIntegrationUtil.init("src/test/resources/topology.xml");
+        LOGGER.info("Hyracks mini-cluster started");
+        FileUtils.forceMkdir(new File(ACTUAL_RESULT_DIR));
+        FileUtils.cleanDirectory(new File(ACTUAL_RESULT_DIR));
+        startHDFS();
+    }
+
+    /** Boots HDFS, uploads every TestDir into HDFS_INPUTPATH, and writes the conf to HADOOP_CONF_PATH. */
+    private void startHDFS() throws IOException {
+        conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/core-site.xml"));
+        conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/mapred-site.xml"));
+        conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/hdfs-site.xml"));
+        FileSystem lfs = FileSystem.getLocal(new Configuration());
+        lfs.delete(new Path("build"), true);
+        System.setProperty("hadoop.log.dir", "logs");
+        dfsCluster = new MiniDFSCluster(conf, numberOfNC, true, null);
+        FileSystem dfs = FileSystem.get(conf);
+
+        for (String testDir : TestDir) {
+            File src = new File(testDir);
+            File[] inputs = src.listFiles(); // null if the directory is missing: fail with a clear message
+            if (inputs == null)
+                throw new FileNotFoundException("Test input directory not found: " + src.getAbsolutePath());
+            Path dest = new Path(HDFS_INPUTPATH + "/" + src.getName()); // HDFS paths always use '/'
+            dfs.mkdirs(dest);
+            for (File f : inputs)
+                dfs.copyFromLocalFile(new Path(f.getAbsolutePath()), dest);
+        }
+
+        DataOutputStream confOutput = new DataOutputStream(new FileOutputStream(new File(HADOOP_CONF_PATH)));
+        try {
+            conf.writeXml(confOutput);
+        } finally {
+            confOutput.close(); // flushes first; runs even if writeXml throws
+        }
+    }
+
+    /** Creates (if missing) and empties the local "teststore" and "build" directories. */
+    private void cleanupStores() throws IOException {
+        FileUtils.forceMkdir(new File("teststore"));
+        FileUtils.forceMkdir(new File("build"));
+        FileUtils.cleanDirectory(new File("teststore"));
+        FileUtils.cleanDirectory(new File("build"));
+    }
+
+    /** Shuts down the MiniDFSCluster, if one was created. */
+    private void cleanupHDFS() throws Exception {
+        if (dfsCluster != null)
+            dfsCluster.shutdown();
+    }
+
+    /** Stops the Hyracks mini-cluster, then HDFS (HDFS is stopped even if the Hyracks shutdown throws). */
+    public void tearDown() throws Exception {
+        try {
+            PregelixHyracksIntegrationUtil.deinit();
+            LOGGER.info("Hyracks mini-cluster shut down");
+        } finally {
+            cleanupHDFS();
+        }
+    }
+
+    /**
+     * Builds one BasicSmallTestCase per (job file, TestDir input) pair; when PATH_TO_ONLY lists names,
+     * only job files whose name contains one of them are used. Starts the mini-clusters via setUp().
+     */
+    public static Test suite() throws Exception {
+        List<String> onlys = getFileList(PATH_TO_ONLY);
+        File testData = new File(PATH_TO_JOBS);
+        File[] queries = testData.listFiles();
+        if (queries == null)
+            throw new FileNotFoundException("Job directory not found: " + testData.getAbsolutePath());
+        BubbleAddSmallTestSuite testSuite = new BubbleAddSmallTestSuite();
+        testSuite.setUp();
+        boolean onlyEnabled = !onlys.isEmpty();
+        FileSystem dfs = FileSystem.get(testSuite.conf);
+        for (File qFile : queries) {
+            if (!qFile.isFile() || (onlyEnabled && !isInList(onlys, qFile.getName())))
+                continue;
+            String outDir = ACTUAL_RESULT_DIR + File.separator + jobExtToResExt(qFile.getName()) + File.separator;
+            for (String testPathStr : TestDir) {
+                String inputName = new File(testPathStr).getName();
+                String resultFileName = outDir + "bin" + File.separator + inputName;
+                String textFileName = outDir + "txt" + File.separator + inputName;
+                testSuite.addTest(new BasicSmallTestCase(HADOOP_CONF_PATH, qFile.getName(), qFile.getAbsolutePath(),
+                        dfs, HDFS_INPUTPATH + "/" + inputName, resultFileName, textFileName));
+            }
+        }
+        return testSuite;
+    }
+
+    /** Runs the tests into {@code result}, then tears down the clusters; exceptions become IllegalStateException. */
+    @Override
+    public void run(TestResult result) {
+        try {
+            try {
+                for (int i = 0; i < testCount() && !result.shouldStop(); i++)
+                    runTest(testAt(i), result); // testCount()/testAt() both index direct children
+            } finally {
+                tearDown(); // shut the clusters down even if a test run aborts
+            }
+        } catch (Exception e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
+    /** Reads one trimmed name per line, skipping blank lines (a blank entry would match every job name). */
+    protected static List<String> getFileList(String ignorePath) throws FileNotFoundException, IOException {
+        List<String> ignores = new ArrayList<String>();
+        BufferedReader reader = new BufferedReader(new FileReader(ignorePath));
+        try {
+            for (String line = reader.readLine(); line != null; line = reader.readLine())
+                if (line.trim().length() > 0)
+                    ignores.add(line.trim());
+        } finally {
+            reader.close(); // release the file handle even if readLine() throws
+        }
+        return ignores;
+    }
+
+    /** Strips the last extension ("Job.xml" -> "Job"); a name without a dot is returned unchanged. */
+    private static String jobExtToResExt(String fname) {
+        int dot = fname.lastIndexOf('.');
+        return dot < 0 ? fname : fname.substring(0, dot);
+    }
+
+    /** Returns true if {@code name} contains any of the strings in {@code onlys}. */
+    private static boolean isInList(List<String> onlys, String name) {
+        for (String only : onlys)
+            if (name.contains(only))
+                return true;
+        return false;
+    }
+}
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/SplitRepeatSmallTestSuite.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/SplitRepeatSmallTestSuite.java
index 21ffe34..1948acd 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/SplitRepeatSmallTestSuite.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/SplitRepeatSmallTestSuite.java
@@ -45,7 +45,7 @@
//P4ForMergeGraph/bin/read
public static final String PreFix = "data/SplitRepeat"; //"graphbuildresult";
public static final String[] TestDir = { PreFix + File.separator
- + "SimpleTest"};
+ + "AdjSplitRepeat"};
private static final String ACTUAL_RESULT_DIR = "data/actual/splitrepeat";
private static final String PATH_TO_HADOOP_CONF = "src/test/resources/hadoop/conf";
private static final String PATH_TO_CLUSTER_STORE = "src/test/resources/cluster/stores.properties";
@@ -59,7 +59,7 @@
private MiniDFSCluster dfsCluster;
private JobConf conf = new JobConf();
- private int numberOfNC = 2;
+ private int numberOfNC = 1;
public void setUp() throws Exception {
ClusterConfig.setStorePath(PATH_TO_CLUSTER_STORE);
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/TipAddSmallTestSuite.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/TipAddSmallTestSuite.java
index f4456ce..57f4ea5 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/TipAddSmallTestSuite.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/TipAddSmallTestSuite.java
@@ -43,20 +43,9 @@
public class TipAddSmallTestSuite extends TestSuite {
private static final Logger LOGGER = Logger.getLogger(TipAddSmallTestSuite.class.getName());
- public static final String PreFix = "data/input"; //"graphbuildresult";
+ public static final String PreFix = "data/PathMergeTestSet"; //"graphbuildresult";
public static final String[] TestDir = { PreFix + File.separator
- + "read"};/*, PreFix + File.separator
- /*+ "CyclePath"};, PreFix + File.separator
- + "SimplePath", PreFix + File.separator
- + "SinglePath", PreFix + File.separator
- + "TreePath"};*/
- /* + "2", PreFix + File.separator + "3", PreFix + File.separator + "4", PreFix + File.separator + "5",
- PreFix + File.separator + "6", PreFix + File.separator + "7", PreFix + File.separator + "8",
- PreFix + File.separator + "9", PreFix + File.separator + "TwoKmer", PreFix + File.separator + "ThreeKmer",
- PreFix + File.separator + "SinglePath", PreFix + File.separator + "SimplePath",
- PreFix + File.separator + "Path", PreFix + File.separator + "BridgePath",
- PreFix + File.separator + "CyclePath", PreFix + File.separator + "RingPath",
- PreFix + File.separator + "LongPath", PreFix + File.separator + "TreePath" };*/
+ + "5"};
private static final String ACTUAL_RESULT_DIR = "data/actual/tipadd";
private static final String PATH_TO_HADOOP_CONF = "src/test/resources/hadoop/conf";
private static final String PATH_TO_CLUSTER_STORE = "src/test/resources/cluster/stores.properties";
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/TipRemoveSmallTestSuite.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/TipRemoveSmallTestSuite.java
index def24e4..e8ca43f 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/TipRemoveSmallTestSuite.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/TipRemoveSmallTestSuite.java
@@ -43,10 +43,9 @@
public class TipRemoveSmallTestSuite extends TestSuite {
private static final Logger LOGGER = Logger.getLogger(TipRemoveSmallTestSuite.class.getName());
//P4ForMergeGraph/bin/read
- public static final String PreFix = "data/input"; //"graphbuildresult";
+ public static final String PreFix = "data/actual/tipadd/TipAddGraph/bin";
public static final String[] TestDir = { PreFix + File.separator
- + "graphs/tipremove/fr_with_tip"};
- //+ "bridgeadd/BridgeAddGraph/bin/tworeads"};
+ + "5"};
private static final String ACTUAL_RESULT_DIR = "data/actual/tipremove";
private static final String PATH_TO_HADOOP_CONF = "src/test/resources/hadoop/conf";
private static final String PATH_TO_CLUSTER_STORE = "src/test/resources/cluster/stores.properties";