add getSingleNeighbor and refactor bubbleMerge to use it
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
index 6bd0cca..fb0d17b 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
@@ -27,7 +27,6 @@
import java.util.Map;
import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
import edu.uci.ics.genomix.data.Marshal;
@@ -251,14 +250,11 @@
}
}
- /**
- * check if need filp
- */
public EDGETYPE flipNeighbor(){
return flipNeighbor(this);
}
- public static EDGETYPE flipNeighbor(EDGETYPE neighborToMe){ // TODO use NodeWritable
+ public static EDGETYPE flipNeighbor(EDGETYPE neighborToMe){
switch (neighborToMe) {
case FF:
return FR;
@@ -273,6 +269,18 @@
}
}
}
+
+ public static class NeighborInfo {
+ public final EDGETYPE et;
+ public final EdgeWritable edge;
+ public final VKmerBytesWritable kmer;
+
+ public NeighborInfo(EDGETYPE edgeType, EdgeWritable edgeWritable) {
+ et = edgeType;
+ edge = edgeWritable;
+ kmer = edge.getKey();
+ }
+ }
private static final long serialVersionUID = 1L;
public static final NodeWritable EMPTY_NODE = new NodeWritable();
@@ -375,6 +383,21 @@
throw new IllegalStateException("Programmer error: we shouldn't get here... Degree is 1 in " + direction + " but didn't find a an edge list > 1");
}
+ /**
+ * Get this node's single neighbor in the given direction. Return null if there are multiple or no neighbors.
+ */
+ public NeighborInfo getSingleNeighbor(DIR direction){
+ if(getDegree(direction) != 1) {
+ return null;
+ }
+ for(EDGETYPE et : direction.edgeType()) {
+ if(getEdgeList(et).size() > 0) {
+ return new NeighborInfo(et, getEdgeList(et).get(0));
+ }
+ }
+ return null;
+ }
+
public EdgeListWritable getEdgeList(EDGETYPE edgeType) {
return edges[edgeType.get() & EDGETYPE.MASK];
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/SimpleBubbleMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/SimpleBubbleMergeVertex.java
index 7614606..bfc43ca 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/SimpleBubbleMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/SimpleBubbleMergeVertex.java
@@ -11,6 +11,7 @@
import edu.uci.ics.genomix.type.NodeWritable;
import edu.uci.ics.genomix.type.NodeWritable.DIR;
import edu.uci.ics.genomix.type.NodeWritable.EDGETYPE;
+import edu.uci.ics.genomix.type.NodeWritable.NeighborInfo;
import edu.uci.ics.genomix.type.VKmerBytesWritable;
import edu.uci.ics.genomix.config.GenomixJobConf;
import edu.uci.ics.genomix.pregelix.client.Client;
@@ -65,32 +66,26 @@
public void sendBubbleAndMajorVertexMsgToMinorVertex(){
VertexValueWritable vertex = getVertexValue();
- //TODO make function that returns a single neighbor as <EDGETYPE, EDGE> (jake)
- EDGETYPE reverseEdgeType = vertex.getNeighborEdgeType(DIR.REVERSE);
- EdgeWritable reverseEdge = vertex.getEdgeList(reverseEdgeType).get(0);
-
- EDGETYPE forwardEdgeType = vertex.getNeighborEdgeType(DIR.FORWARD);
- EdgeWritable forwardEdge = vertex.getEdgeList(forwardEdgeType).get(0);
-
- VKmerBytesWritable reverseKmer = reverseEdge.getKey();
- VKmerBytesWritable forwardKmer = forwardEdge.getKey();
-
+ NeighborInfo reverseNeighbor = vertex.getSingleNeighbor(DIR.REVERSE);
+ NeighborInfo forwardNeighbor = vertex.getSingleNeighbor(DIR.REVERSE);
+
//TODO combine into only one byte, change internally/under the hood
// get majorVertex and minorVertex and meToMajorEdgeType and meToMinorEdgeType
- if(forwardKmer == reverseKmer)
- throw new IllegalArgumentException("majorVertexId is equal to minorVertexId, this is not allowd!");
- boolean forwardIsMajor = forwardKmer.compareTo(reverseKmer) >= 0;
- VKmerBytesWritable minorVertexId = null;
+ if(forwardNeighbor.kmer == reverseNeighbor.kmer) // FIXME should this *really* be == or .equals?
+ throw new IllegalArgumentException("majorVertexId is equal to minorVertexId, this is not allowed!");
+
+ VKmerBytesWritable minorVertexId;
+ boolean forwardIsMajor = forwardNeighbor.kmer.compareTo(reverseNeighbor.kmer) >= 0;
if (forwardIsMajor) {
- outgoingMsg.setMajorVertexId(forwardEdge.getKey());
- outgoingMsg.setMajorToBubbleEdgetype(forwardEdgeType.mirror());
- outgoingMsg.setMinorToBubbleEdgetype(reverseEdgeType.mirror());
- minorVertexId = reverseKmer;
+ outgoingMsg.setMajorVertexId(forwardNeighbor.kmer);
+ outgoingMsg.setMajorToBubbleEdgetype(forwardNeighbor.et.mirror());
+ outgoingMsg.setMinorToBubbleEdgetype(reverseNeighbor.et.mirror());
+ minorVertexId = reverseNeighbor.kmer;
} else {
- outgoingMsg.setMajorVertexId(reverseEdge.getKey());
- outgoingMsg.setMajorToBubbleEdgetype(reverseEdgeType.mirror());
- outgoingMsg.setMinorToBubbleEdgetype(forwardEdgeType.mirror());
- minorVertexId = forwardKmer;
+ outgoingMsg.setMajorVertexId(reverseNeighbor.kmer);
+ outgoingMsg.setMajorToBubbleEdgetype(reverseNeighbor.et.mirror());
+ outgoingMsg.setMinorToBubbleEdgetype(forwardNeighbor.et.mirror());
+ minorVertexId = forwardNeighbor.kmer;
}
outgoingMsg.setSourceVertexId(getVertexId());
outgoingMsg.setNode(getVertexValue().getNode());