path node works for complex starts
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4.java
index 063c5f7..2ced8dd 100644
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4.java
@@ -20,6 +20,7 @@
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.SequenceFileInputFormat;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.mapred.lib.MultipleOutputs;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
@@ -59,6 +60,7 @@
private byte outFlag;
public void configure(JobConf conf) {
+
randSeed = conf.getLong("randomSeed", 0);
randGenerator = new Random(randSeed);
probBeingRandomHead = conf.getFloat("probBeingRandomHead", 0.5f);
@@ -116,24 +118,35 @@
public void map(PositionWritable key, MessageWritableNodeWithFlag value,
OutputCollector<PositionWritable, MessageWritableNodeWithFlag> output, Reporter reporter)
throws IOException {
+ // Node may be marked as head b/c it's a real head or a real tail
+ headFlag = (byte) (MessageFlag.IS_HEAD & value.getFlag());
+ tailFlag = (byte) (MessageFlag.IS_TAIL & value.getFlag());
+ outFlag = (byte) (headFlag | tailFlag);
+
// only PATH vertices are present. Find the ID's for my neighbors
curNode.set(value.getNode());
curID.set(curNode.getNodeID());
+
curHead = isNodeRandomHead(curID);
- hasNext = setNextInfo(curNode);
- hasPrev = setPrevInfo(curNode);
+ // the headFlag and tailFlag's indicate if the node is at the beginning or end of a simple path.
+ // We prevent merging towards non-path nodes
+ hasNext = setNextInfo(curNode) && tailFlag == 0;
+ hasPrev = setPrevInfo(curNode) && headFlag == 0;
willMerge = false;
reporter.setStatus("CHECK ME OUT");
- System.out.println("mapping node" + curNode.toString() + " next:" + String.valueOf(hasNext) + " prev:" + String.valueOf(hasPrev));
+ System.err.println("mapping node" + curNode.toString() + " next:" + String.valueOf(hasNext) + " prev:" + String.valueOf(hasPrev));
// TODO: need to update edges in neighboring nodes
-
+
+ if ((outFlag & MessageFlag.IS_HEAD) > 0 && (outFlag & MessageFlag.IS_TAIL) > 0) {
+ // true HEAD met true TAIL. this path is complete
+ outFlag |= MessageFlag.FROM_SELF;
+ outputValue.set(outFlag, curNode);
+ output.collect(curID, outputValue);
+ return;
+ }
if (hasNext || hasPrev) {
- // Node may be marked as head b/c it's a real head or a real tail
- headFlag = (byte) (MessageFlag.IS_HEAD & value.getFlag());
- tailFlag = (byte) (MessageFlag.IS_TAIL & value.getFlag());
- outFlag = (byte) (headFlag | tailFlag);
if (curHead) {
if (hasNext && !nextHead) {
// compress this head to the forward tail
@@ -181,7 +194,7 @@
}
}
- // if we didn't send ourselves to some other node, remap ourselves
+ // if we didn't send ourselves to some other node, remap ourselves for the next round
if (!willMerge) {
outFlag |= MessageFlag.FROM_SELF;
outputValue.set(outFlag, curNode);
@@ -195,7 +208,9 @@
*/
private static class MergePathsH4Reducer extends MapReduceBase implements
Reducer<PositionWritable, MessageWritableNodeWithFlag, PositionWritable, MessageWritableNodeWithFlag> {
-
+ private MultipleOutputs mos;
+ public static final String COMPLETE_OUTPUT = "complete";
+
private int KMER_SIZE;
private MessageWritableNodeWithFlag inputValue;
private MessageWritableNodeWithFlag outputValue;
@@ -209,13 +224,16 @@
private byte outFlag;
public void configure(JobConf conf) {
+ mos = new MultipleOutputs(conf);
KMER_SIZE = conf.getInt("sizeKmer", 0);
+ inputValue = new MessageWritableNodeWithFlag(KMER_SIZE);
outputValue = new MessageWritableNodeWithFlag(KMER_SIZE);
curNode = new NodeWritable(KMER_SIZE);
prevNode = new NodeWritable(KMER_SIZE);
nextNode = new NodeWritable(KMER_SIZE);
}
+ @SuppressWarnings("unchecked")
@Override
public void reduce(PositionWritable key, Iterator<MessageWritableNodeWithFlag> values,
OutputCollector<PositionWritable, MessageWritableNodeWithFlag> output, Reporter reporter)
@@ -223,10 +241,14 @@
inputValue.set(values.next());
if (!values.hasNext()) {
- // all single nodes must be remapped
- if ((inputValue.getFlag() & MessageFlag.FROM_SELF) == MessageFlag.FROM_SELF) {
- // FROM_SELF => remap self
- output.collect(key, inputValue);
+ if ((inputValue.getFlag() & MessageFlag.FROM_SELF) > 0) {
+ if ((inputValue.getFlag() & MessageFlag.IS_HEAD) > 0 && (inputValue.getFlag() & MessageFlag.IS_TAIL) > 0) {
+ // complete path (H & T meet in this node)
+ mos.getCollector(COMPLETE_OUTPUT, reporter).collect(key, inputValue);
+ } else {
+ // FROM_SELF => no merging this round. remap self
+ output.collect(key, inputValue);
+ }
} else if ((inputValue.getFlag() & (MessageFlag.FROM_PREDECESSOR | MessageFlag.FROM_SUCCESSOR)) > 0) {
// FROM_PREDECESSOR | FROM_SUCCESSOR, but singleton? error here!
throw new IOException("Only one value recieved in merge, but it wasn't from self!");
@@ -246,10 +268,12 @@
sawPrevNode = true;
} else if ((inputValue.getFlag() & MessageFlag.FROM_SUCCESSOR) > 0) {
nextNode.set(inputValue.getNode());
- sawNextNode = false;
- } else {
+ sawNextNode = true;
+ } else if ((inputValue.getFlag() & MessageFlag.FROM_SELF) > 0) {
curNode.set(inputValue.getNode());
sawCurNode = true;
+ } else {
+ throw new IOException("Unknown origin for merging node");
}
if (!values.hasNext()) {
break;
@@ -279,7 +303,8 @@
outputValue.set(outFlag, curNode);
if ((outFlag & MessageFlag.IS_HEAD) > 0 && (outFlag & MessageFlag.IS_TAIL) > 0) {
// True heads meeting tails => merge is complete for this node
- // TODO: send to the "complete" collector
+ mos.getCollector(COMPLETE_OUTPUT, reporter).collect(key, outputValue);
+ // TODO send update to this node's neighbors
} else {
output.collect(key, outputValue);
}
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/PathNodeInitial.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/PathNodeInitial.java
index e7bcdf6..c8386ea 100644
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/PathNodeInitial.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/PathNodeInitial.java
@@ -57,6 +57,7 @@
private int inDegree;
private int outDegree;
private NodeWritable emptyNode;
+ private Iterator<PositionWritable> posIterator;
public PathNodeInitialMapper() {
@@ -80,26 +81,51 @@
outputValue.set(MessageFlag.FROM_SELF, key);
output.collect(key.getNodeID(), outputValue);
reporter.incrCounter("genomix", "path_nodes", 1);
- } else if (outDegree == 1) {
- // Not a path myself, but my successor might be one. Map forward successor
+ } else if (inDegree == 0 && outDegree == 1) {
+ // start of a tip. needs to merge & be marked as head
+ outputValue.set(MessageFlag.FROM_SELF, key);
+ output.collect(key.getNodeID(), outputValue);
+ reporter.incrCounter("genomix", "path_nodes", 1);
+
outputValue.set(MessageFlag.FROM_PREDECESSOR, emptyNode);
- if (key.getFFList().getCountOfPosition() > 0) {
- outputKey.set(key.getFFList().getPosition(0));
- } else {
- outputKey.set(key.getFRList().getPosition(0));
- }
- output.collect(outputKey, outputValue);
- } else if (inDegree == 1) {
- // Not a path myself, but my predecessor might be one.
+ output.collect(key.getNodeID(), outputValue);
+ } else if (inDegree == 1 && outDegree == 0) {
+ // end of a tip. needs to merge & be marked as tail
+ outputValue.set(MessageFlag.FROM_SELF, key);
+ output.collect(key.getNodeID(), outputValue);
+ reporter.incrCounter("genomix", "path_nodes", 1);
+
outputValue.set(MessageFlag.FROM_SUCCESSOR, emptyNode);
- if (key.getRRList().getCountOfPosition() > 0) {
- outputKey.set(key.getRRList().getPosition(0));
- } else {
- outputKey.set(key.getRFList().getPosition(0));
- }
- output.collect(outputKey, outputValue);
+ output.collect(key.getNodeID(), outputValue);
} else {
- // TODO: all other nodes will not participate-- should they be collected in a "complete" output?
+ if (outDegree > 0) {
+ // Not a path myself, but my successor might be one. Map forward successor to find heads
+ outputValue.set(MessageFlag.FROM_PREDECESSOR, emptyNode);
+ posIterator = key.getFFList().iterator();
+ while (posIterator.hasNext()) {
+ outputKey.set(posIterator.next());
+ output.collect(outputKey, outputValue);
+ }
+ posIterator = key.getFRList().iterator();
+ while (posIterator.hasNext()) {
+ outputKey.set(posIterator.next());
+ output.collect(outputKey, outputValue);
+ }
+ }
+ if (inDegree > 0) {
+ // Not a path myself, but my predecessor might be one. map predecessor to find tails
+ outputValue.set(MessageFlag.FROM_SUCCESSOR, emptyNode);
+ posIterator = key.getRRList().iterator();
+ while (posIterator.hasNext()) {
+ outputKey.set(posIterator.next());
+ output.collect(outputKey, outputValue);
+ }
+ posIterator = key.getRFList().iterator();
+ while (posIterator.hasNext()) {
+ outputKey.set(posIterator.next());
+ output.collect(outputKey, outputValue);
+ }
+ }
}
}
}
@@ -128,7 +154,7 @@
inputValue.set(values.next());
if (!values.hasNext()) {
- if ((inputValue.getFlag() & MessageFlag.FROM_SELF) == MessageFlag.FROM_SELF) {
+ if ((inputValue.getFlag() & MessageFlag.FROM_SELF) > 0) {
// FROM_SELF => need to keep this PATH node
output.collect(key, inputValue);
}
@@ -138,15 +164,14 @@
flag = MessageFlag.EMPTY_MESSAGE;
while (true) { // process values; break when no more
count++;
- if ((inputValue.getFlag() & MessageFlag.FROM_SELF) == MessageFlag.FROM_SELF) {
+ if ((inputValue.getFlag() & MessageFlag.FROM_SELF) > 0) {
// SELF -> keep this node
+ flag |= MessageFlag.FROM_SELF;
nodeToKeep.set(inputValue.getNode());
- } else if ((inputValue.getFlag() & MessageFlag.FROM_SUCCESSOR) == MessageFlag.FROM_SUCCESSOR) {
+ } else if ((inputValue.getFlag() & MessageFlag.FROM_SUCCESSOR) > 0) {
flag |= MessageFlag.IS_TAIL;
- reporter.incrCounter("genomix", "path_nodes_tails", 1);
- } else if ((inputValue.getFlag() & MessageFlag.FROM_PREDECESSOR) == MessageFlag.FROM_PREDECESSOR) {
+ } else if ((inputValue.getFlag() & MessageFlag.FROM_PREDECESSOR) > 0) {
flag |= MessageFlag.IS_HEAD;
- reporter.incrCounter("genomix", "path_nodes_heads", 1);
}
if (!values.hasNext()) {
break;
@@ -158,11 +183,21 @@
throw new IOException("Expected at least two nodes in PathNodeInitial reduce; saw "
+ String.valueOf(count));
}
- if ((flag & MessageFlag.FROM_SELF) == MessageFlag.FROM_SELF) {
- // only map simple path nodes
+ if ((flag & MessageFlag.FROM_SELF) > 0) {
+ // only keep simple path nodes
outputValue.set(flag, nodeToKeep);
output.collect(key, outputValue);
+
reporter.incrCounter("genomix", "path_nodes", 1);
+ if ((flag & MessageFlag.IS_HEAD) > 0) {
+ reporter.incrCounter("genomix", "path_nodes_heads", 1);
+ }
+ if ((flag & MessageFlag.IS_TAIL) > 0) {
+ reporter.incrCounter("genomix", "path_nodes_tails", 1);
+ }
+ } else {
+ // this is a non-path node.
+ // TODO: keep this node in a "completed" reducer
}
}
}