Merge commit '50e9f8614b4124288f606d90c3417caa2b5f80de' into nanzhang/hyracks_genomix
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerListWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerListWritable.java
new file mode 100644
index 0000000..f7caebb
--- /dev/null
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerListWritable.java
@@ -0,0 +1,260 @@
+package edu.uci.ics.genomix.type;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.io.Writable;
+
+import edu.uci.ics.genomix.data.Marshal;
+
+/**
+ * A list of fixed-length kmers. The length of this list is stored internally.
+ */
+public class KmerListWritable implements Writable, Iterable<KmerBytesWritable>, Serializable {
+ private static final long serialVersionUID = 1L;
+ protected static final byte[] EMPTY_BYTES = { 0, 0, 0, 0 };
+ protected static final int HEADER_SIZE = 4;
+
+ protected byte[] storage;
+ protected int offset;
+ protected int valueCount;
+ protected int storageMaxSize; // since we may be a reference inside a larger datablock, we must track our maximum size
+
+ private KmerBytesWritable posIter = new KmerBytesWritable();
+
+ public KmerListWritable() {
+ storage = EMPTY_BYTES;
+ valueCount = 0;
+ offset = 0;
+ storageMaxSize = storage.length;
+ }
+
+ public KmerListWritable(byte[] data, int offset) {
+ setNewReference(data, offset);
+ }
+
+ public KmerListWritable(List<KmerBytesWritable> kmers) {
+ this();
+ setSize(kmers.size() * KmerBytesWritable.getBytesPerKmer() + HEADER_SIZE); // reserve space for all elements
+ for (KmerBytesWritable kmer : kmers) {
+ append(kmer);
+ }
+ }
+
+ public void setNewReference(byte[] data, int offset) {
+ valueCount = Marshal.getInt(data, offset);
+        if (valueCount * KmerBytesWritable.getBytesPerKmer() + HEADER_SIZE > data.length - offset) {
+ throw new IllegalArgumentException("Specified data buffer (len=" + (data.length - offset)
+ + ") is not large enough to store requested number of elements (" + valueCount + ")!");
+ }
+ this.storage = data;
+ this.offset = offset;
+ this.storageMaxSize = valueCount * KmerBytesWritable.getBytesPerKmer() + HEADER_SIZE;
+ }
+
+ public void append(KmerBytesWritable kmer) {
+ setSize((1 + valueCount) * KmerBytesWritable.getBytesPerKmer() + HEADER_SIZE);
+ System.arraycopy(kmer.getBytes(), kmer.offset, storage,
+ offset + HEADER_SIZE + valueCount * KmerBytesWritable.getBytesPerKmer(),
+ KmerBytesWritable.getBytesPerKmer());
+ valueCount += 1;
+ Marshal.putInt(valueCount, storage, offset);
+ }
+
+ /*
+ * Append the otherList to the end of myList
+ */
+ public void appendList(KmerListWritable otherList) {
+ if (otherList.valueCount > 0) {
+ setSize((valueCount + otherList.valueCount) * KmerBytesWritable.getBytesPerKmer() + HEADER_SIZE);
+ // copy contents of otherList into the end of my storage
+ System.arraycopy(otherList.storage, otherList.offset + HEADER_SIZE, storage, offset + HEADER_SIZE
+ + valueCount * KmerBytesWritable.getBytesPerKmer(),
+ otherList.valueCount * KmerBytesWritable.getBytesPerKmer());
+ valueCount += otherList.valueCount;
+ Marshal.putInt(valueCount, storage, offset);
+ }
+ }
+
+ /**
+ * Save the union of my list and otherList. Uses a temporary HashSet for
+ * uniquefication
+ */
+ public void unionUpdate(KmerListWritable otherList) {
+ int newSize = valueCount + otherList.valueCount;
+ HashSet<KmerBytesWritable> uniqueElements = new HashSet<KmerBytesWritable>(newSize);
+ for (KmerBytesWritable kmer : this) {
+ uniqueElements.add(kmer);
+ }
+ for (KmerBytesWritable kmer : otherList) {
+ uniqueElements.add(kmer);
+ }
+ valueCount = 0;
+ setSize(newSize * KmerBytesWritable.getBytesPerKmer() + HEADER_SIZE);
+ for (KmerBytesWritable kmer : uniqueElements) { // this point is not efficient
+ append(kmer);
+ }
+ Marshal.putInt(valueCount, storage, offset);
+ }
+
+ protected void setSize(int size) {
+ if (size > getCapacity()) {
+ setCapacity((size * 3 / 2));
+ }
+ }
+
+ protected int getCapacity() {
+ return storageMaxSize - offset;
+ }
+
+ protected void setCapacity(int new_cap) {
+ if (new_cap > getCapacity()) {
+ byte[] new_data = new byte[new_cap];
+ if (valueCount > 0) {
+ System.arraycopy(storage, offset, new_data, 0, valueCount * KmerBytesWritable.getBytesPerKmer() + HEADER_SIZE);
+ }
+ storage = new_data;
+ offset = 0;
+ storageMaxSize = storage.length;
+ }
+ }
+
+ public void reset() {
+ valueCount = 0;
+ Marshal.putInt(valueCount, storage, offset);
+ }
+
+ public KmerBytesWritable getPosition(int i) {
+ if (i >= valueCount) {
+ throw new ArrayIndexOutOfBoundsException("No such positions");
+ }
+ posIter.setAsReference(storage, offset + HEADER_SIZE + i * KmerBytesWritable.getBytesPerKmer());
+ return posIter;
+ }
+
+ public void setCopy(KmerListWritable otherList) {
+ setCopy(otherList.storage, otherList.offset);
+ }
+
+ /**
+ * read a KmerListWritable from newData, which should include the header
+ */
+ public void setCopy(byte[] newData, int offset) {
+ int newValueCount = Marshal.getInt(newData, offset);
+ setSize(newValueCount * KmerBytesWritable.getBytesPerKmer() + HEADER_SIZE);
+ if (newValueCount > 0) {
+ System.arraycopy(newData, offset + HEADER_SIZE, storage, this.offset + HEADER_SIZE, newValueCount
+ * KmerBytesWritable.getBytesPerKmer());
+ }
+ valueCount = newValueCount;
+ Marshal.putInt(valueCount, storage, this.offset);
+ }
+
+ @Override
+ public Iterator<KmerBytesWritable> iterator() {
+ Iterator<KmerBytesWritable> it = new Iterator<KmerBytesWritable>() {
+
+ private int currentIndex = 0;
+
+ @Override
+ public boolean hasNext() {
+ return currentIndex < valueCount;
+ }
+
+ @Override
+ public KmerBytesWritable next() {
+ return getPosition(currentIndex++);
+ }
+
+ @Override
+ public void remove() {
+ if (currentIndex < valueCount)
+                    System.arraycopy(storage, offset + HEADER_SIZE + currentIndex * KmerBytesWritable.getBytesPerKmer(), storage,
+                            offset + HEADER_SIZE + (currentIndex - 1) * KmerBytesWritable.getBytesPerKmer(),
+                            (valueCount - currentIndex) * KmerBytesWritable.getBytesPerKmer());
+ valueCount--;
+ currentIndex--;
+ Marshal.putInt(valueCount, storage, offset);
+ }
+ };
+ return it;
+ }
+
+ /*
+ * remove the first instance of `toRemove`. Uses a linear scan. Throws an
+ * exception if not in this list.
+ */
+ public void remove(KmerBytesWritable toRemove, boolean ignoreMissing) {
+ Iterator<KmerBytesWritable> posIterator = this.iterator();
+ while (posIterator.hasNext()) {
+ if (toRemove.equals(posIterator.next())) {
+ posIterator.remove();
+ return; // break as soon as the element is found
+ }
+ }
+ // element was not found
+ if (!ignoreMissing) {
+ throw new ArrayIndexOutOfBoundsException("the KmerBytesWritable `" + toRemove.toString()
+ + "` was not found in this list.");
+ }
+ }
+
+ public void remove(KmerBytesWritable toRemove) {
+ remove(toRemove, false);
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ valueCount = in.readInt();
+ setSize(valueCount * KmerBytesWritable.getBytesPerKmer() + HEADER_SIZE);
+        in.readFully(storage, offset + HEADER_SIZE, valueCount * KmerBytesWritable.getBytesPerKmer());
+ Marshal.putInt(valueCount, storage, offset);
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.write(storage, offset, valueCount * KmerBytesWritable.getBytesPerKmer() + HEADER_SIZE);
+ }
+
+ public int getCountOfPosition() {
+ return valueCount;
+ }
+
+ public byte[] getByteArray() {
+ return storage;
+ }
+
+ public int getStartOffset() {
+ return offset;
+ }
+
+ public int getLength() {
+ return valueCount * KmerBytesWritable.getBytesPerKmer() + HEADER_SIZE;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sbuilder = new StringBuilder();
+ sbuilder.append('[');
+ for (int i = 0; i < valueCount; i++) {
+ sbuilder.append(getPosition(i).toString());
+ sbuilder.append(',');
+ }
+ if (valueCount > 0) {
+ sbuilder.setCharAt(sbuilder.length() - 1, ']');
+ } else {
+ sbuilder.append(']');
+ }
+ return sbuilder.toString();
+ }
+
+ @Override
+ public int hashCode() {
+ return Marshal.hashBytes(getByteArray(), getStartOffset(), getLength());
+ }
+}
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
index d61f679..34dfefe 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
@@ -71,7 +71,6 @@
this.kmer.reset(0);
}
-
public PositionListWritable getNodeIdList() {
return nodeIdList;
}
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/VKmerBytesWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/VKmerBytesWritable.java
index 4f34542..11b0f12 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/VKmerBytesWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/VKmerBytesWritable.java
@@ -155,6 +155,12 @@
System.arraycopy(newData, offset + HEADER_SIZE, bytes, this.kmerStartOffset, bytesUsed);
}
+ public void setAsCopy(int k, byte[] newData, int offset) {
+// int k = Marshal.getInt(newData, offset);
+ reset(k);
+ System.arraycopy(newData, offset, bytes, this.kmerStartOffset, bytesUsed);
+ }
+
/**
* Point this datablock to the given bytes array It works like the pointer
* to new datablock.
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/VKmerListWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/VKmerListWritable.java
index aa33350..f269b5a 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/VKmerListWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/VKmerListWritable.java
@@ -24,7 +24,7 @@
protected byte[] storage;
protected int offset;
protected int valueCount;
- protected int storageMaxSize; // since we may be a reference inside a larger datablock, we must track our maximum size
+ protected int storageMaxSize; // since we may be a reference inside a larger datablock, we must track our maximum size
private VKmerBytesWritable posIter = new VKmerBytesWritable();
@@ -32,7 +32,7 @@
storage = EMPTY_BYTES;
valueCount = 0;
offset = 0;
- storageMaxSize = storage.length;
+ storageMaxSize = storage.length;
}
public VKmerListWritable(byte[] data, int offset) {
@@ -55,8 +55,16 @@
public void append(VKmerBytesWritable kmer) {
setSize(getLength() + kmer.getLength());
- System.arraycopy(kmer.getBytes(), kmer.kmerStartOffset - VKmerBytesWritable.HEADER_SIZE,
- storage, offset + getLength(),
+ System.arraycopy(kmer.getBytes(), kmer.kmerStartOffset - VKmerBytesWritable.HEADER_SIZE, storage, offset
+ + getLength(), kmer.getLength());
+ valueCount += 1;
+ Marshal.putInt(valueCount, storage, offset);
+ }
+
+ public void append(int k, KmerBytesWritable kmer) {
+ setSize(getLength() + HEADER_SIZE + kmer.getLength());
+ Marshal.putInt(k, storage, offset + getLength());
+ System.arraycopy(kmer.getBytes(), kmer.getOffset(), storage, offset + getLength() + HEADER_SIZE,
kmer.getLength());
valueCount += 1;
Marshal.putInt(valueCount, storage, offset);
@@ -79,7 +87,7 @@
public void appendList(VKmerListWritable otherList) {
if (otherList.valueCount > 0) {
setSize(getLength() + otherList.getLength() - HEADER_SIZE); // one of the headers is redundant
-
+
// copy contents of otherList into the end of my storage
System.arraycopy(otherList.storage, otherList.offset + HEADER_SIZE, // skip other header
storage, offset + getLength(), // add to end
@@ -103,7 +111,7 @@
for (VKmerBytesWritable kmer : otherList) {
uniqueElements.add(kmer); // references okay
}
- setSize(getLength() + otherList.getLength()); // upper bound on memory usage
+ setSize(getLength() + otherList.getLength()); // upper bound on memory usage
valueCount = 0;
for (VKmerBytesWritable kmer : uniqueElements) {
append(kmer);
@@ -142,9 +150,9 @@
posIter.setAsReference(storage, getOffsetOfKmer(i));
return posIter;
}
-
+
/**
- * Return the offset of the kmer at the i'th position
+ * Return the offset of the kmer at the i'th position
*/
public int getOffsetOfKmer(int i) {
if (i >= valueCount) {
@@ -170,9 +178,8 @@
int newLength = getLength(newData, newOffset);
setSize(newLength);
if (newValueCount > 0) {
- System.arraycopy(newData, newOffset + HEADER_SIZE,
- storage, this.offset + HEADER_SIZE,
- newLength - HEADER_SIZE);
+ System.arraycopy(newData, newOffset + HEADER_SIZE, storage, this.offset + HEADER_SIZE, newLength
+ - HEADER_SIZE);
}
valueCount = newValueCount;
Marshal.putInt(valueCount, storage, this.offset);
@@ -193,7 +200,8 @@
@Override
public VKmerBytesWritable next() {
posIter.setAsReference(storage, currentOffset);
- currentOffset += KmerUtil.getByteNumFromK(Marshal.getInt(storage, currentOffset)) + VKmerBytesWritable.HEADER_SIZE;
+ currentOffset += KmerUtil.getByteNumFromK(Marshal.getInt(storage, currentOffset))
+ + VKmerBytesWritable.HEADER_SIZE;
currentIndex++;
return posIter;
}
@@ -201,16 +209,17 @@
@Override
public void remove() {
if (currentOffset <= 0) {
- throw new IllegalStateException("You must advance the iterator using .next() before calling remove()!");
+ throw new IllegalStateException(
+ "You must advance the iterator using .next() before calling remove()!");
}
// we're removing the element at prevIndex
int prevIndex = currentIndex - 1;
int prevOffset = getOffsetOfKmer(prevIndex);
-
+
if (currentIndex < valueCount) { // if it's the last element, don't have to do any copying
System.arraycopy(storage, currentOffset, // from the "next" element
storage, prevOffset, // to the one just returned (overwriting it)
- getLength() - currentOffset + offset); // remaining bytes except current element
+ getLength() - currentOffset + offset); // remaining bytes except current element
}
valueCount--;
currentIndex--;
@@ -230,7 +239,7 @@
while (posIterator.hasNext()) {
if (toRemove.equals(posIterator.next())) {
posIterator.remove();
- return; // break as soon as the element is found
+ return; // break as soon as the element is found
}
}
// element was not found
@@ -258,7 +267,8 @@
elemBytes = KmerUtil.getByteNumFromK(elemLetters) + VKmerBytesWritable.HEADER_SIZE;
setSize(curLength + elemBytes); // make sure we have room for the new element
Marshal.putInt(elemLetters, storage, curOffset); // write header
- in.readFully(storage, curOffset + VKmerBytesWritable.HEADER_SIZE, elemBytes - VKmerBytesWritable.HEADER_SIZE); // write kmer
+ in.readFully(storage, curOffset + VKmerBytesWritable.HEADER_SIZE, elemBytes
+ - VKmerBytesWritable.HEADER_SIZE); // write kmer
curOffset += elemBytes;
curLength += elemBytes;
}
@@ -284,19 +294,21 @@
public int getLength() {
int totalSize = HEADER_SIZE;
for (int curCount = 0; curCount < valueCount; curCount++) {
- totalSize += KmerUtil.getByteNumFromK(Marshal.getInt(storage, offset + totalSize)) + VKmerBytesWritable.HEADER_SIZE;
+ totalSize += KmerUtil.getByteNumFromK(Marshal.getInt(storage, offset + totalSize))
+ + VKmerBytesWritable.HEADER_SIZE;
}
return totalSize;
}
-
+
public static int getLength(byte[] listStorage, int listOffset) {
- int totalSize = HEADER_SIZE;
- int listValueCount = Marshal.getInt(listStorage, listOffset);
- for (int curCount = 0; curCount < listValueCount; curCount++) {
- totalSize += KmerUtil.getByteNumFromK(Marshal.getInt(listStorage, listOffset + totalSize)) + VKmerBytesWritable.HEADER_SIZE;
- }
- return totalSize;
- }
+ int totalSize = HEADER_SIZE;
+ int listValueCount = Marshal.getInt(listStorage, listOffset);
+ for (int curCount = 0; curCount < listValueCount; curCount++) {
+ totalSize += KmerUtil.getByteNumFromK(Marshal.getInt(listStorage, listOffset + totalSize))
+ + VKmerBytesWritable.HEADER_SIZE;
+ }
+ return totalSize;
+ }
@Override
public String toString() {
diff --git a/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/NodeWritableTest.java b/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/NodeWritableTest.java
new file mode 100644
index 0000000..5bcf663
--- /dev/null
+++ b/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/NodeWritableTest.java
@@ -0,0 +1,105 @@
+package edu.uci.ics.genomix.data.test;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Random;
+
+import junit.framework.Assert;
+
+import org.junit.Test;
+
+import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.KmerListWritable;
+import edu.uci.ics.genomix.type.NodeWritable;
+import edu.uci.ics.genomix.type.PositionListWritable;
+import edu.uci.ics.genomix.type.PositionWritable;
+
+public class NodeWritableTest {
+
+ @Test
+ public void testNodeReset() throws IOException {
+
+ NodeWritable outputNode = new NodeWritable();
+ NodeWritable inputNode = new NodeWritable();
+
+ KmerListWritable nextKmerList = new KmerListWritable();
+ KmerListWritable preKmerList = new KmerListWritable();
+ KmerBytesWritable preKmer = new KmerBytesWritable();
+ KmerBytesWritable curKmer = new KmerBytesWritable();
+ KmerBytesWritable nextKmer = new KmerBytesWritable();
+ PositionWritable nodeId = new PositionWritable();
+ PositionListWritable nodeIdList = new PositionListWritable();
+ KmerBytesWritable.setGlobalKmerLength(5);
+
+ nodeId.set((byte)0, (long)1, 0);
+ nodeIdList.append(nodeId);
+ for (int i = 6; i <= 10; i++) {
+ NodeWritable tempNode = new NodeWritable();
+
+ String randomString = generaterRandomString(i);
+ byte[] array = randomString.getBytes();
+
+ curKmer.setByRead(array, 0);
+ preKmer.setAsCopy(curKmer);
+ nextKmer.setAsCopy(curKmer);
+ nextKmer.shiftKmerWithNextChar(array[5]);
+
+ nextKmerList.append(nextKmer);
+
+ outputNode.setNodeIdList(nodeIdList);
+ outputNode.setFFList(nextKmerList);
+
+ tempNode.setNodeIdList(nodeIdList);
+ tempNode.setFFList(nextKmerList);
+
+ inputNode.setAsReference(outputNode.marshalToByteArray(), 0);
+ Assert.assertEquals(tempNode.toString(), inputNode.toString());
+
+ int j = 5;
+ for (; j < array.length - 1; j++) {
+ outputNode.reset();
+ curKmer.setAsCopy(nextKmer);
+
+ nextKmer.shiftKmerWithNextChar(array[j+1]);
+ nextKmerList.reset();
+ nextKmerList.append(nextKmer);
+ preKmerList.reset();
+ preKmerList.append(preKmer);
+ outputNode.setNodeIdList(nodeIdList);
+ outputNode.setFFList(nextKmerList);
+ outputNode.setRRList(preKmerList);
+ tempNode.reset();
+ tempNode.setNodeIdList(nodeIdList);
+ tempNode.setFFList(nextKmerList);
+ tempNode.setRRList(preKmerList);
+ preKmer.setAsCopy(curKmer);
+ inputNode.reset();
+ inputNode.setAsReference(outputNode.marshalToByteArray(), 0);
+ Assert.assertEquals(tempNode.toString(), inputNode.toString());
+ }
+ curKmer.setAsCopy(nextKmer);
+ preKmerList.reset();
+ preKmerList.append(preKmer);
+ outputNode.reset();
+ outputNode.setNodeIdList(nodeIdList);
+ outputNode.setRRList(preKmerList);
+ tempNode.reset();
+ tempNode.setNodeIdList(nodeIdList);
+ tempNode.setRRList(preKmerList);
+ inputNode.reset();
+ inputNode.setAsReference(outputNode.marshalToByteArray(), 0);
+ Assert.assertEquals(tempNode.toString(), inputNode.toString());
+ }
+ }
+
+ public String generaterRandomString(int n) {
+ char[] chars = "ACGT".toCharArray();
+ StringBuilder sb = new StringBuilder();
+ Random random = new Random();
+ for (int i = 0; i < n; i++) {
+ char c = chars[random.nextInt(chars.length)];
+ sb.append(c);
+ }
+ return sb.toString();
+ }
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/ReadsKeyValueParserFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/ReadsKeyValueParserFactory.java
index 42d8db6..c77fa70 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/ReadsKeyValueParserFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/ReadsKeyValueParserFactory.java
@@ -18,14 +18,12 @@
import java.nio.ByteBuffer;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import edu.uci.ics.genomix.type.KmerBytesWritable;
-import edu.uci.ics.genomix.type.VKmerListWritable;
import edu.uci.ics.genomix.type.NodeWritable;
import edu.uci.ics.genomix.type.PositionListWritable;
import edu.uci.ics.genomix.type.PositionWritable;
@@ -47,24 +45,23 @@
public static final int OutputKmerField = 0;
public static final int OutputNodeField = 1;
-
private final int readLength;
private final int kmerSize;
public static final RecordDescriptor readKmerOutputRec = new RecordDescriptor(new ISerializerDeserializer[] { null,
- null});
+ null });
public ReadsKeyValueParserFactory(int readlength, int k) {
this.readLength = readlength;
this.kmerSize = k;
}
-
- public static enum KmerDir {
+
+ public enum KmerDir {
FORWARD,
REVERSE,
}
-
+
@Override
public IKeyValueParser<LongWritable, Text> createKeyValueParser(final IHyracksTaskContext ctx) {
final ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(2);
@@ -73,27 +70,22 @@
outputAppender.reset(outputBuffer, true);
KmerBytesWritable.setGlobalKmerLength(kmerSize);
return new IKeyValueParser<LongWritable, Text>() {
-
+
private PositionWritable nodeId = new PositionWritable();
private PositionListWritable nodeIdList = new PositionListWritable();
- private VKmerListWritable edgeListForPreKmer = new VKmerListWritable();
- private VKmerListWritable edgeListForNextKmer = new VKmerListWritable();
- private NodeWritable outputNode = new NodeWritable();
-// private NodeWritable outputNode2 = new NodeWritable();
+ private NodeWritable curNode = new NodeWritable();
+ private NodeWritable nextNode = new NodeWritable();
- private KmerBytesWritable preForwardKmer = new KmerBytesWritable();
- private KmerBytesWritable preReverseKmer = new KmerBytesWritable();
private KmerBytesWritable curForwardKmer = new KmerBytesWritable();
private KmerBytesWritable curReverseKmer = new KmerBytesWritable();
private KmerBytesWritable nextForwardKmer = new KmerBytesWritable();
private KmerBytesWritable nextReverseKmer = new KmerBytesWritable();
-
- private KmerDir preKmerDir = KmerDir.FORWARD;
+
private KmerDir curKmerDir = KmerDir.FORWARD;
private KmerDir nextKmerDir = KmerDir.FORWARD;
byte mateId = (byte) 0;
-
+
@Override
public void parse(LongWritable key, Text value, IFrameWriter writer) throws HyracksDataException {
String[] geneLine = value.toString().split("\\t"); // Read the Real Gene Line
@@ -125,145 +117,87 @@
if (kmerSize >= array.length) {
return;
}
- outputNode.reset();
+ curNode.reset();
+ nextNode.reset();
curForwardKmer.setByRead(array, 0);
curReverseKmer.setByReadReverse(array, 0);
curKmerDir = curForwardKmer.compareTo(curReverseKmer) <= 0 ? KmerDir.FORWARD : KmerDir.REVERSE;
- setNextKmer(array[kmerSize]);
- setnodeId(mateId, readID, 0);
- setEdgeListForNextKmer();
- writeToFrame(writer);
+ nextForwardKmer.setAsCopy(curForwardKmer);
+ setNextKmer(nextForwardKmer, nextReverseKmer, nextKmerDir, array[kmerSize]);
+ setnodeId(curNode, mateId, readID, 0);
+ setnodeId(nextNode, mateId, readID, 0);
+ setEdgeListForCurAndNextKmer(curKmerDir, curNode, nextKmerDir, nextNode);
+ writeToFrame(curForwardKmer, curReverseKmer, curKmerDir, curNode, writer);
/*middle kmer*/
- int i = kmerSize;
- for (; i < array.length - 1; i++) {
- outputNode.reset();
- setPreKmerByOldCurKmer();
- setCurKmerByOldNextKmer();
- setNextKmer(array[i]);
- setnodeId(mateId, readID, 0);//i - kmerSize + 1
- setEdgeListForPreKmer();
- setEdgeListForNextKmer();
- writeToFrame(writer);
+ int i = kmerSize + 1;
+ for (; i < array.length; i++) {
+                        curKmerDir = nextKmerDir; // advance the direction flag along with the kmer (lost in refactor)
+                        curForwardKmer.setAsCopy(nextForwardKmer);
+                        curReverseKmer.setAsCopy(nextReverseKmer);
+                        curNode.set(nextNode); nextNode.reset();
+                        setNextKmer(nextForwardKmer, nextReverseKmer, nextKmerDir, array[i]);
+ setnodeId(nextNode, mateId, readID, 0);
+ setEdgeListForCurAndNextKmer(curKmerDir, curNode, nextKmerDir, nextNode);
+ writeToFrame(curForwardKmer, curReverseKmer, curKmerDir, curNode, writer);
}
-
+
/*last kmer*/
- outputNode.reset();
- setPreKmerByOldCurKmer();
- setCurKmerByOldNextKmer();
- setnodeId(mateId, readID, 0);//array.length - kmerSize + 1
- setEdgeListForPreKmer();
- writeToFrame(writer);
+ writeToFrame(nextForwardKmer, nextReverseKmer, nextKmerDir, nextNode, writer);
}
-
- public void setnodeId(byte mateId, long readID, int posId){
+
+ public void setnodeId(NodeWritable node, byte mateId, long readID, int posId) {
nodeId.set(mateId, readID, posId);
nodeIdList.reset();
nodeIdList.append(nodeId);
- outputNode.setNodeIdList(nodeIdList);
- }
-
- public void setNextKmer(byte nextChar){
- nextForwardKmer.setAsCopy(curForwardKmer);
- nextForwardKmer.shiftKmerWithNextChar(nextChar);
- nextReverseKmer.setByReadReverse(nextForwardKmer.toString().getBytes(), nextForwardKmer.getOffset());
- nextKmerDir = nextForwardKmer.compareTo(nextReverseKmer) <= 0 ? KmerDir.FORWARD : KmerDir.REVERSE;
- }
-
- public void setPreKmerByOldCurKmer(){
- preKmerDir = curKmerDir;
- preForwardKmer.setAsCopy(curForwardKmer);
- preReverseKmer.setAsCopy(curReverseKmer);
+ node.setNodeIdList(nodeIdList);
}
- public void setCurKmerByOldNextKmer(){
- curKmerDir = nextKmerDir;
- curForwardKmer.setAsCopy(nextForwardKmer);
- curReverseKmer.setAsCopy(nextReverseKmer);
+ public void setNextKmer(KmerBytesWritable forwardKmer, KmerBytesWritable ReverseKmer, KmerDir nextKmerDir,
+ byte nextChar) {
+ forwardKmer.shiftKmerWithNextChar(nextChar);
+ ReverseKmer.setByReadReverse(forwardKmer.toString().getBytes(), forwardKmer.getOffset());
+                    this.nextKmerDir = forwardKmer.compareTo(ReverseKmer) <= 0 ? KmerDir.FORWARD : KmerDir.REVERSE; // assign the field, not the shadowing parameter
}
-
- public void writeToFrame(IFrameWriter writer) {
- switch(curKmerDir){
+
+ public void writeToFrame(KmerBytesWritable forwardKmer, KmerBytesWritable reverseKmer, KmerDir curKmerDir,
+ NodeWritable node, IFrameWriter writer) {
+ switch (curKmerDir) {
case FORWARD:
- InsertToFrame(curForwardKmer, outputNode, writer);
+ InsertToFrame(forwardKmer, node, writer);
break;
case REVERSE:
- InsertToFrame(curReverseKmer, outputNode, writer);
+                            InsertToFrame(reverseKmer, node, writer);
break;
}
}
- public void setEdgeListForPreKmer(){
- switch(curKmerDir){
- case FORWARD:
- switch(preKmerDir){
- case FORWARD:
- edgeListForPreKmer.reset();
- edgeListForPreKmer.append(preForwardKmer);
- outputNode.setRRList(edgeListForPreKmer);
- break;
- case REVERSE:
- edgeListForPreKmer.reset();
- edgeListForPreKmer.append(preReverseKmer);
- outputNode.setRFList(edgeListForPreKmer);
- break;
- }
- break;
- case REVERSE:
- switch(preKmerDir){
- case FORWARD:
- edgeListForPreKmer.reset();
- edgeListForPreKmer.append(preForwardKmer);
- outputNode.setFRList(edgeListForPreKmer);
- break;
- case REVERSE:
- edgeListForPreKmer.reset();
- edgeListForPreKmer.append(preReverseKmer);
- outputNode.setFFList(edgeListForPreKmer);
- break;
- }
- break;
+
+ public void setEdgeListForCurAndNextKmer(KmerDir curKmerDir, NodeWritable curNode, KmerDir nextKmerDir,
+ NodeWritable nextNode) {
+ if (curKmerDir == KmerDir.FORWARD && nextKmerDir == KmerDir.FORWARD) {
+ curNode.getFFList().append(kmerSize, nextForwardKmer);
+ nextNode.getRRList().append(kmerSize, curForwardKmer);
+ }
+ if (curKmerDir == KmerDir.FORWARD && nextKmerDir == KmerDir.REVERSE) {
+ curNode.getFRList().append(kmerSize, nextReverseKmer);
+ nextNode.getFRList().append(kmerSize, curForwardKmer);
+ }
+ if (curKmerDir == KmerDir.REVERSE && nextKmerDir == KmerDir.FORWARD) {
+ curNode.getRFList().append(kmerSize, nextForwardKmer);
+ nextNode.getRFList().append(kmerSize, curReverseKmer);
+ }
+ if (curKmerDir == KmerDir.REVERSE && nextKmerDir == KmerDir.REVERSE) {
+ curNode.getRRList().append(kmerSize, nextReverseKmer);
+ nextNode.getFFList().append(kmerSize, curReverseKmer);
}
}
-
- public void setEdgeListForNextKmer(){
- switch(curKmerDir){
- case FORWARD:
- switch(nextKmerDir){
- case FORWARD:
- edgeListForNextKmer.reset();
- edgeListForNextKmer.append(nextForwardKmer);
- outputNode.setFFList(edgeListForNextKmer);
- break;
- case REVERSE:
- edgeListForNextKmer.reset();
- edgeListForNextKmer.append(nextReverseKmer);
- outputNode.setFRList(edgeListForNextKmer);
- break;
- }
- break;
- case REVERSE:
- switch(nextKmerDir){
- case FORWARD:
- edgeListForNextKmer.reset();
- edgeListForNextKmer.append(nextForwardKmer);
- outputNode.setRFList(edgeListForNextKmer);
- break;
- case REVERSE:
- edgeListForNextKmer.reset();
- edgeListForNextKmer.append(nextReverseKmer);
- outputNode.setRRList(edgeListForNextKmer);
- break;
- }
- break;
- }
- }
-
+
private void InsertToFrame(KmerBytesWritable kmer, NodeWritable node, IFrameWriter writer) {
try {
tupleBuilder.reset();
tupleBuilder.addField(kmer.getBytes(), kmer.getOffset(), kmer.getLength());
tupleBuilder.addField(node.marshalToByteArray(), 0, node.getSerializedLength());
-
+
if (!outputAppender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
tupleBuilder.getSize())) {
FrameUtils.flushFrame(outputBuffer, writer);
@@ -289,5 +223,4 @@
}
};
}
-
}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/aggregators/AggregateKmerAggregateFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/aggregators/AggregateKmerAggregateFactory.java
index 46fdd0e..a484179 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/aggregators/AggregateKmerAggregateFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/aggregators/AggregateKmerAggregateFactory.java
@@ -48,6 +48,7 @@
return new IAggregatorDescriptor() {
private NodeWritable readNode = new NodeWritable();
+// private KmerBytesWritable readKeyKmer = new KmerBytesWritable();
protected int getOffSet(IFrameTupleAccessor accessor, int tIndex, int fieldId) {
int tupleOffset = accessor.getTupleStartOffset(tIndex);
@@ -75,13 +76,14 @@
AggregateState state) throws HyracksDataException {
NodeWritable localUniNode = (NodeWritable) state.state;
localUniNode.reset();
+// readKeyKmer.setAsReference(accessor.getBuffer().array(), getOffSet(accessor, tIndex, 0));
readNode.setAsReference(accessor.getBuffer().array(), getOffSet(accessor, tIndex, 1));
localUniNode.getNodeIdList().appendList(readNode.getNodeIdList());
localUniNode.getFFList().appendList(readNode.getFFList());
localUniNode.getFRList().appendList(readNode.getFRList());
localUniNode.getRFList().appendList(readNode.getRFList());
localUniNode.getRRList().appendList(readNode.getRRList());
-
+
// make an empty field
tupleBuilder.addFieldEndOffset();// mark question?
}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/aggregators/MergeKmerAggregateFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/aggregators/MergeKmerAggregateFactory.java
index 1ee6cae..aa3c9f6 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/aggregators/MergeKmerAggregateFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/aggregators/MergeKmerAggregateFactory.java
@@ -20,8 +20,11 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+
+import edu.uci.ics.genomix.data.Marshal;
import edu.uci.ics.genomix.type.KmerBytesWritable;
import edu.uci.ics.genomix.type.NodeWritable;
+import edu.uci.ics.genomix.type.VKmerBytesWritable;
import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
@@ -70,6 +73,9 @@
localUniNode.reset();
readNode.setAsReference(accessor.getBuffer().array(), getOffSet(accessor, tIndex, 1));
localUniNode.getNodeIdList().unionUpdate(readNode.getNodeIdList());
+// VKmerBytesWritable a = new VKmerBytesWritable();
+ // a.setAsCopy(readNode.getFFList().getPosition(0));
+ // int kRequested = Marshal.getInt(readNode.getFFList().getByteArray(), readNode.getFFList().getStartOffset() + 4);
localUniNode.getFFList().unionUpdate(readNode.getFFList());
localUniNode.getFRList().unionUpdate(readNode.getFRList());
localUniNode.getRFList().unionUpdate(readNode.getRFList());
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/AssembleKeyIntoNodeOperator.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/AssembleKeyIntoNodeOperator.java
new file mode 100644
index 0000000..e248c3b
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/AssembleKeyIntoNodeOperator.java
@@ -0,0 +1,162 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.genomix.hyracks.newgraph.dataflow;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.NodeWritable;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
+
+public class AssembleKeyIntoNodeOperator extends AbstractSingleActivityOperatorDescriptor {
+
+ public AssembleKeyIntoNodeOperator(IOperatorDescriptorRegistry spec, RecordDescriptor outRecDesc, int kmerSize) {
+ super(spec, 1, 1);
+ recordDescriptors[0] = outRecDesc;
+ this.kmerSize = kmerSize;
+ KmerBytesWritable.setGlobalKmerLength(this.kmerSize);
+ }
+
+ private static final long serialVersionUID = 1L;
+ private final int kmerSize;
+
+ public static final int InputKmerField = 0;
+ public static final int InputtempNodeField = 1;
+ public static final int OutputNodeField = 0;
+
+ public static final RecordDescriptor nodeOutputRec = new RecordDescriptor(new ISerializerDeserializer[1]);
+
+ public class MapReadToNodePushable extends AbstractUnaryInputUnaryOutputOperatorNodePushable {
+ public static final int INT_LENGTH = 4;
+ private final IHyracksTaskContext ctx;
+ private final RecordDescriptor inputRecDesc;
+ private final RecordDescriptor outputRecDesc;
+
+ private FrameTupleAccessor accessor;
+ private ByteBuffer writeBuffer;
+ private ArrayTupleBuilder builder;
+ private FrameTupleAppender appender;
+
+ NodeWritable readNode;
+ KmerBytesWritable readKmer;
+
+ public MapReadToNodePushable(IHyracksTaskContext ctx, RecordDescriptor inputRecDesc,
+ RecordDescriptor outputRecDesc) {
+ this.ctx = ctx;
+ this.inputRecDesc = inputRecDesc;
+ this.outputRecDesc = outputRecDesc;
+
+ readNode = new NodeWritable();
+ readKmer = new KmerBytesWritable();
+ }
+
+ @Override
+ public void open() throws HyracksDataException {
+ accessor = new FrameTupleAccessor(ctx.getFrameSize(), inputRecDesc);
+ writeBuffer = ctx.allocateFrame();
+ builder = new ArrayTupleBuilder(outputRecDesc.getFieldCount());
+ appender = new FrameTupleAppender(ctx.getFrameSize());
+ appender.reset(writeBuffer, true);
+ writer.open();
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ accessor.reset(buffer);
+ int tupleCount = accessor.getTupleCount();
+ for (int i = 0; i < tupleCount; i++) {
+ generateNodeFromKmer(i);
+ }
+ }
+
+ private void generateNodeFromKmer(int tIndex) throws HyracksDataException {
+ int offsetPoslist = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength();
+
+ setKmer(readKmer, offsetPoslist + accessor.getFieldStartOffset(tIndex, InputKmerField));
+ readNode.reset();
+ setNode(readNode, offsetPoslist + accessor.getFieldStartOffset(tIndex, InputtempNodeField));
+ readNode.getKmer().setAsCopy(readKmer.getKmerLength(), readKmer.getBytes(), readKmer.getOffset());
+ outputNode(readNode);
+ }
+
+
+ private void setKmer(KmerBytesWritable kmer, int offset) {
+ ByteBuffer buffer = accessor.getBuffer();
+ kmer.setAsCopy(buffer.array(), offset);
+ }
+
+ private void setNode(NodeWritable node, int offset) {
+ ByteBuffer buffer = accessor.getBuffer();
+ node.setAsCopy(buffer.array(), offset);
+ }
+
+
+ private void outputNode(NodeWritable node) throws HyracksDataException {
+
+ try {
+ builder.reset();
+ builder.addField(node.marshalToByteArray(), 0, node.getSerializedLength());
+
+ if (!appender.append(builder.getFieldEndOffsets(), builder.getByteArray(), 0, builder.getSize())) {
+ FrameUtils.flushFrame(writeBuffer, writer);
+ appender.reset(writeBuffer, true);
+ if (!appender.append(builder.getFieldEndOffsets(), builder.getByteArray(), 0, builder.getSize())) {
+ throw new IllegalStateException("Failed to append tuplebuilder to frame");
+ }
+ }
+ } catch (IOException e) {
+ throw new IllegalStateException("Failed to Add a field to the tupleBuilder.");
+ }
+ }
+
+ @Override
+ public void fail() throws HyracksDataException {
+ writer.fail();
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ if (appender.getTupleCount() > 0) {
+ FrameUtils.flushFrame(writeBuffer, writer);
+ }
+ writer.close();
+ }
+
+ }
+
+ @Override
+ public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+ IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
+ return new MapReadToNodePushable(ctx, recordDescProvider.getInputRecordDescriptor(getActivityId(), 0),
+ recordDescriptors[0]);
+ }
+
+}
+
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/io/NodeTextWriterFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/io/NodeTextWriterFactory.java
index b7b7054..fa6ae9b 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/io/NodeTextWriterFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/io/NodeTextWriterFactory.java
@@ -17,7 +17,7 @@
import java.io.DataOutput;
import java.io.IOException;
-import edu.uci.ics.genomix.hyracks.newgraph.dataflow.ReadsKeyValueParserFactory;
+import edu.uci.ics.genomix.hyracks.newgraph.dataflow.AssembleKeyIntoNodeOperator;
import edu.uci.ics.genomix.type.NodeWritable;
import edu.uci.ics.genomix.type.KmerBytesWritable;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
@@ -33,8 +33,7 @@
*/
private static final long serialVersionUID = 1L;
private final int kmerSize;
- public static final int OutputKmerField = ReadsKeyValueParserFactory.OutputKmerField;
- public static final int outputNodeField = ReadsKeyValueParserFactory.OutputNodeField;
+ public static final int OutputNodeField = AssembleKeyIntoNodeOperator.OutputNodeField;
public NodeTextWriterFactory(int k) {
this.kmerSize = k;
@@ -53,9 +52,7 @@
@Override
public void write(DataOutput output, ITupleReference tuple) throws HyracksDataException {
- node.setAsReference(tuple.getFieldData(outputNodeField), tuple.getFieldStart(outputNodeField));
- node.getKmer().reset(kmerSize);
- node.getKmer().setAsReference(tuple.getFieldData(OutputKmerField), tuple.getFieldStart(OutputKmerField));
+ node.setAsReference(tuple.getFieldData(OutputNodeField), tuple.getFieldStart(OutputNodeField));
try {
output.write(node.toString().getBytes());
output.writeByte('\n');
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/job/JobGenBrujinGraph.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/job/JobGenBrujinGraph.java
index afc1cf7..6a5dcc4 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/job/JobGenBrujinGraph.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/job/JobGenBrujinGraph.java
@@ -29,6 +29,7 @@
import edu.uci.ics.genomix.hyracks.data.primitive.KmerPointable;
import edu.uci.ics.genomix.hyracks.newgraph.dataflow.ConnectorPolicyAssignmentPolicy;
import edu.uci.ics.genomix.hyracks.newgraph.dataflow.ReadsKeyValueParserFactory;
+import edu.uci.ics.genomix.hyracks.newgraph.dataflow.AssembleKeyIntoNodeOperator;
import edu.uci.ics.genomix.hyracks.newgraph.dataflow.aggregators.AggregateKmerAggregateFactory;
import edu.uci.ics.genomix.hyracks.newgraph.dataflow.aggregators.MergeKmerAggregateFactory;
import edu.uci.ics.genomix.hyracks.newgraph.io.NodeTextWriterFactory;
@@ -181,6 +182,16 @@
return kmerCrossAggregator;
}
+ public AbstractOperatorDescriptor generateKmerToFinalNode(JobSpecification jobSpec,
+ AbstractOperatorDescriptor kmerCrossAggregator) {
+
+ AbstractOperatorDescriptor mapToFinalNode = new AssembleKeyIntoNodeOperator(jobSpec,
+ AssembleKeyIntoNodeOperator.nodeOutputRec, kmerSize);
+ connectOperators(jobSpec, kmerCrossAggregator, ncNodeNames, mapToFinalNode, ncNodeNames,
+ new OneToOneConnectorDescriptor(jobSpec));
+ return mapToFinalNode;
+ }
+
public AbstractOperatorDescriptor generateNodeWriterOpertator(JobSpecification jobSpec,
AbstractOperatorDescriptor mapEachReadToNode) throws HyracksException {
ITupleWriterFactory nodeWriter = null;
@@ -209,10 +220,15 @@
logDebug("Group by Kmer");
AbstractOperatorDescriptor lastOperator = generateGroupbyKmerJob(jobSpec, readOperator);
+ logDebug("Generate final node");
+ lastOperator = generateKmerToFinalNode(jobSpec, lastOperator);
+
+ jobSpec.addRoot(lastOperator);
+
logDebug("Write node to result");
lastOperator = generateNodeWriterOpertator(jobSpec, lastOperator);
- jobSpec.addRoot(readOperator);//what's this? why we need this? why I can't seet it in the JobGenCheckReader
+ jobSpec.addRoot(lastOperator);
return jobSpec;
}
diff --git a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/newgraph/test/JobRun.java b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/newgraph/test/JobRun.java
index 25915aa..bf87b23 100644
--- a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/newgraph/test/JobRun.java
+++ b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/newgraph/test/JobRun.java
@@ -32,7 +32,7 @@
@SuppressWarnings("deprecation")
public class JobRun {
private static final int KmerSize = 5;
- private static final int ReadLength = 6;
+ private static final int ReadLength = 7;
private static final String ACTUAL_RESULT_DIR = "actual";
private static final String PATH_TO_HADOOP_CONF = "src/test/resources/hadoop/conf";
@@ -52,8 +52,8 @@
@Test
public void TestAll() throws Exception {
- TestReader();
-// TestGroupby();
+// TestReader();
+ TestGroupby();
}
public void TestReader() throws Exception {
@@ -68,7 +68,7 @@
cleanUpReEntry();
conf.set(GenomixJobConf.GROUPBY_TYPE, GenomixJobConf.GROUPBY_TYPE_PRECLUSTER);
driver.runJob(new GenomixJobConf(conf), Plan.BUILD_DEBRUJIN_GRAPH, true);
-// dumpResult();
+ dumpResult();
}
@Before
diff --git a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunStepByStepTest.java b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunStepByStepTest.java
index 51a0d15..fbbc89a 100644
--- a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunStepByStepTest.java
+++ b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunStepByStepTest.java
@@ -46,11 +46,11 @@
@SuppressWarnings("deprecation")
public class JobRunStepByStepTest {
private static final int KmerSize = 5;
- private static final int ReadLength = 8;
+ private static final int ReadLength = 7;
private static final String ACTUAL_RESULT_DIR = "actual";
private static final String PATH_TO_HADOOP_CONF = "src/test/resources/hadoop/conf";
- private static final String DATA_INPUT_PATH = "src/test/resources/data/webmap/test1.txt";
+ private static final String DATA_INPUT_PATH = "src/test/resources/data/webmap/test.txt";
private static final String HDFS_INPUT_PATH = "/webmap";
private static final String HDFS_OUTPUT_PATH = "/webmap_result";
@@ -75,11 +75,11 @@
@Test
public void TestAll() throws Exception {
-// TestReader();
+ TestReader();
// TestGroupbyKmer();
// TestMapKmerToRead();
// TestGroupByReadID();
- TestEndToEnd();
+// TestEndToEnd();
// TestUnMergedNode();
}
diff --git a/genomix/genomix-hyracks/src/test/resources/data/webmap/SplitRepeat.txt b/genomix/genomix-hyracks/src/test/resources/data/webmap/SplitRepeat.txt
new file mode 100644
index 0000000..bb03d70
--- /dev/null
+++ b/genomix/genomix-hyracks/src/test/resources/data/webmap/SplitRepeat.txt
@@ -0,0 +1,2 @@
+1 AATAG
+2 CATAC
diff --git a/genomix/genomix-hyracks/src/test/resources/data/webmap/test1.txt b/genomix/genomix-hyracks/src/test/resources/data/webmap/test1.txt
index 3f1cd5c..a720dc4 100644
--- a/genomix/genomix-hyracks/src/test/resources/data/webmap/test1.txt
+++ b/genomix/genomix-hyracks/src/test/resources/data/webmap/test1.txt
@@ -1 +1 @@
-1 AATAGA
+1 AATAGAA
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/VertexValueWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/VertexValueWritable.java
index 83f286c..5e7a856 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/VertexValueWritable.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/VertexValueWritable.java
@@ -313,7 +313,7 @@
*/
public void processMerges(byte neighborToDeleteDir, KmerBytesWritable nodeToDelete,
byte neighborToMergeDir, KmerBytesWritable nodeToAdd,
- int kmerSize, KmerBytesWritable kmer){
+ int kmerSize, VKmerBytesWritable kmer){
switch (neighborToDeleteDir & MessageFlag.DIR_MASK) {
case MessageFlag.DIR_FF:
this.getFFList().remove(nodeToDelete); //set(null);
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicGraphCleanVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicGraphCleanVertex.java
index 64965e3..fc0cd4b 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicGraphCleanVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicGraphCleanVertex.java
@@ -95,7 +95,7 @@
/**
* get destination vertex
*/
- public KmerBytesWritable getNextDestVertexIdAndSetFlag(VertexValueWritable value) {
+ public VKmerBytesWritable getNextDestVertexIdAndSetFlag(VertexValueWritable value) {
if (value.getFFList().getCountOfPosition() > 0){ // #FFList() > 0
posIterator = value.getFFList().iterator();
outFlag &= MessageFlag.DIR_CLEAR;
@@ -112,7 +112,7 @@
}
- public KmerBytesWritable getPreDestVertexIdAndSetFlag(VertexValueWritable value) {
+ public VKmerBytesWritable getPreDestVertexIdAndSetFlag(VertexValueWritable value) {
if (value.getRFList().getCountOfPosition() > 0){ // #RFList() > 0
posIterator = value.getRFList().iterator();
outFlag &= MessageFlag.DIR_CLEAR;
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/MapReduceVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/MapReduceVertex.java
index 99cd3c2..c1b1dda 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/MapReduceVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/MapReduceVertex.java
@@ -51,7 +51,7 @@
kmerList.reset();
if(fakeVertex == null){
// fakeVertex = new KmerBytesWritable(kmerSize + 1); // TODO check if merge is correct
- fakeVertex = new KmerBytesWritable();
+ fakeVertex = new VKmerBytesWritable();
String random = generaterRandomString(kmerSize + 1);
fakeVertex.setByRead(random.getBytes(), 0);
}
@@ -187,7 +187,7 @@
job.setVertexInputFormatClass(GraphCleanInputFormat.class);
job.setVertexOutputFormatClass(P2PathMergeOutputFormat.class);
job.setDynamicVertexValueSize(true);
- job.setOutputKeyClass(KmerBytesWritable.class);
+ job.setOutputKeyClass(VKmerBytesWritable.class);
job.setOutputValueClass(VertexValueWritable.class);
Client.run(args, job);
}