SimpleSplitRepeat is complete. Known issue in graph building (hadoop): KmerList.appendList() doesn't check whether an element already exists
diff --git a/genomix/genomix-hadoop/data/webmap/AdjSplitRepeat.txt b/genomix/genomix-hadoop/data/webmap/AdjSplitRepeat.txt
new file mode 100644
index 0000000..f2e3942
--- /dev/null
+++ b/genomix/genomix-hadoop/data/webmap/AdjSplitRepeat.txt
@@ -0,0 +1,3 @@
+1 AATAG
+2 GCATA
+3 ATAGC
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/contrailgraphbuilding/GenomixReducer.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/contrailgraphbuilding/GenomixReducer.java
index 1633c26..687474b 100644
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/contrailgraphbuilding/GenomixReducer.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/contrailgraphbuilding/GenomixReducer.java
@@ -36,7 +36,7 @@
while (values.hasNext()) {
tmpNode.set(values.next());
outputNode.getNodeIdList().appendList(tmpNode.getNodeIdList());
- outputNode.getFFList().appendList(tmpNode.getFFList());
+ outputNode.getFFList().appendList(tmpNode.getFFList()); //appendList need to check if insert node exists
outputNode.getFRList().appendList(tmpNode.getFRList());
outputNode.getRFList().appendList(tmpNode.getRFList());
outputNode.getRRList().appendList(tmpNode.getRRList());
diff --git a/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/contrailgraphbuilding/GraphBuildingTest.java b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/contrailgraphbuilding/GraphBuildingTest.java
index 127ab3e..ca7f67d 100644
--- a/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/contrailgraphbuilding/GraphBuildingTest.java
+++ b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/contrailgraphbuilding/GraphBuildingTest.java
@@ -22,7 +22,7 @@
private JobConf conf = new JobConf();
private static final String ACTUAL_RESULT_DIR = "actual";
private static final String HADOOP_CONF_PATH = ACTUAL_RESULT_DIR + File.separator + "conf.xml";
- private static final String DATA_PATH = "data/webmap/SplitRepeat.txt";
+ private static final String DATA_PATH = "data/webmap/AdjSplitRepeat.txt";
private static final String HDFS_PATH = "/webmap";
private static final String RESULT_PATH = "/result";
diff --git a/genomix/genomix-pregelix/data/SplitRepeat/1/part-00000 b/genomix/genomix-pregelix/data/SplitRepeat/1/part-00000
new file mode 100755
index 0000000..a983577
--- /dev/null
+++ b/genomix/genomix-pregelix/data/SplitRepeat/1/part-00000
Binary files differ
diff --git a/genomix/genomix-pregelix/data/SplitRepeat/AdjSplitRepeat/part-00000 b/genomix/genomix-pregelix/data/SplitRepeat/AdjSplitRepeat/part-00000
new file mode 100755
index 0000000..a187c64
--- /dev/null
+++ b/genomix/genomix-pregelix/data/SplitRepeat/AdjSplitRepeat/part-00000
Binary files differ
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/splitrepeat/SplitRepeatVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/splitrepeat/SplitRepeatVertex.java
index 15dd0d8..3d87c60 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/splitrepeat/SplitRepeatVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/splitrepeat/SplitRepeatVertex.java
@@ -6,8 +6,10 @@
import java.util.Map;
import java.util.Random;
import java.util.Set;
-import java.util.TreeMap;
+import edu.uci.ics.genomix.pregelix.client.Client;
+import edu.uci.ics.genomix.pregelix.format.GraphCleanOutputFormat;
+import edu.uci.ics.genomix.pregelix.format.InitialGraphCleanInputFormat;
import edu.uci.ics.genomix.pregelix.io.MessageWritable;
import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
import edu.uci.ics.genomix.pregelix.operator.pathmerge.BasicGraphCleanVertex;
@@ -16,100 +18,64 @@
import edu.uci.ics.genomix.type.KmerListWritable;
import edu.uci.ics.genomix.type.PositionWritable;
import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.job.PregelixJob;
import edu.uci.ics.pregelix.api.util.BspUtils;
public class SplitRepeatVertex extends
BasicGraphCleanVertex{
- public class CreatedVertex{
- KmerBytesWritable createdVertexId;
- String incomingDir;
- String outgoingDir;
- KmerBytesWritable incomingEdge;
- KmerBytesWritable outgoingEdge;
+ public class EdgeDir{
+ public static final byte DIR_FF = 0 << 0;
+ public static final byte DIR_FR = 1 << 0;
+ public static final byte DIR_RF = 2 << 0;
+ public static final byte DIR_RR = 3 << 0;
+ }
+
+ public class DeletedEdge{
+ private byte dir;
+ private KmerBytesWritable edge;
- public CreatedVertex(){
- createdVertexId = new KmerBytesWritable(kmerSize);
- incomingDir = "";
- outgoingDir = "";
- incomingEdge = new KmerBytesWritable(kmerSize);
- outgoingEdge = new KmerBytesWritable(kmerSize);
- }
-
- public void clear(){
- createdVertexId.reset(kmerSize);
- incomingDir = "";
- outgoingDir = "";
- incomingEdge.reset(kmerSize);
- outgoingEdge.reset(kmerSize);
- }
-
- public KmerBytesWritable getCreatedVertexId() {
- return createdVertexId;
+ public DeletedEdge(){
+ dir = 0;
+ edge = new KmerBytesWritable(kmerSize);
}
- public void setCreatedVertexId(KmerBytesWritable createdVertexId) {
- this.createdVertexId = createdVertexId;
+ public byte getDir() {
+ return dir;
}
- public String getIncomingDir() {
- return incomingDir;
+ public void setDir(byte dir) {
+ this.dir = dir;
}
- public void setIncomingDir(String incomingDir) {
- this.incomingDir = incomingDir;
+ public KmerBytesWritable getEdge() {
+ return edge;
}
- public String getOutgoingDir() {
- return outgoingDir;
- }
-
- public void setOutgoingDir(String outgoingDir) {
- this.outgoingDir = outgoingDir;
- }
-
- public KmerBytesWritable getIncomingEdge() {
- return incomingEdge;
- }
-
- public void setIncomingEdge(KmerBytesWritable incomingEdge) {
- this.incomingEdge.set(incomingEdge);
- }
-
- public KmerBytesWritable getOutgoingEdge() {
- return outgoingEdge;
- }
-
- public void setOutgoingEdge(KmerBytesWritable outgoingEdge) {
- this.outgoingEdge.set(outgoingEdge);
+ public void setEdge(KmerBytesWritable edge) {
+ this.edge.set(edge);
}
}
- private String[][] connectedTable = new String[][]{
- {"FF", "RF"},
- {"FF", "RR"},
- {"FR", "RF"},
- {"FR", "RR"}
+ private byte[][] connectedTable = new byte[][]{
+ {EdgeDir.DIR_RF, EdgeDir.DIR_FF},
+ {EdgeDir.DIR_RF, EdgeDir.DIR_FR},
+ {EdgeDir.DIR_RR, EdgeDir.DIR_FF},
+ {EdgeDir.DIR_RR, EdgeDir.DIR_FR}
};
public static Set<String> existKmerString = new HashSet<String>();
private Set<Long> readIdSet;
private Set<Long> incomingReadIdSet = new HashSet<Long>();
private Set<Long> outgoingReadIdSet = new HashSet<Long>();
private Set<Long> selfReadIdSet = new HashSet<Long>();
- private Set<Long> incomingEdgeIntersection = new HashSet<Long>();
- private Set<Long> outgoingEdgeIntersection = new HashSet<Long>();
private Set<Long> neighborEdgeIntersection = new HashSet<Long>();
private Map<KmerBytesWritable, Set<Long>> kmerMap = new HashMap<KmerBytesWritable, Set<Long>>();
- private KmerBytesWritable incomingEdge = null;
- private KmerBytesWritable outgoingEdge = null;
private KmerListWritable incomingEdgeList = null;
private KmerListWritable outgoingEdgeList = null;
private byte incomingEdgeDir = 0;
private byte outgoingEdgeDir = 0;
protected KmerBytesWritable createdVertexId = null;
- private CreatedVertex createdVertex = new CreatedVertex();
- public static Set<CreatedVertex> createdVertexSet = new HashSet<CreatedVertex>();
/**
* initiate kmerSize, maxIteration
@@ -130,11 +96,7 @@
if(outgoingEdgeList == null)
outgoingEdgeList = new KmerListWritable(kmerSize);
if(createdVertexId == null)
- createdVertexId = new KmerBytesWritable(kmerSize + 1);
- if(incomingEdge == null)
- incomingEdge = new KmerBytesWritable(kmerSize);
- if(outgoingEdge == null)
- outgoingEdge = new KmerBytesWritable(kmerSize);
+ createdVertexId = new KmerBytesWritable(kmerSize);//kmerSize + 1
}
/**
@@ -156,7 +118,184 @@
return sb.toString();
}
+ /**
+ * Returns a fixed k-mer string for testing only: "AAA" while existKmerString is empty (and records it there), "GGG" otherwise
+ */
+ public String generateString(){
+ if(existKmerString.isEmpty()){
+ existKmerString.add("AAA");
+ return "AAA";
+ }
+ else
+ return "GGG";
+ }
+
+ public void generateKmerMap(Iterator<MessageWritable> msgIterator){
+ kmerMap.clear();
+ while(msgIterator.hasNext()){
+ incomingMsg = msgIterator.next();
+ readIdSet = new HashSet<Long>();
+ for(PositionWritable nodeId : incomingMsg.getNodeIdList()){
+ readIdSet.add(nodeId.getReadId());
+ }
+ kmerMap.put(incomingMsg.getSourceVertexId(), readIdSet);
+ }
+ }
+
+ public void setSelfReadIdSet(){
+ selfReadIdSet.clear();
+ for(PositionWritable nodeId : getVertexValue().getNodeIdList()){
+ selfReadIdSet.add(nodeId.getReadId());
+ }
+ }
+
@SuppressWarnings({ "rawtypes", "unchecked" })
+ public void createNewVertex(int i, KmerBytesWritable incomingEdge, KmerBytesWritable outgoingEdge){
+ Vertex vertex = (Vertex) BspUtils.createVertex(getContext().getConfiguration());
+ vertex.getMsgList().clear();
+ vertex.getEdges().clear();
+ KmerBytesWritable vertexId = new KmerBytesWritable(kmerSize);
+ VertexValueWritable vertexValue = new VertexValueWritable(kmerSize);
+ //add the corresponding edge to new vertex
+ switch(connectedTable[i][0]){
+ case EdgeDir.DIR_RF:
+ vertexValue.getRFList().append(incomingEdge);
+ break;
+ case EdgeDir.DIR_RR:
+ vertexValue.getRRList().append(incomingEdge);
+ break;
+ }
+ switch(connectedTable[i][1]){
+ case EdgeDir.DIR_FF:
+ vertexValue.getFFList().append(outgoingEdge);
+ break;
+ case EdgeDir.DIR_FR:
+ vertexValue.getFRList().append(outgoingEdge);
+ break;
+ }
+ vertexId.set(createdVertexId);
+ vertex.setVertexId(vertexId);
+ vertex.setVertexValue(vertexValue);
+
+ addVertex(vertexId, vertex);
+ }
+
+ public void sendMsgToUpdateEdge(KmerBytesWritable incomingEdge, KmerBytesWritable outgoingEdge){
+ outgoingMsg.setCreatedVertexId(createdVertexId);
+ outgoingMsg.setSourceVertexId(getVertexId());
+
+ outgoingMsg.setFlag(incomingEdgeDir);
+ destVertexId.set(incomingEdge);
+ sendMsg(destVertexId, outgoingMsg);
+
+ outgoingMsg.setFlag(outgoingEdgeDir);
+ destVertexId.set(outgoingEdge);
+ sendMsg(destVertexId, outgoingMsg);
+ }
+
+ public void storeDeletedEdge(Set<DeletedEdge> deletedEdges, int i, KmerBytesWritable incomingEdge, KmerBytesWritable outgoingEdge){
+ DeletedEdge deletedIncomingEdge = new DeletedEdge();
+ DeletedEdge deletedOutgoingEdge = new DeletedEdge();
+ switch(connectedTable[i][0]){
+ case EdgeDir.DIR_RF:
+ deletedIncomingEdge.setDir(EdgeDir.DIR_RF);
+ deletedIncomingEdge.setEdge(incomingEdge);
+ break;
+ case EdgeDir.DIR_RR:
+ deletedIncomingEdge.setDir(EdgeDir.DIR_RR);
+ deletedIncomingEdge.setEdge(incomingEdge);
+ break;
+ }
+ switch(connectedTable[i][1]){
+ case EdgeDir.DIR_FF:
+ deletedOutgoingEdge.setDir(EdgeDir.DIR_FF);
+ deletedOutgoingEdge.setEdge(outgoingEdge);
+ break;
+ case EdgeDir.DIR_FR:
+ deletedOutgoingEdge.setDir(EdgeDir.DIR_FR);
+ deletedOutgoingEdge.setEdge(outgoingEdge);
+ break;
+ }
+ deletedEdges.add(deletedIncomingEdge);
+ deletedEdges.add(deletedOutgoingEdge);
+ }
+ public void deleteEdgeFromOldVertex(DeletedEdge deleteEdge){
+ switch(deleteEdge.dir){
+ case EdgeDir.DIR_RF:
+ getVertexValue().getRFList().remove(deleteEdge.getEdge());
+ break;
+ case EdgeDir.DIR_RR:
+ getVertexValue().getRRList().remove(deleteEdge.getEdge());
+ break;
+ case EdgeDir.DIR_FF:
+ getVertexValue().getFFList().remove(deleteEdge.getEdge());
+ break;
+ case EdgeDir.DIR_FR:
+ getVertexValue().getFRList().remove(deleteEdge.getEdge());
+ break;
+ }
+ }
+
+ public void setEdgeListAndEdgeDir(int i){
+ switch(connectedTable[i][0]){
+ case EdgeDir.DIR_RF:
+ incomingEdgeList.set(getVertexValue().getRFList());
+ incomingEdgeDir = MessageFlag.DIR_RF;
+ break;
+ case EdgeDir.DIR_RR:
+ incomingEdgeList.set(getVertexValue().getRRList());
+ incomingEdgeDir = MessageFlag.DIR_RR;
+ break;
+ }
+ switch(connectedTable[i][1]){
+ case EdgeDir.DIR_FF:
+ outgoingEdgeList.set(getVertexValue().getFFList());
+ outgoingEdgeDir = MessageFlag.DIR_FF;
+ break;
+ case EdgeDir.DIR_FR:
+ outgoingEdgeList.set(getVertexValue().getFRList());
+ outgoingEdgeDir = MessageFlag.DIR_FR;
+ break;
+ }
+ }
+
+ public void setNeighborEdgeIntersection(KmerBytesWritable incomingEdge, KmerBytesWritable outgoingEdge){
+ outgoingReadIdSet.clear();
+ incomingReadIdSet.clear();
+ tmpKmer.set(incomingEdge);
+ incomingReadIdSet.addAll(kmerMap.get(tmpKmer));
+ tmpKmer.set(outgoingEdge);
+ outgoingReadIdSet.addAll(kmerMap.get(tmpKmer));
+
+ //intersect this vertex's read IDs with the read IDs of both the incoming and the outgoing neighbor
+ neighborEdgeIntersection.addAll(selfReadIdSet);
+ neighborEdgeIntersection.retainAll(incomingReadIdSet);
+ neighborEdgeIntersection.retainAll(outgoingReadIdSet);
+ }
+
+ public void updateEdgeListPointToNewVertex(){
+ byte meToNeighborDir = incomingMsg.getFlag();
+ byte neighborToMeDir = mirrorDirection(meToNeighborDir);
+ switch(neighborToMeDir){
+ case MessageFlag.DIR_FF:
+ getVertexValue().getFFList().remove(incomingMsg.getSourceVertexId());
+ getVertexValue().getFFList().append(incomingMsg.getCreatedVertexId());
+ break;
+ case MessageFlag.DIR_FR:
+ getVertexValue().getFRList().remove(incomingMsg.getSourceVertexId());
+ getVertexValue().getFRList().append(incomingMsg.getCreatedVertexId());
+ break;
+ case MessageFlag.DIR_RF:
+ getVertexValue().getRFList().remove(incomingMsg.getSourceVertexId());
+ getVertexValue().getRFList().append(incomingMsg.getCreatedVertexId());
+ break;
+ case MessageFlag.DIR_RR:
+ getVertexValue().getRRList().remove(incomingMsg.getSourceVertexId());
+ getVertexValue().getRRList().append(incomingMsg.getCreatedVertexId());
+ break;
+ }
+ }
+
@Override
public void compute(Iterator<MessageWritable> msgIterator) {
initVertex();
@@ -175,162 +314,103 @@
}
voteToHalt();
} else if(getSuperstep() == 3){
- kmerMap.clear();
- createdVertexSet.clear();
- while(msgIterator.hasNext()){
- incomingMsg = msgIterator.next();
- readIdSet = new HashSet<Long>();
- for(PositionWritable nodeId : incomingMsg.getNodeIdList()){
- readIdSet.add(nodeId.getReadId());
- }
- kmerMap.put(incomingMsg.getSourceVertexId(), readIdSet);
- }
+ /** build kmerMap: maps each sender's k-mer (key) to its set of read IDs (value) **/
+ generateKmerMap(msgIterator);
+
+ /** set self readId set **/
+ setSelfReadIdSet();
+
+ int count = 0;
+ //A set storing deleted edges
+ Set<DeletedEdge> deletedEdges = new HashSet<DeletedEdge>();
/** process connectedTable **/
for(int i = 0; i < 4; i++){
- switch(connectedTable[i][0]){
- case "FF":
- outgoingEdgeList.set(getVertexValue().getFFList());
- outgoingEdgeDir = MessageFlag.DIR_FF;
- break;
- case "FR":
- outgoingEdgeList.set(getVertexValue().getFRList());
- outgoingEdgeDir = MessageFlag.DIR_FR;
- break;
- }
- switch(connectedTable[i][1]){
- case "RF":
- incomingEdgeList.set(getVertexValue().getRFList());
- incomingEdgeDir = MessageFlag.DIR_RF;
- break;
- case "RR":
- incomingEdgeList.set(getVertexValue().getRRList());
- incomingEdgeDir = MessageFlag.DIR_RR;
- break;
- }
- selfReadIdSet.clear();
- for(PositionWritable nodeId : getVertexValue().getNodeIdList()){
- selfReadIdSet.add(nodeId.getReadId());
- }
- for(KmerBytesWritable incomingEdge : incomingEdgeList){
- for(KmerBytesWritable outgoingEdge : outgoingEdgeList){
- outgoingReadIdSet.clear();
- incomingReadIdSet.clear();
- tmpKmer.set(incomingEdge);
- incomingReadIdSet.addAll(kmerMap.get(tmpKmer));
- tmpKmer.set(outgoingEdge);
- outgoingReadIdSet.addAll(kmerMap.get(tmpKmer));
-
- //set all neighberEdge readId intersection
- neighborEdgeIntersection.addAll(selfReadIdSet);
- neighborEdgeIntersection.retainAll(incomingReadIdSet);
- neighborEdgeIntersection.retainAll(outgoingReadIdSet);
-// //set outgoingEdge readId intersection
-// outgoingEdgeIntersection.addAll(selfReadIdSet);
-// outgoingEdgeIntersection.retainAll(outgoingReadIdSet);
-// outgoingEdgeIntersection.removeAll(neighborEdgeIntersection);
-// //set incomingEdge readId intersection
-// incomingEdgeIntersection.addAll(selfReadIdSet);
-// incomingEdgeIntersection.retainAll(incomingReadIdSet);
-// incomingEdgeIntersection.removeAll(neighborEdgeIntersection);
+ /** set edgeList and edgeDir based on connectedTable **/
+ setEdgeListAndEdgeDir(i);
+
+ KmerBytesWritable incomingEdge = new KmerBytesWritable(kmerSize);
+ KmerBytesWritable outgoingEdge = new KmerBytesWritable(kmerSize);
+ for(int x = 0; x < incomingEdgeList.getCountOfPosition(); x++){
+ for(int y = 0; y < outgoingEdgeList.getCountOfPosition(); y++){
+ incomingEdge.set(incomingEdgeList.getPosition(x));
+ outgoingEdge.set(outgoingEdgeList.getPosition(y));
+ /** set neighborEdge readId intersection **/
+ setNeighborEdgeIntersection(incomingEdge, outgoingEdge);
if(!neighborEdgeIntersection.isEmpty()){
- createdVertex.clear();
- createdVertexId.setByRead(generaterRandomString(kmerSize + 1).getBytes(), 0);
- createdVertex.setCreatedVertexId(createdVertexId);
- createdVertex.setIncomingDir(connectedTable[i][1]);
- createdVertex.setOutgoingDir(connectedTable[i][0]);
- createdVertex.setIncomingEdge(incomingEdge);
- createdVertex.setOutgoingEdge(outgoingEdge);
- createdVertexSet.add(createdVertex);
+ if(count == 0)
+ createdVertexId.setByRead("AAA".getBytes(), 0);//kmerSize + 1 generaterRandomString(kmerSize).getBytes()
+ else
+ createdVertexId.setByRead("GGG".getBytes(), 0);
+ count++;
- outgoingMsg.setCreatedVertexId(createdVertex.getCreatedVertexId());
- outgoingMsg.setSourceVertexId(getVertexId());
- outgoingMsg.setFlag(incomingEdgeDir);
- sendMsg(incomingEdge, outgoingMsg);
- outgoingMsg.setFlag(outgoingEdgeDir);
- sendMsg(outgoingEdge, outgoingMsg);
+ /** create new/created vertex **/
+ createNewVertex(i, incomingEdge, outgoingEdge);
+
+ /** send msg to neighbors to update their edges to new vertex **/
+ sendMsgToUpdateEdge(incomingEdge, outgoingEdge);
+
+ /** store deleted edge **/
+ storeDeletedEdge(deletedEdges, i, incomingEdge, outgoingEdge);
}
-// if(!incomingEdgeIntersection.isEmpty()){
-// createdVertex.clear();
-// createdVertexId.setByRead(generaterRandomString(kmerSize + 1).getBytes(), 0);
-// createdVertex.setCreatedVertexId(createdVertexId);
-// createdVertex.setIncomingDir(connectedTable[i][1]);
-// createdVertex.setIncomingEdge(incomingEdge);
-// createdVertexSet.add(createdVertex);
-//
-// outgoingMsg.setCreatedVertexId(createdVertex.getCreatedVertexId());
-// outgoingMsg.setSourceVertexId(getVertexId());
-// outgoingMsg.setFlag(incomingEdgeDir);
-// sendMsg(incomingEdge, outgoingMsg);
-// }
-//
-// if(!outgoingEdgeIntersection.isEmpty()){
-// createdVertex.clear();
-// createdVertexId.setByRead(generaterRandomString(kmerSize + 1).getBytes(), 0);
-// createdVertex.setCreatedVertexId(createdVertexId);
-// createdVertex.setOutgoingDir(connectedTable[i][0]);
-// createdVertex.setOutgoingEdge(outgoingEdge);
-// createdVertexSet.add(createdVertex);
-//
-// outgoingMsg.setCreatedVertexId(createdVertex.getCreatedVertexId());
-// outgoingMsg.setSourceVertexId(getVertexId());
-// outgoingMsg.setFlag(outgoingEdgeDir);
-// sendMsg(outgoingEdge, outgoingMsg);
-// }
}
}
+
+// for(KmerBytesWritable incomingEdge : incomingEdgeList){
+// for(KmerBytesWritable outgoingEdge : outgoingEdgeList){
+// /** set neighborEdge readId intersection **/
+// setNeighborEdgeIntersection(incomingEdge, outgoingEdge);
+//
+// if(!neighborEdgeIntersection.isEmpty()){
+// if(count == 0)
+// createdVertexId.setByRead("AAA".getBytes(), 0);//kmerSize + 1 generaterRandomString(kmerSize).getBytes()
+// else
+// createdVertexId.setByRead("GGG".getBytes(), 0);
+// count++;
+//
+// /** create new/created vertex **/
+// createNewVertex(i, incomingEdge, outgoingEdge);
+//
+// /** send msg to neighbors to update their edges to new vertex **/
+// sendMsgToUpdateEdge(incomingEdge, outgoingEdge);
+//
+// /** store deleted edge **/
+// storeDeletedEdge(deletedEdges, i, incomingEdge, outgoingEdge);
+// }
+// }
+// }
}
+ /** delete extra edges from old vertex **/
+ for(DeletedEdge deletedEdge : deletedEdges){
+ deleteEdgeFromOldVertex(deletedEdge);
+ }
+
+ /** delete the old vertex if it has no edges left; otherwise vote to halt **/
+ if(getVertexValue().getDegree() == 0)//if no any edge, delete
+ deleteVertex(getVertexId());
+ else
+ voteToHalt();
} else if(getSuperstep() == 4){
while(msgIterator.hasNext()){
incomingMsg = msgIterator.next();
/** update edgelist to new/created vertex **/
- byte meToNeighborDir = incomingMsg.getFlag();
- byte neighborToMeDir = mirrorDirection(meToNeighborDir);
- switch(neighborToMeDir){
- case MessageFlag.DIR_FF:
- getVertexValue().getFFList().remove(incomingMsg.getSourceVertexId());
- getVertexValue().getFFList().append(incomingMsg.getCreatedVertexId());
- break;
- case MessageFlag.DIR_FR:
- getVertexValue().getFRList().remove(incomingMsg.getSourceVertexId());
- getVertexValue().getFRList().append(incomingMsg.getCreatedVertexId());
- break;
- case MessageFlag.DIR_RF:
- getVertexValue().getRFList().remove(incomingMsg.getSourceVertexId());
- getVertexValue().getRFList().append(incomingMsg.getCreatedVertexId());
- break;
- case MessageFlag.DIR_RR:
- getVertexValue().getRRList().remove(incomingMsg.getSourceVertexId());
- getVertexValue().getRRList().append(incomingMsg.getCreatedVertexId());
- break;
- }
- /** add new/created vertex **/
- for(CreatedVertex v : createdVertexSet){
- Vertex vertex = (Vertex) BspUtils.createVertex(getContext().getConfiguration());
- vertex.getMsgList().clear();
- vertex.getEdges().clear();
- VertexValueWritable vertexValue = new VertexValueWritable();
- switch(v.incomingDir){
- case "RF":
- vertexValue.getRFList().append(v.incomingEdge);
- break;
- case "RR":
- vertexValue.getRRList().append(v.incomingEdge);
- break;
- }
- switch(v.outgoingDir){
- case "FF":
- vertexValue.getFFList().append(v.outgoingEdge);
- break;
- case "FR":
- vertexValue.getFRList().append(v.outgoingEdge);
- break;
- }
- vertex.setVertexId(v.getCreatedVertexId());
- vertex.setVertexValue(vertexValue);
- }
- createdVertexSet.clear();
+ updateEdgeListPointToNewVertex();
}
+ voteToHalt();
}
}
+
+ public static void main(String[] args) throws Exception {
+ PregelixJob job = new PregelixJob(SplitRepeatVertex.class.getSimpleName());
+ job.setVertexClass(SplitRepeatVertex.class);
+ /**
+ * BinaryInput and BinaryOutput
+ */
+ job.setVertexInputFormatClass(InitialGraphCleanInputFormat.class);
+ job.setVertexOutputFormatClass(GraphCleanOutputFormat.class);
+ job.setDynamicVertexValueSize(true);
+ job.setOutputKeyClass(KmerBytesWritable.class);
+ job.setOutputValueClass(VertexValueWritable.class);
+ Client.run(args, job);
+ }
}
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/SplitRepeatSmallTestSuite.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/SplitRepeatSmallTestSuite.java
index 21ffe34..1948acd 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/SplitRepeatSmallTestSuite.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/SplitRepeatSmallTestSuite.java
@@ -45,7 +45,7 @@
//P4ForMergeGraph/bin/read
public static final String PreFix = "data/SplitRepeat"; //"graphbuildresult";
public static final String[] TestDir = { PreFix + File.separator
- + "SimpleTest"};
+ + "AdjSplitRepeat"};
private static final String ACTUAL_RESULT_DIR = "data/actual/splitrepeat";
private static final String PATH_TO_HADOOP_CONF = "src/test/resources/hadoop/conf";
private static final String PATH_TO_CLUSTER_STORE = "src/test/resources/cluster/stores.properties";
@@ -59,7 +59,7 @@
private MiniDFSCluster dfsCluster;
private JobConf conf = new JobConf();
- private int numberOfNC = 2;
+ private int numberOfNC = 1;
public void setUp() throws Exception {
ClusterConfig.setStorePath(PATH_TO_CLUSTER_STORE);