PathNodeInitial identifies nodes needing updates
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
index 54abda3..8955aab 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
@@ -27,6 +27,7 @@
*
*/
private static final long serialVersionUID = 1L;
+ public static final NodeWritable EMPTY_NODE = new NodeWritable(0);
private PositionWritable nodeID;
private PositionListWritable forwardForwardList;
private PositionListWritable forwardReverseList;
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h3/MergePathsH3.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h3/MergePathsH3.java
index 544bd37..ad8ead6 100644
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h3/MergePathsH3.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h3/MergePathsH3.java
@@ -24,6 +24,7 @@
import org.apache.hadoop.util.ToolRunner;
import edu.uci.ics.genomix.hadoop.pmcommon.MessageWritableNodeWithFlag;
+import edu.uci.ics.genomix.hadoop.pmcommon.PathNodeInitial.PathNodeFlag;
import edu.uci.ics.genomix.type.NodeWritable;
import edu.uci.ics.genomix.type.PositionWritable;
@@ -32,28 +33,10 @@
/*
* Flags used when sending messages
*/
- public static class MergeMessageFlag {
- public static final byte EMPTY_MESSAGE = 0;
- public static final byte FROM_SELF = 1;
- public static final byte FROM_SUCCESSOR = 1 << 1;
- public static final byte FROM_PREDECESSOR = 1 << 2;
- public static final byte IS_HEAD = 1 << 3;
- public static final byte IS_TAIL = 1 << 4;
- public static final byte IS_PSEUDOHEAD = 1 << 5;
- public static final byte IS_COMPLETE = 1 << 6;
-
- public static String getFlagAsString(byte code) {
- // TODO: allow multiple flags to be set
- switch (code) {
- case EMPTY_MESSAGE:
- return "EMPTY_MESSAGE";
- case FROM_SELF:
- return "FROM_SELF";
- case FROM_SUCCESSOR:
- return "FROM_SUCCESSOR";
- }
- return "ERROR_BAD_MESSAGE";
- }
+ public static class MergeMessageFlag extends PathNodeFlag {
+ public static final byte FROM_SUCCESSOR = 1 << 5;
+ public static final byte FROM_PREDECESSOR = 1 << 6;
+ public static final byte IS_PSEUDOHEAD = ((byte) 1 << 6); //TODO FIXME
}
/*
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/PathNodeInitial.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/PathNodeInitial.java
index 68b727d..cb3466b 100644
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/PathNodeInitial.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/PathNodeInitial.java
@@ -45,13 +45,52 @@
/*
* A map-reduce job to find all nodes that are part of a simple path and the mark the nodes that
- * form their heads and tails.
+ * form their heads and tails, also identifies parts of the graph that will participate in a path merge.
+ *
+ * This MR job uses MultipleOutputs rather than remapping the entire graph each iteration:
+ * 1. simple path nodes (indegree = outdegree = 1) (TO_MERGE_OUTPUT collector)
+ * 2. non-path, "complete" nodes, which will not be affected by the path merging (COMPLETE_OUTPUT collector)
+ * 3. non-path, "possibly updated" nodes, whose edges need to be updated after the merge (TO_UPDATE_OUTPUT collector)
*/
@SuppressWarnings("deprecation")
public class PathNodeInitial extends Configured implements Tool {
-
- public static final String COMPLETE_OUTPUT = "complete";
+
+ public static final String COMPLETE_OUTPUT = "complete";
public static final String TO_MERGE_OUTPUT = "toMerge";
+ public static final String TO_UPDATE_OUTPUT = "toUpdate";
+
+ public static class PathNodeFlag {
+ public static final byte EMPTY_MESSAGE = 0;
+ public static final byte FROM_SELF = 1 << 0;
+ public static final byte IS_HEAD = 1 << 1;
+ public static final byte IS_TAIL = 1 << 2;
+ public static final byte IS_COMPLETE = 1 << 3;
+ public static final byte NEAR_PATH = 1 << 4;
+ }
+
+ private static void sendOutputToNextNeighbors(NodeWritable node, MessageWritableNodeWithFlag outputValue,
+ OutputCollector<PositionWritable, MessageWritableNodeWithFlag> collector) throws IOException {
+ Iterator<PositionWritable> posIterator = node.getFFList().iterator(); // FFList
+ while (posIterator.hasNext()) {
+ collector.collect(posIterator.next(), outputValue);
+ }
+ posIterator = node.getFRList().iterator(); // FRList
+ while (posIterator.hasNext()) {
+ collector.collect(posIterator.next(), outputValue);
+ }
+ }
+
+ private static void sendOutputToPreviousNeighbors(NodeWritable node, MessageWritableNodeWithFlag outputValue,
+ OutputCollector<PositionWritable, MessageWritableNodeWithFlag> collector) throws IOException {
+ Iterator<PositionWritable> posIterator = node.getRRList().iterator(); // RRList
+ while (posIterator.hasNext()) {
+ collector.collect(posIterator.next(), outputValue);
+ }
+ posIterator = node.getRFList().iterator(); // RFList
+ while (posIterator.hasNext()) {
+ collector.collect(posIterator.next(), outputValue);
+ }
+ }
public static class PathNodeInitialMapper extends MapReduceBase implements
Mapper<NodeWritable, NullWritable, PositionWritable, MessageWritableNodeWithFlag> {
@@ -61,171 +100,151 @@
private MessageWritableNodeWithFlag outputValue;
private int inDegree;
private int outDegree;
- private NodeWritable emptyNode;
- private Iterator<PositionWritable> posIterator;
+ private boolean pathNode;
public void configure(JobConf conf) {
KMER_SIZE = conf.getInt("sizeKmer", 0);
outputValue = new MessageWritableNodeWithFlag(KMER_SIZE);
outputKey = new PositionWritable();
- emptyNode = new NodeWritable();
}
+ /*
+ * Identify the heads and tails of simple path nodes and their neighbors
+ *
+ * (non-Javadoc)
+ * @see org.apache.hadoop.mapred.Mapper#map(java.lang.Object, java.lang.Object, org.apache.hadoop.mapred.OutputCollector, org.apache.hadoop.mapred.Reporter)
+ */
@Override
public void map(NodeWritable key, NullWritable value,
OutputCollector<PositionWritable, MessageWritableNodeWithFlag> output, Reporter reporter)
throws IOException {
-
inDegree = key.inDegree();
outDegree = key.outDegree();
if (inDegree == 1 && outDegree == 1) {
+ pathNode = true;
+ } else if (inDegree == 0 && outDegree == 1) {
+ pathNode = true;
+ // start of a tip. needs to merge & be marked as head
+ outputValue.set(MergeMessageFlag.IS_HEAD, NodeWritable.EMPTY_NODE);
+ output.collect(key.getNodeID(), outputValue);
+ } else if (inDegree == 1 && outDegree == 0) {
+ pathNode = true;
+ // end of a tip. needs to merge & be marked as tail
+ outputValue.set(MergeMessageFlag.IS_TAIL, NodeWritable.EMPTY_NODE);
+ output.collect(key.getNodeID(), outputValue);
+ } else {
+ pathNode = false;
+ if (outDegree > 0) {
+ // Not a path myself, but my successor might be one. Map forward successor to find heads
+ outputValue.set(MergeMessageFlag.IS_HEAD, NodeWritable.EMPTY_NODE);
+ sendOutputToNextNeighbors(key, outputValue, output);
+ }
+ if (inDegree > 0) {
+ // Not a path myself, but my predecessor might be one. map predecessor to find tails
+ outputValue.set(MergeMessageFlag.IS_TAIL, NodeWritable.EMPTY_NODE);
+ sendOutputToPreviousNeighbors(key, outputValue, output);
+ }
+ // this non-path node won't participate in the merge
+ outputValue.set((byte) (MergeMessageFlag.FROM_SELF | MergeMessageFlag.IS_COMPLETE), key);
+ output.collect(key.getNodeID(), outputValue);
+ }
+
+ if (pathNode) {
// simple path nodes map themselves
outputValue.set(MergeMessageFlag.FROM_SELF, key);
output.collect(key.getNodeID(), outputValue);
reporter.incrCounter("genomix", "path_nodes", 1);
- } else if (inDegree == 0 && outDegree == 1) {
- // start of a tip. needs to merge & be marked as head
- outputValue.set(MergeMessageFlag.FROM_SELF, key);
- output.collect(key.getNodeID(), outputValue);
- reporter.incrCounter("genomix", "path_nodes", 1);
- outputValue.set(MergeMessageFlag.FROM_PREDECESSOR, emptyNode);
- output.collect(key.getNodeID(), outputValue);
- } else if (inDegree == 1 && outDegree == 0) {
- // end of a tip. needs to merge & be marked as tail
- outputValue.set(MergeMessageFlag.FROM_SELF, key);
- output.collect(key.getNodeID(), outputValue);
- reporter.incrCounter("genomix", "path_nodes", 1);
-
- outputValue.set(MergeMessageFlag.FROM_SUCCESSOR, emptyNode);
- output.collect(key.getNodeID(), outputValue);
- } else {
- if (outDegree > 0) {
- // Not a path myself, but my successor might be one. Map forward successor to find heads
- outputValue.set(MergeMessageFlag.FROM_PREDECESSOR, emptyNode);
- posIterator = key.getFFList().iterator();
- while (posIterator.hasNext()) {
- outputKey.set(posIterator.next());
- output.collect(outputKey, outputValue);
- }
- posIterator = key.getFRList().iterator();
- while (posIterator.hasNext()) {
- outputKey.set(posIterator.next());
- output.collect(outputKey, outputValue);
- }
- }
- if (inDegree > 0) {
- // Not a path myself, but my predecessor might be one. map predecessor to find tails
- outputValue.set(MergeMessageFlag.FROM_SUCCESSOR, emptyNode);
- posIterator = key.getRRList().iterator();
- while (posIterator.hasNext()) {
- outputKey.set(posIterator.next());
- output.collect(outputKey, outputValue);
- }
- posIterator = key.getRFList().iterator();
- while (posIterator.hasNext()) {
- outputKey.set(posIterator.next());
- output.collect(outputKey, outputValue);
- }
- }
- // push this non-path node to the "complete" output
- outputValue.set((byte) (MergeMessageFlag.FROM_SELF | MergeMessageFlag.IS_COMPLETE), key);
- output.collect(key.getNodeID(), outputValue);
+ // also mark neighbors of paths (they are candidates for updates)
+ outputValue.set(MergeMessageFlag.NEAR_PATH, NodeWritable.EMPTY_NODE);
+ sendOutputToNextNeighbors(key, outputValue, output);
+ sendOutputToPreviousNeighbors(key, outputValue, output);
}
}
}
public static class PathNodeInitialReducer extends MapReduceBase implements
Reducer<PositionWritable, MessageWritableNodeWithFlag, PositionWritable, MessageWritableNodeWithFlag> {
- protected MultipleOutputs mos;
- protected OutputCollector<PositionWritable, MessageWritableNodeWithFlag> completeCollector;
- protected OutputCollector<PositionWritable, MessageWritableNodeWithFlag> toMergeCollector;
- protected int KMER_SIZE;
-
+ private MultipleOutputs mos;
+ private OutputCollector<PositionWritable, MessageWritableNodeWithFlag> toMergeCollector;
+ private OutputCollector<PositionWritable, MessageWritableNodeWithFlag> completeCollector;
+ private OutputCollector<PositionWritable, MessageWritableNodeWithFlag> toUpdateCollector;
+ private int KMER_SIZE;
+
private MessageWritableNodeWithFlag inputValue;
private MessageWritableNodeWithFlag outputValue;
private NodeWritable nodeToKeep;
- private int count;
- private byte flag;
+ private byte outputFlag;
+ private byte inputFlag;
public void configure(JobConf conf) {
mos = new MultipleOutputs(conf);
KMER_SIZE = conf.getInt("sizeKmer", 0);
inputValue = new MessageWritableNodeWithFlag(KMER_SIZE);
outputValue = new MessageWritableNodeWithFlag(KMER_SIZE);
- nodeToKeep = new NodeWritable(KMER_SIZE);
+ nodeToKeep = new NodeWritable(KMER_SIZE);
}
- @SuppressWarnings("unchecked")
- @Override
+ /*
+ * Segregate nodes into three bins:
+ * 1. mergeable nodes (marked as H/T)
+ * 2. non-mergeable nodes that are candidates for updates
+ * 3. non-mergeable nodes that are not path neighbors and won't be updated
+ *
+ * (non-Javadoc)
+ * @see org.apache.hadoop.mapred.Reducer#reduce(java.lang.Object, java.util.Iterator, org.apache.hadoop.mapred.OutputCollector, org.apache.hadoop.mapred.Reporter)
+ */
+ @SuppressWarnings("unchecked")
+ @Override
public void reduce(PositionWritable key, Iterator<MessageWritableNodeWithFlag> values,
OutputCollector<PositionWritable, MessageWritableNodeWithFlag> output, Reporter reporter)
throws IOException {
- completeCollector = mos.getCollector(COMPLETE_OUTPUT, reporter);
- toMergeCollector = mos.getCollector(TO_MERGE_OUTPUT, reporter);
-
- inputValue.set(values.next());
- if (!values.hasNext()) {
- if ((inputValue.getFlag() & MergeMessageFlag.FROM_SELF) > 0) {
- if ((inputValue.getFlag() & MergeMessageFlag.IS_COMPLETE) > 0) {
- // non-path node. Store in "complete" output
- completeCollector.collect(key, inputValue);
+ completeCollector = mos.getCollector(COMPLETE_OUTPUT, reporter);
+ toMergeCollector = mos.getCollector(TO_MERGE_OUTPUT, reporter);
+ toUpdateCollector = mos.getCollector(TO_UPDATE_OUTPUT, reporter);
+
+ outputFlag = MergeMessageFlag.EMPTY_MESSAGE;
+ while (values.hasNext()) {
+ inputValue.set(values.next());
+ inputFlag = inputValue.getFlag();
+ outputFlag |= inputFlag;
+
+ if ((inputFlag & MergeMessageFlag.FROM_SELF) > 0) {
+ // SELF -> keep this node
+ nodeToKeep.set(inputValue.getNode());
+ }
+ }
+
+ if ((outputFlag & MergeMessageFlag.FROM_SELF) > 0) {
+ if ((outputFlag & MergeMessageFlag.IS_COMPLETE) > 0) {
+ if ((outputFlag & MergeMessageFlag.NEAR_PATH) > 0) {
+ // non-path, but update candidate
+ outputValue.set(MergeMessageFlag.NEAR_PATH, nodeToKeep);
+ toUpdateCollector.collect(key, outputValue);
} else {
- // FROM_SELF => need to keep this PATH node
- toMergeCollector.collect(key, inputValue);
+ // non-path node. Store in "complete" output
+ outputValue.set(MergeMessageFlag.EMPTY_MESSAGE, nodeToKeep);
+ completeCollector.collect(key, outputValue);
+ }
+ } else {
+ // path nodes are mergeable
+ outputFlag &= (MergeMessageFlag.IS_HEAD | MergeMessageFlag.IS_TAIL); // clear flags besides H/T
+ outputValue.set(outputFlag, nodeToKeep);
+ toMergeCollector.collect(key, outputValue);
+
+ reporter.incrCounter("genomix", "path_nodes", 1);
+ if ((outputFlag & MergeMessageFlag.IS_HEAD) > 0) {
+ reporter.incrCounter("genomix", "path_nodes_heads", 1);
+ }
+ if ((outputFlag & MergeMessageFlag.IS_TAIL) > 0) {
+ reporter.incrCounter("genomix", "path_nodes_tails", 1);
}
}
} else {
- // multiple inputs => possible HEAD or TAIL to a path node. note if HEAD or TAIL node
- count = 0;
- flag = MergeMessageFlag.EMPTY_MESSAGE;
- while (true) { // process values; break when no more
- count++;
- if ((inputValue.getFlag() & MergeMessageFlag.FROM_SELF) > 0) {
- // SELF -> keep this node
- flag |= MergeMessageFlag.FROM_SELF;
- nodeToKeep.set(inputValue.getNode());
- if ((inputValue.getFlag() & MergeMessageFlag.IS_COMPLETE) > 0) {
- flag |= MergeMessageFlag.IS_COMPLETE;
- }
- } else if ((inputValue.getFlag() & MergeMessageFlag.FROM_SUCCESSOR) > 0) {
- flag |= MergeMessageFlag.IS_TAIL;
- } else if ((inputValue.getFlag() & MergeMessageFlag.FROM_PREDECESSOR) > 0) {
- flag |= MergeMessageFlag.IS_HEAD;
- }
- if (!values.hasNext()) {
- break;
- } else {
- inputValue.set(values.next());
- }
- }
- if (count < 2) {
- throw new IOException("Expected at least two nodes in PathNodeInitial reduce; saw "
- + String.valueOf(count));
- }
- if ((flag & MergeMessageFlag.FROM_SELF) > 0) {
- if ((flag & MergeMessageFlag.IS_COMPLETE) > 0) {
- // non-path node. Store in "complete" output
- completeCollector.collect(key, inputValue);
- } else {
- // only keep simple path nodes
- outputValue.set(flag, nodeToKeep);
- toMergeCollector.collect(key, outputValue);
-
- reporter.incrCounter("genomix", "path_nodes", 1);
- if ((flag & MergeMessageFlag.IS_HEAD) > 0) {
- reporter.incrCounter("genomix", "path_nodes_heads", 1);
- }
- if ((flag & MergeMessageFlag.IS_TAIL) > 0) {
- reporter.incrCounter("genomix", "path_nodes_tails", 1);
- }
- }
- } else {
- throw new IOException("No SELF node recieved in reduce! key=" + key.toString() + " flag=" + flag);
- }
+ throw new IOException("No SELF node recieved in reduce! key=" + key.toString() + " flag=" + outputFlag);
}
}
-
+
public void close() throws IOException {
mos.close();
}
@@ -234,13 +253,14 @@
/*
* Mark the head, tail, and simple path nodes in one map-reduce job.
*/
- public RunningJob run(String inputPath, String toMergeOutput, String completeOutput, JobConf baseConf) throws IOException {
+ public RunningJob run(String inputPath, String toMergeOutput, String completeOutput, String toUpdateOutput,
+ JobConf baseConf) throws IOException {
JobConf conf = new JobConf(baseConf);
conf.setJarByClass(PathNodeInitial.class);
conf.setJobName("PathNodeInitial " + inputPath);
FileInputFormat.addInputPaths(conf, inputPath);
- Path outputPath = new Path(inputPath.replaceAll("/$", "") + ".initialMerge.tmp");
+ Path outputPath = new Path(inputPath.replaceAll("/$", "") + ".initialMerge.tmp");
FileOutputFormat.setOutputPath(conf, outputPath);
conf.setInputFormat(SequenceFileInputFormat.class);
@@ -253,26 +273,29 @@
conf.setMapperClass(PathNodeInitialMapper.class);
conf.setReducerClass(PathNodeInitialReducer.class);
-
- MultipleOutputs.addNamedOutput(conf, TO_MERGE_OUTPUT, MergePathMultiSeqOutputFormat.class,
- PositionWritable.class, MessageWritableNodeWithFlag.class);
- MultipleOutputs.addNamedOutput(conf, COMPLETE_OUTPUT, MergePathMultiSeqOutputFormat.class,
- PositionWritable.class, MessageWritableNodeWithFlag.class);
- FileSystem dfs = FileSystem.get(conf);
+ MultipleOutputs.addNamedOutput(conf, TO_MERGE_OUTPUT, MergePathMultiSeqOutputFormat.class,
+ PositionWritable.class, MessageWritableNodeWithFlag.class);
+ MultipleOutputs.addNamedOutput(conf, COMPLETE_OUTPUT, MergePathMultiSeqOutputFormat.class,
+ PositionWritable.class, MessageWritableNodeWithFlag.class);
+ MultipleOutputs.addNamedOutput(conf, TO_UPDATE_OUTPUT, MergePathMultiSeqOutputFormat.class,
+ PositionWritable.class, MessageWritableNodeWithFlag.class);
+
+ FileSystem dfs = FileSystem.get(conf);
dfs.delete(outputPath, true); // clean output dir
RunningJob job = JobClient.runJob(conf);
-
+
// move the tmp outputs to the arg-spec'ed dirs
- dfs.rename(new Path(outputPath + File.separator + TO_MERGE_OUTPUT), new Path(toMergeOutput));
- dfs.rename(new Path(outputPath + File.separator + COMPLETE_OUTPUT), new Path(completeOutput));
-
+ dfs.rename(new Path(outputPath + File.separator + TO_MERGE_OUTPUT), new Path(toMergeOutput));
+ dfs.rename(new Path(outputPath + File.separator + COMPLETE_OUTPUT), new Path(completeOutput));
+ dfs.rename(new Path(outputPath + File.separator + TO_UPDATE_OUTPUT), new Path(toUpdateOutput));
+
return job;
}
@Override
public int run(String[] args) throws Exception {
- int res = ToolRunner.run(new Configuration(), new PathNodeInitial(), args);
+ int res = ToolRunner.run(new Configuration(), new PathNodeInitial(), args);
return res;
}