add merge and update utility functions
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerBytesWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerBytesWritable.java
index 9189a67..17f1a11 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerBytesWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerBytesWritable.java
@@ -384,6 +384,8 @@
* e.g. AAGCTAA merge with GGTTGTT, if the initial kmerSize = 3
* then it will return AAGCTAACAACC
*
+ * A merge B => A B~ (B~ is the reverse complement of B)
+ *
* @param initialKmerSize
* : the initial kmerSize
* @param kmer
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
index 8aab267..9675fc0 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
@@ -28,6 +28,16 @@
*/
private static final long serialVersionUID = 1L;
public static final NodeWritable EMPTY_NODE = new NodeWritable(0);
+
+ // merge/update directions: edge types (F = forward, R = reverse) encoded in the two lowest bits
+ public static class DirectionFlag {
+ public static final byte DIR_FF = 0b00 << 0;
+ public static final byte DIR_FR = 0b01 << 0;
+ public static final byte DIR_RF = 0b10 << 0;
+ public static final byte DIR_RR = 0b11 << 0;
+ public static final byte DIR_MASK = 0b11 << 0; // AND a flag byte with this to isolate its DIR_* value
+ }
+
private PositionWritable nodeID;
private PositionListWritable forwardForwardList;
private PositionListWritable forwardReverseList;
@@ -58,9 +68,9 @@
reverseReverseList.set(RRList);
kmer.set(kmer);
}
-
+
public void set(PositionWritable nodeID, PositionListWritable FFList, PositionListWritable FRList,
- PositionListWritable RFList, PositionListWritable RRList, KmerBytesWritable kmer){
+ PositionListWritable RFList, PositionListWritable RRList, KmerBytesWritable kmer) {
this.nodeID.set(nodeID);
this.forwardForwardList.set(FFList);
this.forwardReverseList.set(FRList);
@@ -106,6 +116,21 @@
return reverseReverseList;
}
+ // Picks the neighbor list that matches the edge type @dir (one of the DirectionFlag.DIR_* values);
+ // any other value is rejected with a RuntimeException
+ public PositionListWritable getListFromDir(byte dir) {
+     if (dir == DirectionFlag.DIR_FF) {
+         return getFFList();
+     } else if (dir == DirectionFlag.DIR_FR) {
+         return getFRList();
+     } else if (dir == DirectionFlag.DIR_RF) {
+         return getRFList();
+     } else if (dir == DirectionFlag.DIR_RR) {
+         return getRRList();
+     }
+     throw new RuntimeException("Unrecognized direction in getListFromDir: " + dir);
+ }
+
public PositionWritable getNodeID() {
return nodeID;
}
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/NodeWithFlagWritable.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/NodeWithFlagWritable.java
index f929151..304cf0c 100644
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/NodeWithFlagWritable.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/NodeWithFlagWritable.java
@@ -12,6 +12,8 @@
import edu.uci.ics.genomix.type.KmerBytesWritable;
import edu.uci.ics.genomix.type.NodeWritable;
+import edu.uci.ics.genomix.type.NodeWritable.DirectionFlag;
+import edu.uci.ics.genomix.type.PositionListWritable;
import edu.uci.ics.genomix.type.PositionWritable;
/*
@@ -21,31 +23,94 @@
public class NodeWithFlagWritable extends BinaryComparable implements WritableComparable<BinaryComparable> {
private byte flag;
private NodeWritable node;
-
- public static class MessageFlag {
+
+ public static class MessageFlag extends DirectionFlag {
public static final byte EMPTY_MESSAGE = 0;
// message types
- public static final byte MSG_SELF = 0b01 << 0;
- public static final byte MSG_UPDATE_MERGE = 0b10 << 0;
- public static final byte MSG_UPDATE_EDGE = 0b11 << 0;
- public static final byte MSG_MASK = 0b11 << 0;
- // merge/update directions
- public static final byte DIR_FF = 0b00 << 2;
- public static final byte DIR_FR = 0b01 << 2;
- public static final byte DIR_RF = 0b10 << 2;
- public static final byte DIR_RR = 0b11 << 2;
- public static final byte DIR_MASK = 0b11 << 2;
+ public static final byte MSG_SELF = 0b01 << 2;
+ public static final byte MSG_UPDATE_MERGE = 0b10 << 2;
+ public static final byte MSG_UPDATE_EDGE = 0b11 << 2;
+ public static final byte MSG_MASK = 0b11 << 2; // message type sits in bits 2-3, above the inherited DIR_* bits 0-1
// additional info
public static final byte IS_HEAD = 0b1 << 4;
public static final byte IS_TAIL = 0b1 << 5;
// extra bit used differently in each operation
public static final byte EXTRA_FLAG = 1 << 6;
}
-
+
+ /*
+  * Rewrites this message as an edge update: the flag becomes MSG_UPDATE_EDGE plus the mirrored
+  * @neighborDir, @nodeToDelete becomes the node ID and @nodeToAdd is appended to the flipped edge list
+  */
+ public void setAsUpdateMessage(byte mergeDir, byte neighborDir, PositionWritable nodeToDelete,
+         PositionWritable nodeToAdd) {
+     final byte dirBackToMe = mirrorDirection(neighborDir);
+     final byte dirToMergedNode = flipDirection(dirBackToMe, mergeDir);
+
+     // clear whatever kmer and edge data an earlier use left in the node
+     node.reset(0);
+     // the flag's direction bits plus the node ID identify the edge entry that has to be removed
+     setFlag((byte) (MessageFlag.MSG_UPDATE_EDGE | dirBackToMe));
+     node.getNodeID().set(nodeToDelete);
+     node.getListFromDir(dirToMergedNode).append(nodeToAdd);
+ }
+
+ /*
+  * Returns the edge dir for B->A when the A->B edge is type @dir (FF and RR swap; FR and RF are
+  * their own mirror). Throws RuntimeException when @dir is not one of the four DIR_* values.
+  */
+ public byte mirrorDirection(byte dir) {
+     switch (dir) {
+         case MessageFlag.DIR_FF:
+             return MessageFlag.DIR_RR;
+         case MessageFlag.DIR_FR:
+         case MessageFlag.DIR_RF:
+             return dir;
+         case MessageFlag.DIR_RR:
+             return MessageFlag.DIR_FF;
+         default:
+             throw new RuntimeException("Unrecognized direction in mirrorDirection: " + dir);
+     }
+ }
+
+ /*
+  * Works out the A->C edge type that replaces an A->B edge of type @neighborDir once B has merged
+  * towards C along a @mergeDir edge. FF/RR merges leave @neighborDir as it is; FR/RF merges toggle
+  * its second letter. Throws RuntimeException for a @mergeDir outside the four DIR_* values, or for
+  * such a @neighborDir when the merge is FR/RF.
+  */
+ public byte flipDirection(byte neighborDir, byte mergeDir) {
+     final boolean mergeWithoutFlip = mergeDir == MessageFlag.DIR_FF || mergeDir == MessageFlag.DIR_RR;
+     if (mergeWithoutFlip) {
+         // the merging node didn't flip, so the edge type carries over untouched
+         return neighborDir;
+     }
+
+     final boolean mergeWithFlip = mergeDir == MessageFlag.DIR_FR || mergeDir == MessageFlag.DIR_RF;
+     if (!mergeWithFlip) {
+         throw new RuntimeException("Unrecognized direction for mergeDir: " + mergeDir);
+     }
+
+     // the merging node is flipping, so my edge type must flip along with it
+     switch (neighborDir) {
+         case MessageFlag.DIR_FF:
+             return MessageFlag.DIR_FR;
+         case MessageFlag.DIR_FR:
+             return MessageFlag.DIR_FF;
+         case MessageFlag.DIR_RF:
+             return MessageFlag.DIR_RR;
+         case MessageFlag.DIR_RR:
+             return MessageFlag.DIR_RF;
+         default:
+             throw new RuntimeException("Unrecognized direction for neighborDir: " + neighborDir);
+     }
+ }
+
/*
* Process any changes to @node contained in @updateMsg. This includes merges and edge updates
*/
- public static void processUpdates(NodeWritable node, NodeWithFlagWritable updateMsg, int kmerSize) throws IOException {
+ public static void processUpdates(NodeWritable node, NodeWithFlagWritable updateMsg, int kmerSize)
+ throws IOException {
byte updateFlag = updateMsg.getFlag();
NodeWritable updateNode = updateMsg.getNode();
if ((updateFlag & MessageFlag.MSG_UPDATE_EDGE) == MessageFlag.MSG_UPDATE_EDGE) {
@@ -53,7 +118,7 @@
// remove position and merge its position lists with node
if (!updateNode.equals(NodeWritable.EMPTY_NODE)) {
// need to remove updateNode from the specified PositionList
- switch(updateFlag & MessageFlag.DIR_MASK) {
+ switch (updateFlag & MessageFlag.DIR_MASK) {
case MessageFlag.DIR_FF:
node.getFFList().remove(updateNode.getNodeID());
break;
@@ -78,7 +143,7 @@
} else if ((updateFlag & MessageFlag.MSG_UPDATE_MERGE) == MessageFlag.MSG_UPDATE_MERGE) {
// this message wants to merge node with updateNode.
// the direction flag indicates how the merge should take place.
- switch(updateFlag & MessageFlag.DIR_MASK) {
+ switch (updateFlag & MessageFlag.DIR_MASK) {
case MessageFlag.DIR_FF:
node.getKmer().mergeWithFFKmer(kmerSize, updateNode.getKmer());
node.getFFList().set(updateNode.getFFList());
@@ -89,7 +154,7 @@
node.getFRList().set(updateNode.getFRList());
break;
case MessageFlag.DIR_RF:
-
+
break;
case MessageFlag.DIR_RR:
node.getKmer().mergeWithRRKmer(kmerSize, updateNode.getKmer());
@@ -114,7 +179,7 @@
this.flag = flag;
this.node = new NodeWritable(kmerSize);
}
-
+
public NodeWithFlagWritable(byte flag, NodeWritable node) {
this(node.getKmer().getKmerLength());
set(flag, node);
@@ -152,6 +217,10 @@
return this.flag;
}
+ public void setFlag(byte flag) {
+ this.flag = flag; // replaces the stored flag byte wholesale; no bits of the old value are kept
+ }
+
public String toString() {
return node.toString() + '\t' + String.valueOf(flag);
}
@@ -171,7 +240,7 @@
@Override
public int hashCode() {
-// return super.hashCode() + flag + node.hashCode();
+ // return super.hashCode() + flag + node.hashCode();
return flag + node.hashCode();
}