refactor h4 update mapper to use update messages
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4.java
index 7e6df5e..e6b1575 100644
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4.java
@@ -1,3 +1,18 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package edu.uci.ics.genomix.hadoop.graphclean.mergepaths.h4;
import java.io.File;
@@ -31,18 +46,36 @@
import edu.uci.ics.genomix.hadoop.pmcommon.MergePathValueWritable;
import edu.uci.ics.genomix.hadoop.pmcommon.NodeWithFlagWritable;
import edu.uci.ics.genomix.hadoop.pmcommon.NodeWithFlagWritable.MessageFlag;
+import edu.uci.ics.genomix.hadoop.pmcommon.PathNodeInitial;
import edu.uci.ics.genomix.hadoop.pmcommon.PathNodeInitial.PathNodeInitialReducer;
import edu.uci.ics.genomix.type.NodeWritable;
import edu.uci.ics.genomix.type.PositionWritable;
+/*
+ * a probabilistic merge algorithm for merging long single paths (chains without only 1 incoming and outgoing edge)
+ * The merge is guaranteed to succeed, but not all nodes that could be merged in an iteration will be.
+ *
+ * There are two steps to the merge:
+ * 1. (H4UpdatesMapper & H4UpdatesReducer): the direction of the merge is chosen and all
+ * neighbor's edges are updated with the merge intent
+ * 2. H4MergeMapper & H4MergeReducer): the nodes initiating the merge are "sent" to their neighbors, kmers are combined, and edges
+ * are again updated (since the merge-initiator may be neighbor to another merging node).
+ */
@SuppressWarnings("deprecation")
public class MergePathsH4 extends Configured implements Tool {
+ private enum MergeDir {
+ NO_MERGE,
+ FORWARD,
+ BACKWARD
+
+ }
+
/*
- * Mapper class: Partition the graph using random pseudoheads.
- * Heads send themselves to their successors, and all others map themselves.
+ * Mapper class: randomly chooses a direction to merge s.t. if a merge takes place, it will be successful.
+ * Sends update messages to all of this node's neighbors who their new neighbor will be
*/
- public static class MergePathsH4Mapper extends MapReduceBase implements
+ public static class H4UpdatesMapper extends MapReduceBase implements
Mapper<PositionWritable, NodeWithFlagWritable, PositionWritable, NodeWithFlagWritable> {
private static long randSeed;
private Random randGenerator;
@@ -51,6 +84,12 @@
private int KMER_SIZE;
private PositionWritable outputKey;
private NodeWithFlagWritable outputValue;
+ private PositionWritable mergeMsgKey;
+ private NodeWithFlagWritable mergeMsgValue;
+ private PositionWritable updateMsgKey;
+ private NodeWithFlagWritable updateMsgValue;
+ private NodeWritable updateMsgNode;
+
private NodeWritable curNode;
private PositionWritable curID;
private PositionWritable nextID;
@@ -60,13 +99,16 @@
private boolean curHead;
private boolean nextHead;
private boolean prevHead;
- private boolean willMerge;
+ private MergeDir mergeDir;
+ private byte inFlag;
private byte headFlag;
private byte tailFlag;
- private byte outFlag;
+ private byte mergeMsgFlag;
+ private byte nextDir;
+ private byte prevDir;
public void configure(JobConf conf) {
-
+
randSeed = conf.getLong("randomSeed", 0);
randGenerator = new Random(randSeed);
probBeingRandomHead = conf.getFloat("probBeingRandomHead", 0.5f);
@@ -74,6 +116,12 @@
KMER_SIZE = conf.getInt("sizeKmer", 0);
outputValue = new NodeWithFlagWritable(KMER_SIZE);
outputKey = new PositionWritable();
+
+ mergeMsgKey = new PositionWritable();
+ mergeMsgValue = new NodeWithFlagWritable(KMER_SIZE);
+ updateMsgKey = new PositionWritable();
+ updateMsgValue = new NodeWithFlagWritable(KMER_SIZE);
+
curNode = new NodeWritable(KMER_SIZE);
curID = new PositionWritable();
nextID = new PositionWritable();
@@ -91,11 +139,13 @@
*/
protected boolean setNextInfo(NodeWritable node) {
if (node.getFFList().getCountOfPosition() > 0) {
+ nextDir = MessageFlag.DIR_FF;
nextID.set(node.getFFList().getPosition(0));
nextHead = isNodeRandomHead(nextID);
return true;
}
if (node.getFRList().getCountOfPosition() > 0) {
+ nextDir = MessageFlag.DIR_FR;
nextID.set(node.getFRList().getPosition(0));
nextHead = isNodeRandomHead(nextID);
return true;
@@ -108,11 +158,13 @@
*/
protected boolean setPrevInfo(NodeWritable node) {
if (node.getRRList().getCountOfPosition() > 0) {
+ prevDir = MessageFlag.DIR_RR;
prevID.set(node.getRRList().getPosition(0));
prevHead = isNodeRandomHead(prevID);
return true;
}
if (node.getRFList().getCountOfPosition() > 0) {
+ prevDir = MessageFlag.DIR_RF;
prevID.set(node.getRFList().getPosition(0));
prevHead = isNodeRandomHead(prevID);
return true;
@@ -122,47 +174,33 @@
@Override
public void map(PositionWritable key, NodeWithFlagWritable value,
- OutputCollector<PositionWritable, NodeWithFlagWritable> output, Reporter reporter)
- throws IOException {
- // Node may be marked as head b/c it's a real head or a real tail
- headFlag = (byte) (MessageFlag.IS_HEAD & value.getFlag());
- tailFlag = (byte) (MessageFlag.IS_TAIL & value.getFlag());
- outFlag = (byte) (headFlag | tailFlag);
-
- // only PATH vertices are present. Find the ID's for my neighbors
+ OutputCollector<PositionWritable, NodeWithFlagWritable> output, Reporter reporter) throws IOException {
+ inFlag = value.getFlag();
curNode.set(value.getNode());
curID.set(curNode.getNodeID());
-
+
+ headFlag = (byte) (MessageFlag.IS_HEAD & inFlag);
+ tailFlag = (byte) (MessageFlag.IS_TAIL & inFlag);
+ mergeMsgFlag = (byte) (headFlag | tailFlag);
+
curHead = isNodeRandomHead(curID);
// the headFlag and tailFlag's indicate if the node is at the beginning or end of a simple path.
// We prevent merging towards non-path nodes
hasNext = setNextInfo(curNode) && tailFlag == 0;
hasPrev = setPrevInfo(curNode) && headFlag == 0;
- willMerge = false;
-
- // TODO: need to update edges in neighboring nodes
-
- if ((outFlag & MessageFlag.IS_HEAD) > 0 && (outFlag & MessageFlag.IS_TAIL) > 0) {
- // true HEAD met true TAIL. this path is complete
- outFlag |= MessageFlag.MSG_SELF;
- outputValue.set(outFlag, curNode);
- output.collect(curID, outputValue);
- return;
- }
+ mergeDir = MergeDir.NO_MERGE; // no merge to happen
+
+ // decide where we're going to merge to
if (hasNext || hasPrev) {
if (curHead) {
if (hasNext && !nextHead) {
- // compress this head to the forward tail
- outFlag |= MessageFlag.FROM_PREDECESSOR;
- outputValue.set(outFlag, curNode);
- output.collect(nextID, outputValue);
- willMerge = true;
+ // merge forward
+ mergeMsgFlag |= nextDir;
+ mergeDir = MergeDir.FORWARD;
} else if (hasPrev && !prevHead) {
- // compress this head to the reverse tail
- outFlag |= MessageFlag.FROM_SUCCESSOR;
- outputValue.set(outFlag, curNode);
- output.collect(prevID, outputValue);
- willMerge = true;
+ // merge backwards
+ mergeMsgFlag |= prevDir;
+ mergeDir = MergeDir.BACKWARD;
}
} else {
// I'm a tail
@@ -170,42 +208,80 @@
if ((!nextHead && !prevHead) && (curID.compareTo(nextID) < 0 && curID.compareTo(prevID) < 0)) {
// tails on both sides, and I'm the "local minimum"
// compress me towards the tail in forward dir
- outFlag |= MessageFlag.FROM_PREDECESSOR;
- outputValue.set(outFlag, curNode);
- output.collect(nextID, outputValue);
- willMerge = true;
+ mergeMsgFlag |= nextDir;
+ mergeDir = MergeDir.FORWARD;
}
} else if (!hasPrev) {
// no previous node
if (!nextHead && curID.compareTo(nextID) < 0) {
// merge towards tail in forward dir
- outFlag |= MessageFlag.FROM_PREDECESSOR;
- outputValue.set(outFlag, curNode);
- output.collect(nextID, outputValue);
- willMerge = true;
+ mergeMsgFlag |= nextDir;
+ mergeDir = MergeDir.FORWARD;
}
} else if (!hasNext) {
// no next node
if (!prevHead && curID.compareTo(prevID) < 0) {
// merge towards tail in reverse dir
- outFlag |= MessageFlag.FROM_SUCCESSOR;
- outputValue.set(outFlag, curNode);
- output.collect(prevID, outputValue);
- willMerge = true;
+ mergeMsgFlag |= prevDir;
+ mergeDir = MergeDir.BACKWARD;
}
}
}
}
- // if we didn't send ourselves to some other node, remap ourselves for the next round
- if (!willMerge) {
- outFlag |= MessageFlag.MSG_SELF;
- outputValue.set(outFlag, curNode);
- output.collect(curID, outputValue);
+ if (mergeDir == MergeDir.NO_MERGE) {
+ mergeMsgFlag |= MessageFlag.MSG_SELF;
+ mergeMsgValue.set(mergeMsgFlag, curNode);
+ output.collect(curID, mergeMsgValue);
+ } else {
+ // this node will do a merge next round
+ mergeMsgFlag |= MessageFlag.MSG_UPDATE_MERGE;
+ mergeMsgValue.set(mergeMsgFlag, curNode);
+ output.collect(curID, mergeMsgValue);
+
+ sendUpdateToNeighbors(curNode, (byte) (mergeMsgFlag & MessageFlag.DIR_MASK), output);
}
- else {
- // TODO send update to this node's neighbors
- //mos.getCollector(UPDATES_OUTPUT, reporter).collect(key, outputValue);
+ }
+
+ /*
+ * when performing a merge, an update message needs to be sent to my neighbors
+ */
+ private void sendUpdateToNeighbors(NodeWritable node, byte mergeDir,
+ OutputCollector<PositionWritable, NodeWithFlagWritable> collector) throws IOException {
+ PositionWritable mergeSource = node.getNodeID();
+ PositionWritable mergeTarget = node.getListFromDir(mergeDir).getPosition(0);
+
+ // I need to notify in the opposite direction as I'm merging
+ Iterator<PositionWritable> posIterator1;
+ byte dir1;
+ Iterator<PositionWritable> posIterator2;
+ byte dir2;
+ switch (mergeDir) {
+ case MessageFlag.DIR_FF:
+ case MessageFlag.DIR_FR:
+ // merging forward; tell my previous neighbors
+ posIterator1 = node.getRRList().iterator();
+ dir1 = MessageFlag.DIR_RR;
+ posIterator2 = node.getRFList().iterator();
+ dir2 = MessageFlag.DIR_RF;
+ break;
+ case MessageFlag.DIR_RF:
+ case MessageFlag.DIR_RR:
+ posIterator1 = node.getFFList().iterator();
+ dir1 = MessageFlag.DIR_FF;
+ posIterator2 = node.getFRList().iterator();
+ dir2 = MessageFlag.DIR_FR;
+ break;
+ default:
+ throw new IOException("Unrecognized direction in sendUpdateToNeighbors: " + mergeDir);
+ }
+ while (posIterator1.hasNext()) {
+ updateMsgValue.setAsUpdateMessage(mergeDir, dir1, mergeSource, mergeTarget);
+ collector.collect(posIterator1.next(), updateMsgValue);
+ }
+ while (posIterator2.hasNext()) {
+ updateMsgValue.setAsUpdateMessage(mergeDir, dir2, mergeSource, mergeTarget);
+ collector.collect(posIterator2.next(), outputValue);
}
}
}
@@ -222,7 +298,7 @@
private OutputCollector<PositionWritable, NodeWithFlagWritable> toMergeCollector;
private OutputCollector<PositionWritable, NodeWithFlagWritable> completeCollector;
private OutputCollector<PositionWritable, NodeWithFlagWritable> updatesCollector;
-
+
private int KMER_SIZE;
private NodeWithFlagWritable inputValue;
private NodeWithFlagWritable outputValue;
@@ -248,19 +324,19 @@
@SuppressWarnings("unchecked")
@Override
public void reduce(PositionWritable key, Iterator<NodeWithFlagWritable> values,
- OutputCollector<PositionWritable, NodeWithFlagWritable> output, Reporter reporter)
- throws IOException {
- toMergeCollector = mos.getCollector(TO_MERGE_OUTPUT, reporter);
- completeCollector = mos.getCollector(COMPLETE_OUTPUT, reporter);
- updatesCollector = mos.getCollector(UPDATES_OUTPUT, reporter);
+ OutputCollector<PositionWritable, NodeWithFlagWritable> output, Reporter reporter) throws IOException {
+ toMergeCollector = mos.getCollector(TO_MERGE_OUTPUT, reporter);
+ completeCollector = mos.getCollector(COMPLETE_OUTPUT, reporter);
+ updatesCollector = mos.getCollector(UPDATES_OUTPUT, reporter);
inputValue.set(values.next());
if (!values.hasNext()) {
if ((inputValue.getFlag() & MessageFlag.MSG_SELF) > 0) {
- if ((inputValue.getFlag() & MessageFlag.IS_HEAD) > 0 && (inputValue.getFlag() & MessageFlag.IS_TAIL) > 0) {
+ if ((inputValue.getFlag() & MessageFlag.IS_HEAD) > 0
+ && (inputValue.getFlag() & MessageFlag.IS_TAIL) > 0) {
// complete path (H & T meet in this node)
completeCollector.collect(key, inputValue);
- } else {
+ } else {
// FROM_SELF => no merging this round. remap self
toMergeCollector.collect(key, inputValue);
}
@@ -314,7 +390,7 @@
curNode.mergeForwardPre(prevNode, KMER_SIZE);
reporter.incrCounter("genomix", "num_merged", 1);
}
-
+
outputValue.set(outFlag, curNode);
if ((outFlag & MessageFlag.IS_HEAD) > 0 && (outFlag & MessageFlag.IS_TAIL) > 0) {
// True heads meeting tails => merge is complete for this node
@@ -333,7 +409,8 @@
/*
* Run one iteration of the mergePaths algorithm
*/
- public RunningJob run(String inputPath, String toMergeOutput, String completeOutput, String updatesOutput, JobConf baseConf) throws IOException {
+ public RunningJob run(String inputPath, String toMergeOutput, String completeOutput, String updatesOutput,
+ JobConf baseConf) throws IOException {
JobConf conf = new JobConf(baseConf);
conf.setJarByClass(MergePathsH4.class);
conf.setJobName("MergePathsH4 " + inputPath);
@@ -350,17 +427,17 @@
conf.setOutputKeyClass(PositionWritable.class);
conf.setOutputValueClass(NodeWithFlagWritable.class);
- conf.setMapperClass(MergePathsH4Mapper.class);
+ conf.setMapperClass(H4UpdatesMapper.class);
conf.setReducerClass(MergePathsH4Reducer.class);
-
+
MultipleOutputs.addNamedOutput(conf, MergePathsH4Reducer.TO_MERGE_OUTPUT, MergePathMultiSeqOutputFormat.class,
- PositionWritable.class, NodeWithFlagWritable.class);
+ PositionWritable.class, NodeWithFlagWritable.class);
MultipleOutputs.addNamedOutput(conf, MergePathsH4Reducer.COMPLETE_OUTPUT, MergePathMultiSeqOutputFormat.class,
- PositionWritable.class, NodeWithFlagWritable.class);
+ PositionWritable.class, NodeWithFlagWritable.class);
MultipleOutputs.addNamedOutput(conf, MergePathsH4Reducer.UPDATES_OUTPUT, MergePathMultiSeqOutputFormat.class,
- PositionWritable.class, NodeWithFlagWritable.class);
-
- FileSystem dfs = FileSystem.get(conf);
+ PositionWritable.class, NodeWithFlagWritable.class);
+
+ FileSystem dfs = FileSystem.get(conf);
// clean output dirs
dfs.delete(outputPath, true);
dfs.delete(new Path(toMergeOutput), true);
@@ -368,18 +445,21 @@
dfs.delete(new Path(updatesOutput), true);
RunningJob job = JobClient.runJob(conf);
-
+
// move the tmp outputs to the arg-spec'ed dirs. If there is no such dir, create an empty one to simplify downstream processing
- if (!dfs.rename(new Path(outputPath + File.separator + MergePathsH4Reducer.TO_MERGE_OUTPUT), new Path(toMergeOutput))) {
+ if (!dfs.rename(new Path(outputPath + File.separator + MergePathsH4Reducer.TO_MERGE_OUTPUT), new Path(
+ toMergeOutput))) {
dfs.mkdirs(new Path(toMergeOutput));
}
- if (!dfs.rename(new Path(outputPath + File.separator + MergePathsH4Reducer.COMPLETE_OUTPUT), new Path(completeOutput))) {
+ if (!dfs.rename(new Path(outputPath + File.separator + MergePathsH4Reducer.COMPLETE_OUTPUT), new Path(
+ completeOutput))) {
dfs.mkdirs(new Path(completeOutput));
}
- if (!dfs.rename(new Path(outputPath + File.separator + MergePathsH4Reducer.UPDATES_OUTPUT), new Path(updatesOutput))) {
+ if (!dfs.rename(new Path(outputPath + File.separator + MergePathsH4Reducer.UPDATES_OUTPUT), new Path(
+ updatesOutput))) {
dfs.mkdirs(new Path(updatesOutput));
}
-
+
return job;
}
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/removetips/RemoveTips.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/removetips/RemoveTips.java
index 4009fc6..198c769 100644
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/removetips/RemoveTips.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/removetips/RemoveTips.java
@@ -25,7 +25,7 @@
import edu.uci.ics.genomix.hadoop.graphclean.mergepaths.h3.MergePathsH3.MergeMessageFlag;
import edu.uci.ics.genomix.hadoop.graphclean.mergepaths.h4.MergePathsH4;
-import edu.uci.ics.genomix.hadoop.graphclean.mergepaths.h4.MergePathsH4.MergePathsH4Mapper;
+import edu.uci.ics.genomix.hadoop.graphclean.mergepaths.h4.MergePathsH4.H4UpdatesMapper;
import edu.uci.ics.genomix.hadoop.pmcommon.NodeWithFlagWritable;
import edu.uci.ics.genomix.type.NodeWritable;
import edu.uci.ics.genomix.type.PositionWritable;
@@ -130,7 +130,7 @@
conf.setOutputKeyClass(PositionWritable.class);
conf.setOutputValueClass(NodeWithFlagWritable.class);
- conf.setMapperClass(MergePathsH4Mapper.class);
+ conf.setMapperClass(H4UpdatesMapper.class);
conf.setReducerClass(MergePathsH4Reducer.class);
FileSystem.get(conf).delete(new Path(outputPath), true);