Add dirMapper to mapKey in P1
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/MapReduceVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/MapReduceVertex.java
index e873856..55eed63 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/MapReduceVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/MapReduceVertex.java
@@ -73,8 +73,8 @@
}
}
- public ArrayList<Byte> mapKeyByInternalKmer(Iterator<M> msgIterator){
- ArrayList<Byte> kmerDir = new ArrayList<Byte>();
+ public void mapKeyByInternalKmer(Iterator<M> msgIterator){
+// ArrayList<Byte> kmerDir = new ArrayList<Byte>();
while(msgIterator.hasNext()){
incomingMsg = msgIterator.next();
String kmerString = incomingMsg.getInternalKmer().toString();
@@ -84,11 +84,11 @@
VKmerBytesWritable kmer = new VKmerBytesWritable();
kmerList = new VKmerListWritable();
if(reverseKmer.compareTo(tmpKmer) > 0){
- kmerDir.add(KmerDir.FORWARD);
+// kmerDir.add(KmerDir.FORWARD);
kmer.setAsCopy(tmpKmer);
}
else{
- kmerDir.add(KmerDir.REVERSE);
+// kmerDir.add(KmerDir.REVERSE);
kmer.setAsCopy(reverseKmer);
}
if(!kmerMapper.containsKey(kmer)){
@@ -101,7 +101,6 @@
kmerMapper.put(kmer, kmerList);
}
}
- return kmerDir;
}
public void reduceKeyByInternalKmer(){
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P1ForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P1ForPathMergeVertex.java
index 29465a4..306f124 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P1ForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P1ForPathMergeVertex.java
@@ -1,6 +1,7 @@
package edu.uci.ics.genomix.pregelix.operator.pathmerge;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.Iterator;
import edu.uci.ics.genomix.type.EdgeWritable;
@@ -11,6 +12,7 @@
import edu.uci.ics.genomix.pregelix.io.VertexValueWritable.State;
import edu.uci.ics.genomix.pregelix.io.message.PathMergeMessageWritable;
import edu.uci.ics.genomix.pregelix.operator.aggregator.StatisticsAggregator;
+import edu.uci.ics.genomix.pregelix.operator.pathmerge.MapReduceVertex.KmerDir;
import edu.uci.ics.genomix.pregelix.type.MessageFlag;
/**
@@ -22,6 +24,8 @@
MapReduceVertex<VertexValueWritable, PathMergeMessageWritable> {
private ArrayList<PathMergeMessageWritable> receivedMsg = new ArrayList<PathMergeMessageWritable>();
+ private HashMap<VKmerBytesWritable, ArrayList<Byte>> dirMapper = new HashMap<VKmerBytesWritable, ArrayList<Byte>>();
+ private ArrayList<Byte> dirList = new ArrayList<Byte>();
private EdgeWritable tmpEdge = new EdgeWritable();
/**
@@ -73,17 +77,66 @@
*/
public void aggregateMsgAndGroupInFakeNode(Iterator<PathMergeMessageWritable> msgIterator){
kmerMapper.clear();
+ dirMapper.clear();
/** Mapper **/
- ArrayList<Byte> kmerDir = mapKeyByInternalKmer(msgIterator);
- boolean isFlip = kmerDir.get(0) == kmerDir.get(1) ? false : true;
+ mapKeyByInternalKmer(msgIterator);
+// boolean isFlip = kmerDir.get(0) == kmerDir.get(1) ? false : true;
/** Reducer **/
- reduceKeyByInternalKmer(isFlip);
+ reduceKeyByInternalKmer();
+ }
+
+ /**
+ * typical for P1
+ * @return
+ */
+ @Override
+ public void mapKeyByInternalKmer(Iterator<PathMergeMessageWritable> msgIterator){
+ byte dir = 0;
+ while(msgIterator.hasNext()){
+ incomingMsg = msgIterator.next();
+ String kmerString = incomingMsg.getInternalKmer().toString();
+ tmpKmer.setByRead(kmerString.length(), kmerString.getBytes(), 0);
+ reverseKmer.setByReadReverse(kmerString.length(), kmerString.getBytes(), 0);
+
+ VKmerBytesWritable kmer = new VKmerBytesWritable();
+ kmerList = new VKmerListWritable();
+ if(reverseKmer.compareTo(tmpKmer) > 0){
+ dir = KmerDir.FORWARD;
+ kmer.setAsCopy(tmpKmer);
+ }
+ else{
+ dir = KmerDir.REVERSE;
+ kmer.setAsCopy(reverseKmer);
+ }
+ if(!kmerMapper.containsKey(kmer)){
+ //kmerList.reset();
+ kmerList.append(incomingMsg.getSourceVertexId());
+ kmerMapper.put(kmer, kmerList);
+ } else{
+ kmerList.setCopy(kmerMapper.get(kmer));
+ kmerList.append(incomingMsg.getSourceVertexId());
+ kmerMapper.put(kmer, kmerList);
+ }
+ //dirMapper
+ if(!dirMapper.containsKey(kmer)){
+ dirList.clear();
+ dirList.add(dir);
+ dirMapper.put(kmer, dirList);
+ } else{
+ dirList.clear();
+ dirList.addAll(dirMapper.get(kmer));
+ dirList.add(dir);
+ dirMapper.put(kmer, dirList);
+ }
+ }
}
/**
* typical for P1
*/
- public void reduceKeyByInternalKmer(boolean isFlip){
+ @Override
+ public void reduceKeyByInternalKmer(){
+ int i = 0;
for (VKmerBytesWritable key : kmerMapper.keySet()) {
kmerList = kmerMapper.get(key);
//always delete kmerList(1), keep kmerList(0)
@@ -91,9 +144,11 @@
//send kill message to kmerList(1), and carry with kmerList(0) to update edgeLists of kmerList(1)'s neighbor
outgoingMsg.setFlag(MessageFlag.KILL);
outgoingMsg.getNode().setInternalKmer(kmerList.getPosition(0));
+ boolean isFlip = dirMapper.get(key).get(0) == dirMapper.get(key).get(1) ? false : true;
outgoingMsg.setFlip(isFlip);
destVertexId.setAsCopy(kmerList.getPosition(1));
sendMsg(destVertexId, outgoingMsg);
+ i = i + 2;
}
}
@@ -194,8 +249,9 @@
voteToHalt();
} else if(receivedMsg.size() == 1){
boolean isHead = isHeadNode();
+ boolean isDead = isDeadNode();
processMerge(receivedMsg.get(0));
- if(isHead){
+ if(isHead || isDead){
// NON-FAKE and Final vertice send msg to FAKE vertex
sendMsgToFakeVertex();
//final vertex