There is an issue in KmerListWritable; this commit tries to fix it.
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerListWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerListWritable.java
index 88bb79c..1d8d871 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerListWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerListWritable.java
@@ -7,12 +7,14 @@
import java.util.Iterator;
import java.util.List;
+import org.apache.hadoop.io.BinaryComparable;
import org.apache.hadoop.io.Writable;
import edu.uci.ics.genomix.data.KmerUtil;
import edu.uci.ics.genomix.data.Marshal;
-public class KmerListWritable implements Writable, Iterable<KmerBytesWritable>, Serializable{
+public class KmerListWritable extends BinaryComparable
+ implements Writable, Iterable<KmerBytesWritable>, Serializable{
private static final long serialVersionUID = 1L;
protected byte[] storage;
protected int offset;
@@ -212,6 +214,7 @@
return valueCount * kmerByteSize;
}
+
@Override
public String toString() {
StringBuilder sbuilder = new StringBuilder();
@@ -232,4 +235,10 @@
public int hashCode() {
return Marshal.hashBytes(getByteArray(), getStartOffset(), getLength());
}
+
+ @Override
+ public byte[] getBytes() {
+ // BinaryComparable reads bytes [0, getLength()) of the returned array, so never return null; copy when the data does not start at index 0.
+ return getStartOffset() == 0 ? getByteArray() : java.util.Arrays.copyOfRange(getByteArray(), getStartOffset(), getStartOffset() + getLength());
+ }
}
diff --git a/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/KmerBytesWritableTest.java b/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/KmerBytesWritableTest.java
index fbd458e..fa5725b 100644
--- a/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/KmerBytesWritableTest.java
+++ b/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/KmerBytesWritableTest.java
@@ -26,6 +26,7 @@
import edu.uci.ics.genomix.type.GeneCode;
import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.KmerListWritable;
public class KmerBytesWritableTest {
static byte[] array = { 'A', 'A', 'T', 'A', 'G', 'A', 'A', 'G' };
@@ -403,16 +404,26 @@
KmerBytesWritable k3 = new KmerBytesWritable(3);
k3.setByRead(("ATG").getBytes(), 0);
Set<Long> set3 = new HashSet<Long>();
- set3.add((long) 3);
+ set3.add((long) 2);
map.put(k3, set3);
KmerBytesWritable k4 = new KmerBytesWritable(3);
k4.setByRead(("AAT").getBytes(), 0);
Set<Long> set4 = new HashSet<Long>();
- set4.add((long) 4);
+ set4.add((long) 1);
map.put(k4, set4);
+ KmerListWritable kmerList = new KmerListWritable(3);
+ kmerList.append(k1);
+ kmerList.append(k2);
System.out.println("CTA = " + map.get(k1).toString());
System.out.println("GTA = " + map.get(k2).toString());
System.out.println("ATG = " + map.get(k3).toString());
System.out.println("AAT = " + map.get(k4).toString());
+ System.out.println(k1.compareTo(k2));
+ System.out.println(k2.compareTo(k1));
+
+ System.out.println("CTA = " + kmerList.getPosition(0).toString());
+ System.out.println("GTA = " + kmerList.getPosition(1).toString());
+ System.out.println("CTA = " + map.get(kmerList.getPosition(0)).toString());
+ System.out.println("GTA = " + map.get(kmerList.getPosition(1)).toString());
}
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicGraphCleanVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicGraphCleanVertex.java
index ca91edb..2314245 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicGraphCleanVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicGraphCleanVertex.java
@@ -29,7 +29,7 @@
protected MessageWritable incomingMsg = null;
protected MessageWritable outgoingMsg = null;
protected KmerBytesWritable destVertexId = new KmerBytesWritable();
- protected Iterator<KmerBytesWritable> posIterator;
+ protected Iterator<KmerBytesWritable> kmerIterator;
protected KmerBytesWritable tmpKmer = new KmerBytesWritable(kmerSize);
byte headFlag;
protected byte outFlag;
@@ -69,11 +69,11 @@
*/
public KmerBytesWritable getNextDestVertexId(VertexValueWritable value) {
if (value.getFFList().getCountOfPosition() > 0){ // #FFList() > 0
- posIterator = value.getFFList().iterator();
- return posIterator.next();
+ kmerIterator = value.getFFList().iterator();
+ return kmerIterator.next();
} else if (value.getFRList().getCountOfPosition() > 0){ // #FRList() > 0
- posIterator = value.getFRList().iterator();
- return posIterator.next();
+ kmerIterator = value.getFRList().iterator();
+ return kmerIterator.next();
} else {
return null;
}
@@ -81,11 +81,11 @@
public KmerBytesWritable getPreDestVertexId(VertexValueWritable value) {
if (value.getRFList().getCountOfPosition() > 0){ // #RFList() > 0
- posIterator = value.getRFList().iterator();
- return posIterator.next();
+ kmerIterator = value.getRFList().iterator();
+ return kmerIterator.next();
} else if (value.getRRList().getCountOfPosition() > 0){ // #RRList() > 0
- posIterator = value.getRRList().iterator();
- return posIterator.next();
+ kmerIterator = value.getRRList().iterator();
+ return kmerIterator.next();
} else {
return null;
}
@@ -96,15 +96,15 @@
*/
public KmerBytesWritable getNextDestVertexIdAndSetFlag(VertexValueWritable value) {
if (value.getFFList().getCountOfPosition() > 0){ // #FFList() > 0
- posIterator = value.getFFList().iterator();
+ kmerIterator = value.getFFList().iterator();
outFlag &= MessageFlag.DIR_CLEAR;
outFlag |= MessageFlag.DIR_FF;
- return posIterator.next();
+ return kmerIterator.next();
} else if (value.getFRList().getCountOfPosition() > 0){ // #FRList() > 0
- posIterator = value.getFRList().iterator();
+ kmerIterator = value.getFRList().iterator();
outFlag &= MessageFlag.DIR_CLEAR;
outFlag |= MessageFlag.DIR_FR;
- return posIterator.next();
+ return kmerIterator.next();
} else {
return null;
}
@@ -113,15 +113,15 @@
public KmerBytesWritable getPreDestVertexIdAndSetFlag(VertexValueWritable value) {
if (value.getRFList().getCountOfPosition() > 0){ // #RFList() > 0
- posIterator = value.getRFList().iterator();
+ kmerIterator = value.getRFList().iterator();
outFlag &= MessageFlag.DIR_CLEAR;
outFlag |= MessageFlag.DIR_RF;
- return posIterator.next();
+ return kmerIterator.next();
} else if (value.getRRList().getCountOfPosition() > 0){ // #RRList() > 0
- posIterator = value.getRRList().iterator();
+ kmerIterator = value.getRRList().iterator();
outFlag &= MessageFlag.DIR_CLEAR;
outFlag |= MessageFlag.DIR_RR;
- return posIterator.next();
+ return kmerIterator.next();
} else {
return null;
}
@@ -131,14 +131,14 @@
* head send message to all next nodes
*/
public void sendMsgToAllNextNodes(VertexValueWritable value) {
- posIterator = value.getFFList().iterator(); // FFList
- while(posIterator.hasNext()){
- destVertexId.set(posIterator.next());
+ kmerIterator = value.getFFList().iterator(); // FFList
+ while(kmerIterator.hasNext()){
+ destVertexId.set(kmerIterator.next());
sendMsg(destVertexId, outgoingMsg);
}
- posIterator = value.getFRList().iterator(); // FRList
- while(posIterator.hasNext()){
- destVertexId.set(posIterator.next());
+ kmerIterator = value.getFRList().iterator(); // FRList
+ while(kmerIterator.hasNext()){
+ destVertexId.set(kmerIterator.next());
sendMsg(destVertexId, outgoingMsg);
}
}
@@ -147,14 +147,14 @@
* head send message to all previous nodes
*/
public void sendMsgToAllPreviousNodes(VertexValueWritable value) {
- posIterator = value.getRFList().iterator(); // RFList
- while(posIterator.hasNext()){
- destVertexId.set(posIterator.next());
+ kmerIterator = value.getRFList().iterator(); // RFList
+ while(kmerIterator.hasNext()){
+ destVertexId.set(kmerIterator.next());
sendMsg(destVertexId, outgoingMsg);
}
- posIterator = value.getRRList().iterator(); // RRList
- while(posIterator.hasNext()){
- destVertexId.set(posIterator.next());
+ kmerIterator = value.getRRList().iterator(); // RRList
+ while(kmerIterator.hasNext()){
+ destVertexId.set(kmerIterator.next());
sendMsg(destVertexId, outgoingMsg);
}
}
@@ -171,18 +171,18 @@
* head send message to all previous nodes
*/
public void sendSettledMsgToAllPreviousNodes(VertexValueWritable value) {
- posIterator = value.getRFList().iterator(); // RFList
- while(posIterator.hasNext()){
+ kmerIterator = value.getRFList().iterator(); // RFList
+ while(kmerIterator.hasNext()){
outgoingMsg.setFlag(AdjMessage.FROMRF);
outgoingMsg.setSourceVertexId(getVertexId());
- destVertexId.set(posIterator.next());
+ destVertexId.set(kmerIterator.next());
sendMsg(destVertexId, outgoingMsg);
}
- posIterator = value.getRRList().iterator(); // RRList
- while(posIterator.hasNext()){
+ kmerIterator = value.getRRList().iterator(); // RRList
+ while(kmerIterator.hasNext()){
outgoingMsg.setFlag(AdjMessage.FROMRR);
outgoingMsg.setSourceVertexId(getVertexId());
- destVertexId.set(posIterator.next());
+ destVertexId.set(kmerIterator.next());
sendMsg(destVertexId, outgoingMsg);
}
}
@@ -191,18 +191,18 @@
* head send message to all next nodes
*/
public void sendSettledMsgToAllNextNodes(VertexValueWritable value) {
- posIterator = value.getFFList().iterator(); // FFList
- while(posIterator.hasNext()){
+ kmerIterator = value.getFFList().iterator(); // FFList
+ while(kmerIterator.hasNext()){
outgoingMsg.setFlag(AdjMessage.FROMFF);
outgoingMsg.setSourceVertexId(getVertexId());
- destVertexId.set(posIterator.next());
+ destVertexId.set(kmerIterator.next());
sendMsg(destVertexId, outgoingMsg);
}
- posIterator = value.getFRList().iterator(); // FRList
- while(posIterator.hasNext()){
+ kmerIterator = value.getFRList().iterator(); // FRList
+ while(kmerIterator.hasNext()){
outgoingMsg.setFlag(AdjMessage.FROMFR);
outgoingMsg.setSourceVertexId(getVertexId());
- destVertexId.set(posIterator.next());
+ destVertexId.set(kmerIterator.next());
sendMsg(destVertexId, outgoingMsg);
}
}
@@ -747,44 +747,44 @@
switch(incomingMsg.getFlag() & MessageFlag.DIR_MASK){
case MessageFlag.DIR_FF:
//remove incomingMsg.getSourceId from RR positionList
- posIterator = getVertexValue().getRRList().iterator();
- while(posIterator.hasNext()){
- tmpKmer = posIterator.next();
+ kmerIterator = getVertexValue().getRRList().iterator();
+ while(kmerIterator.hasNext()){
+ tmpKmer = kmerIterator.next();
if(tmpKmer.equals(incomingMsg.getSourceVertexId())){
- posIterator.remove();
+ kmerIterator.remove();
break;
}
}
break;
case MessageFlag.DIR_FR:
//remove incomingMsg.getSourceId from FR positionList
- posIterator = getVertexValue().getFRList().iterator();
- while(posIterator.hasNext()){
- tmpKmer = posIterator.next();
+ kmerIterator = getVertexValue().getFRList().iterator();
+ while(kmerIterator.hasNext()){
+ tmpKmer = kmerIterator.next();
if(tmpKmer.equals(incomingMsg.getSourceVertexId())){
- posIterator.remove();
+ kmerIterator.remove();
break;
}
}
break;
case MessageFlag.DIR_RF:
//remove incomingMsg.getSourceId from RF positionList
- posIterator = getVertexValue().getRFList().iterator();
- while(posIterator.hasNext()){
- tmpKmer = posIterator.next();
+ kmerIterator = getVertexValue().getRFList().iterator();
+ while(kmerIterator.hasNext()){
+ tmpKmer = kmerIterator.next();
if(tmpKmer.equals(incomingMsg.getSourceVertexId())){
- posIterator.remove();
+ kmerIterator.remove();
break;
}
}
break;
case MessageFlag.DIR_RR:
//remove incomingMsg.getSourceId from FF positionList
- posIterator = getVertexValue().getFFList().iterator();
- while(posIterator.hasNext()){
- tmpKmer = posIterator.next();
+ kmerIterator = getVertexValue().getFFList().iterator();
+ while(kmerIterator.hasNext()){
+ tmpKmer = kmerIterator.next();
if(tmpKmer.equals(incomingMsg.getSourceVertexId())){
- posIterator.remove();
+ kmerIterator.remove();
break;
}
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/splitrepeat/SplitRepeatVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/splitrepeat/SplitRepeatVertex.java
index 300d7b0..15dd0d8 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/splitrepeat/SplitRepeatVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/splitrepeat/SplitRepeatVertex.java
@@ -6,6 +6,7 @@
import java.util.Map;
import java.util.Random;
import java.util.Set;
+import java.util.TreeMap;
import edu.uci.ics.genomix.pregelix.io.MessageWritable;
import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
@@ -99,6 +100,8 @@
private Set<Long> outgoingEdgeIntersection = new HashSet<Long>();
private Set<Long> neighborEdgeIntersection = new HashSet<Long>();
private Map<KmerBytesWritable, Set<Long>> kmerMap = new HashMap<KmerBytesWritable, Set<Long>>();
+ private KmerBytesWritable incomingEdge = null;
+ private KmerBytesWritable outgoingEdge = null;
private KmerListWritable incomingEdgeList = null;
private KmerListWritable outgoingEdgeList = null;
private byte incomingEdgeDir = 0;
@@ -128,6 +131,10 @@
outgoingEdgeList = new KmerListWritable(kmerSize);
if(createdVertexId == null)
createdVertexId = new KmerBytesWritable(kmerSize + 1);
+ if(incomingEdge == null)
+ incomingEdge = new KmerBytesWritable(kmerSize);
+ if(outgoingEdge == null)
+ outgoingEdge = new KmerBytesWritable(kmerSize);
}
/**
@@ -203,26 +210,28 @@
selfReadIdSet.clear();
for(PositionWritable nodeId : getVertexValue().getNodeIdList()){
selfReadIdSet.add(nodeId.getReadId());
- }
- for(KmerBytesWritable outgoingEdge : outgoingEdgeList){
- for(KmerBytesWritable incomingEdge : incomingEdgeList){
- outgoingReadIdSet.clear();
+ }
+ for(KmerBytesWritable incomingEdge : incomingEdgeList){
+ for(KmerBytesWritable outgoingEdge : outgoingEdgeList){
+ outgoingReadIdSet.clear();
incomingReadIdSet.clear();
- outgoingReadIdSet.addAll(kmerMap.get(outgoingEdge));
- incomingReadIdSet.addAll(kmerMap.get(incomingEdge));
+ tmpKmer.set(incomingEdge);
+ incomingReadIdSet.addAll(kmerMap.get(tmpKmer));
+ tmpKmer.set(outgoingEdge);
+ outgoingReadIdSet.addAll(kmerMap.get(tmpKmer));
//set all neighberEdge readId intersection
neighborEdgeIntersection.addAll(selfReadIdSet);
- neighborEdgeIntersection.retainAll(outgoingReadIdSet);
neighborEdgeIntersection.retainAll(incomingReadIdSet);
- //set outgoingEdge readId intersection
- outgoingEdgeIntersection.addAll(selfReadIdSet);
- outgoingEdgeIntersection.retainAll(outgoingReadIdSet);
- outgoingEdgeIntersection.removeAll(neighborEdgeIntersection);
- //set incomingEdge readId intersection
- incomingEdgeIntersection.addAll(selfReadIdSet);
- incomingEdgeIntersection.retainAll(incomingReadIdSet);
- incomingEdgeIntersection.removeAll(neighborEdgeIntersection);
+ neighborEdgeIntersection.retainAll(outgoingReadIdSet);
+// //set outgoingEdge readId intersection
+// outgoingEdgeIntersection.addAll(selfReadIdSet);
+// outgoingEdgeIntersection.retainAll(outgoingReadIdSet);
+// outgoingEdgeIntersection.removeAll(neighborEdgeIntersection);
+// //set incomingEdge readId intersection
+// incomingEdgeIntersection.addAll(selfReadIdSet);
+// incomingEdgeIntersection.retainAll(incomingReadIdSet);
+// incomingEdgeIntersection.removeAll(neighborEdgeIntersection);
if(!neighborEdgeIntersection.isEmpty()){
createdVertex.clear();
@@ -241,34 +250,33 @@
outgoingMsg.setFlag(outgoingEdgeDir);
sendMsg(outgoingEdge, outgoingMsg);
}
-
- if(!incomingEdgeIntersection.isEmpty()){
- createdVertex.clear();
- createdVertexId.setByRead(generaterRandomString(kmerSize + 1).getBytes(), 0);
- createdVertex.setCreatedVertexId(createdVertexId);
- createdVertex.setIncomingDir(connectedTable[i][1]);
- createdVertex.setIncomingEdge(incomingEdge);
- createdVertexSet.add(createdVertex);
-
- outgoingMsg.setCreatedVertexId(createdVertex.getCreatedVertexId());
- outgoingMsg.setSourceVertexId(getVertexId());
- outgoingMsg.setFlag(incomingEdgeDir);
- sendMsg(incomingEdge, outgoingMsg);
- }
-
- if(!outgoingEdgeIntersection.isEmpty()){
- createdVertex.clear();
- createdVertexId.setByRead(generaterRandomString(kmerSize + 1).getBytes(), 0);
- createdVertex.setCreatedVertexId(createdVertexId);
- createdVertex.setOutgoingDir(connectedTable[i][0]);
- createdVertex.setOutgoingEdge(outgoingEdge);
- createdVertexSet.add(createdVertex);
-
- outgoingMsg.setCreatedVertexId(createdVertex.getCreatedVertexId());
- outgoingMsg.setSourceVertexId(getVertexId());
- outgoingMsg.setFlag(outgoingEdgeDir);
- sendMsg(outgoingEdge, outgoingMsg);
- }
+// if(!incomingEdgeIntersection.isEmpty()){
+// createdVertex.clear();
+// createdVertexId.setByRead(generaterRandomString(kmerSize + 1).getBytes(), 0);
+// createdVertex.setCreatedVertexId(createdVertexId);
+// createdVertex.setIncomingDir(connectedTable[i][1]);
+// createdVertex.setIncomingEdge(incomingEdge);
+// createdVertexSet.add(createdVertex);
+//
+// outgoingMsg.setCreatedVertexId(createdVertex.getCreatedVertexId());
+// outgoingMsg.setSourceVertexId(getVertexId());
+// outgoingMsg.setFlag(incomingEdgeDir);
+// sendMsg(incomingEdge, outgoingMsg);
+// }
+//
+// if(!outgoingEdgeIntersection.isEmpty()){
+// createdVertex.clear();
+// createdVertexId.setByRead(generaterRandomString(kmerSize + 1).getBytes(), 0);
+// createdVertex.setCreatedVertexId(createdVertexId);
+// createdVertex.setOutgoingDir(connectedTable[i][0]);
+// createdVertex.setOutgoingEdge(outgoingEdge);
+// createdVertexSet.add(createdVertex);
+//
+// outgoingMsg.setCreatedVertexId(createdVertex.getCreatedVertexId());
+// outgoingMsg.setSourceVertexId(getVertexId());
+// outgoingMsg.setFlag(outgoingEdgeDir);
+// sendMsg(outgoingEdge, outgoingMsg);
+// }
}
}
}