Merge branch 'anbangx/fullstack_genomix' into genomix/fullstack_genomix
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerBytesWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerBytesWritable.java
index 17f1a11..7d1738e 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerBytesWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerBytesWritable.java
@@ -25,6 +25,7 @@
import org.apache.hadoop.io.WritableComparator;
import edu.uci.ics.genomix.data.KmerUtil;
+import edu.uci.ics.genomix.type.NodeWritable.DirectionFlag;
/**
* Variable kmer length byteswritable
@@ -503,6 +504,25 @@
appendOneByteAtPosition(preKmer.kmerlength - initialKmerSize + k - 4 + 1, cacheByte, bytes, offset, size);
clearLeadBit();
}
+    /** Merges {@code kmer} into this kmer via the FF/FR/RF/RR merge selected by {@code dir}'s DIR_MASK bits. */
+    public void mergeWithKmerInDir(byte dir, int initialKmerSize, KmerBytesWritable kmer) {
+        switch (dir & DirectionFlag.DIR_MASK) { // bits of dir outside DIR_MASK do not affect the dispatch
+            case DirectionFlag.DIR_FF:
+                mergeWithFFKmer(initialKmerSize, kmer);
+                break;
+            case DirectionFlag.DIR_FR:
+                mergeWithFRKmer(initialKmerSize, kmer);
+                break;
+            case DirectionFlag.DIR_RF:
+                mergeWithRFKmer(initialKmerSize, kmer);
+                break;
+            case DirectionFlag.DIR_RR:
+                mergeWithRRKmer(initialKmerSize, kmer);
+                break;
+            default: // IllegalArgumentException extends RuntimeException, so existing callers are unaffected
+                throw new IllegalArgumentException("Direction not recognized: " + dir);
+        }
+    }
public static void appendOneByteAtPosition(int k, byte onebyte, byte[] buffer, int start, int length) {
int position = start + length - 1 - k / 4;
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4.java
index 1fef016..a5ba50d 100644
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4.java
@@ -290,11 +290,15 @@
private NodeWithFlagWritable outputValue;
private NodeWritable curNode;
private PositionWritable outPosn;
- private ArrayList<NodeWithFlagWritable> updateMsgs;
private boolean sawCurNode;
private byte outFlag;
private byte inFlag;
+ // to prevent GC on update messages, we keep them all in one list and use the Node set method rather than creating new Node's
+ private ArrayList<NodeWithFlagWritable> updateMsgs;
+ private int updateMsgsSize;
+ private int updateMsgsCount;
+
public void configure(JobConf conf) {
KMER_SIZE = conf.getInt("sizeKmer", 0);
inputValue = new NodeWithFlagWritable(KMER_SIZE);
@@ -302,6 +306,7 @@
curNode = new NodeWritable(KMER_SIZE);
outPosn = new PositionWritable();
updateMsgs = new ArrayList<NodeWithFlagWritable>();
+ updateMsgsSize = updateMsgs.size();
}
/*
@@ -315,14 +320,14 @@
public void reduce(PositionWritable key, Iterator<NodeWithFlagWritable> values,
OutputCollector<PositionWritable, NodeWithFlagWritable> output, Reporter reporter) throws IOException {
sawCurNode = false;
- updateMsgs.clear();
-
+ updateMsgsCount = 0;
+
byte inDir;
while (values.hasNext()) {
inputValue.set(values.next());
inFlag = inputValue.getFlag();
inDir = (byte) (inFlag & MessageFlag.MSG_MASK);
-
+
switch (inDir) {
case MessageFlag.MSG_UPDATE_MERGE:
case MessageFlag.MSG_SELF:
@@ -334,13 +339,13 @@
sawCurNode = true;
if (inDir == MessageFlag.MSG_SELF) {
outPosn.set(curNode.getNodeID());
- } else { // MSG_UPDATE_MERGE
+ } else { // MSG_UPDATE_MERGE
// merge messages are sent to their merge recipient
outPosn.set(curNode.getListFromDir(inDir).getPosition(0));
}
break;
case MessageFlag.MSG_UPDATE_EDGE:
- updateMsgs.add(new NodeWithFlagWritable(inputValue)); // make a copy of inputValue-- not a reference!
+ addUpdateMessage(inputValue);
break;
default:
throw new IOException("Unrecognized message type: " + (inFlag & MessageFlag.MSG_MASK));
@@ -349,15 +354,23 @@
// process all the update messages for this node
// I have no idea how to make this more efficient...
- for (NodeWithFlagWritable updateMsg : updateMsgs) {
- NodeWithFlagWritable.processUpdates(curNode, updateMsg, KMER_SIZE);
+ for (int i=0; i < updateMsgsCount; i++) {
+ NodeWithFlagWritable.processUpdates(curNode, updateMsgs.get(i), KMER_SIZE);
}
outputValue.set(outFlag, curNode);
output.collect(outPosn, outputValue);
}
+    /** Stores a copy of the message in pool slot #updateMsgsCount, allocating only when no pooled slot is free. */
+    private void addUpdateMessage(NodeWithFlagWritable myInputValue) {
+        if (updateMsgsCount < updateMsgs.size()) {
+            updateMsgs.get(updateMsgsCount).set(myInputValue); // overwrite a slot kept from an earlier reduce() call
+        } else {
+            updateMsgs.add(new NodeWithFlagWritable(myInputValue)); // store a copy: the caller reuses its instance
+        }
+        updateMsgsCount++; // reduce() reads exactly the first updateMsgsCount slots
+    }
}
-
-
+
/*
* Mapper class: sends the update messages to their (already decided) destination
*/
@@ -413,15 +426,11 @@
inFlag = value.getFlag();
curNode.set(value.getNode());
curID.set(curNode.getNodeID());
-
+
}
}
-
-
-
-
/*
* Reducer class: processes the update messages from updateMapper
*/
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/NodeWithFlagWritable.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/NodeWithFlagWritable.java
index a7e8157..6729c78 100644
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/NodeWithFlagWritable.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/NodeWithFlagWritable.java
@@ -38,7 +38,8 @@
public static final byte EXTRA_FLAG = 1 << 6;
}
- public void setAsUpdateMessage(byte mergeDir, byte neighborDir, PositionWritable nodeToDelete, PositionWritable nodeToAdd) {
+ public void setAsUpdateMessage(byte mergeDir, byte neighborDir, PositionWritable nodeToDelete,
+ PositionWritable nodeToAdd) {
byte neighborToMeDir = mirrorDirection(neighborDir);
byte neighborToMergeDir = flipDirection(neighborToMeDir, mergeDir);
@@ -53,12 +54,10 @@
node.getListFromDir(neighborToMergeDir).append(nodeToAdd);
}
-
-
/*
* Returns the edge dir for B->A when the A->B edge is type @dir
*/
- public byte mirrorDirection(byte dir) {
+ public static byte mirrorDirection(byte dir) {
switch (dir) {
case MessageFlag.DIR_FF:
return MessageFlag.DIR_RR;
@@ -77,7 +76,7 @@
* When A->B edge type is @neighborDir and B will merge towards C along a @mergeDir edge,
* returns the new edge type for A->C
*/
- public byte flipDirection(byte neighborDir, byte mergeDir) {
+ public static byte flipDirection(byte neighborDir, byte mergeDir) {
switch (mergeDir) {
case MessageFlag.DIR_FF:
@@ -118,22 +117,7 @@
// remove position and merge its position lists with node
if (!updateNode.equals(NodeWritable.EMPTY_NODE)) {
// need to remove updateNode from the specified PositionList
- switch (updateFlag & MessageFlag.DIR_MASK) {
- case MessageFlag.DIR_FF:
- node.getFFList().remove(updateNode.getNodeID());
- break;
- case MessageFlag.DIR_FR:
- node.getFRList().remove(updateNode.getNodeID());
- break;
- case MessageFlag.DIR_RF:
- node.getRFList().remove(updateNode.getNodeID());
- break;
- case MessageFlag.DIR_RR:
- node.getRRList().remove(updateNode.getNodeID());
- break;
- default:
- throw new IOException("Unrecognized direction in updateFlag: " + updateFlag);
- }
+ node.getListFromDir(updateFlag).remove(updateNode.getNodeID());
}
// now merge positionlists from update and node
node.getFFList().appendList(updateNode.getFFList());
@@ -142,25 +126,40 @@
node.getRRList().appendList(updateNode.getRRList());
} else if ((updateFlag & MessageFlag.MSG_UPDATE_MERGE) == MessageFlag.MSG_UPDATE_MERGE) {
// this message wants to merge node with updateNode.
- // the direction flag indicates how the merge should take place.
- // TODO send update or remove edge that I merge with
+ // the direction flag indicates node's relationship with updateNode
+ node.getListFromDir(updateFlag).remove(updateNode.getNodeID()); // remove the node from my edges
+ node.getKmer().mergeWithKmerInDir(updateFlag, kmerSize, updateNode.getKmer()); // merge with its kmer
+
+ // merge my edges with the incoming node's edges, accounting for if the node flipped in
+ // the merge and if it's my predecessor or successor
switch (updateFlag & MessageFlag.DIR_MASK) {
case MessageFlag.DIR_FF:
- node.getKmer().mergeWithFFKmer(kmerSize, updateNode.getKmer());
+ // node merging with me is FF to me
node.getFFList().set(updateNode.getFFList());
- // TODO not just FF list here-- FR as well
+ node.getFRList().set(updateNode.getFRList());
+ // update isn't allowed to have any other successors (mirror & flip)
+ if (updateNode.getRFList().getCountOfPosition() > 0)
+ throw new IOException("Invalid merge detected! Node: " + node + " merging towards " + updateNode + "along" + (updateFlag & MessageFlag.DIR_MASK));
break;
case MessageFlag.DIR_FR:
- // FIXME not sure if this should be reverse-complement or just reverse...
- node.getKmer().mergeWithFFKmer(kmerSize, updateNode.getKmer());
- node.getFRList().set(updateNode.getFRList());
+ // flip edges
+ node.getFFList().set(updateNode.getRFList());
+ node.getFRList().set(updateNode.getRRList());
+ if (updateNode.getFFList().getCountOfPosition() > 0)
+ throw new IOException("Invalid merge detected! Node: " + node + " merging towards " + updateNode + "along" + (updateFlag & MessageFlag.DIR_MASK));
break;
case MessageFlag.DIR_RF:
-
+ // flip edges
+ node.getRFList().set(updateNode.getFFList());
+ node.getRRList().set(updateNode.getFRList());
+ if (updateNode.getRRList().getCountOfPosition() > 0)
+ throw new IOException("Invalid merge detected! Node: " + node + " merging towards " + updateNode + "along" + (updateFlag & MessageFlag.DIR_MASK));
break;
case MessageFlag.DIR_RR:
- node.getKmer().mergeWithRRKmer(kmerSize, updateNode.getKmer());
+ node.getRFList().set(updateNode.getRFList());
node.getRRList().set(updateNode.getRRList());
+ if (updateNode.getFRList().getCountOfPosition() > 0)
+ throw new IOException("Invalid merge detected! Node: " + node + " merging towards " + updateNode + "along" + (updateFlag & MessageFlag.DIR_MASK));
break;
default:
throw new IOException("Unrecognized direction in updateFlag: " + updateFlag);