WIP H4 1 mapper + 2 reducers
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
index 9675fc0..2c64139 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
@@ -117,7 +117,7 @@
}
public PositionListWritable getListFromDir(byte dir) {
- switch (dir) {
+ switch (dir & DirectionFlag.DIR_MASK) {
case DirectionFlag.DIR_FF:
return getFFList();
case DirectionFlag.DIR_FR:
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4.java
index e6b1575..1fef016 100644
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4.java
@@ -17,6 +17,7 @@
import java.io.File;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Iterator;
import java.util.Random;
@@ -82,13 +83,9 @@
private float probBeingRandomHead;
private int KMER_SIZE;
- private PositionWritable outputKey;
private NodeWithFlagWritable outputValue;
- private PositionWritable mergeMsgKey;
private NodeWithFlagWritable mergeMsgValue;
- private PositionWritable updateMsgKey;
private NodeWithFlagWritable updateMsgValue;
- private NodeWritable updateMsgNode;
private NodeWritable curNode;
private PositionWritable curID;
@@ -115,11 +112,8 @@
KMER_SIZE = conf.getInt("sizeKmer", 0);
outputValue = new NodeWithFlagWritable(KMER_SIZE);
- outputKey = new PositionWritable();
- mergeMsgKey = new PositionWritable();
mergeMsgValue = new NodeWithFlagWritable(KMER_SIZE);
- updateMsgKey = new PositionWritable();
updateMsgValue = new NodeWithFlagWritable(KMER_SIZE);
curNode = new NodeWritable(KMER_SIZE);
@@ -287,9 +281,151 @@
}
/*
- * Reducer class: merge nodes that co-occur; for singletons, remap the original nodes
+     * Reducer class: processes the update messages from H4UpdatesMapper
*/
- private static class MergePathsH4Reducer extends MapReduceBase implements
+ private static class H4UpdatesReducer extends MapReduceBase implements
+ Reducer<PositionWritable, NodeWithFlagWritable, PositionWritable, NodeWithFlagWritable> {
+ private int KMER_SIZE;
+ private NodeWithFlagWritable inputValue;
+ private NodeWithFlagWritable outputValue;
+ private NodeWritable curNode;
+ private PositionWritable outPosn;
+ private ArrayList<NodeWithFlagWritable> updateMsgs;
+ private boolean sawCurNode;
+ private byte outFlag;
+ private byte inFlag;
+
+ public void configure(JobConf conf) {
+ KMER_SIZE = conf.getInt("sizeKmer", 0);
+ inputValue = new NodeWithFlagWritable(KMER_SIZE);
+ outputValue = new NodeWithFlagWritable(KMER_SIZE);
+ curNode = new NodeWritable(KMER_SIZE);
+ outPosn = new PositionWritable();
+ updateMsgs = new ArrayList<NodeWithFlagWritable>();
+ }
+
+ /*
+ * Process updates from mapper
+ *
+ * (non-Javadoc)
+ * @see org.apache.hadoop.mapred.Reducer#reduce(java.lang.Object, java.util.Iterator, org.apache.hadoop.mapred.OutputCollector, org.apache.hadoop.mapred.Reporter)
+ */
+ @SuppressWarnings("unchecked")
+ @Override
+ public void reduce(PositionWritable key, Iterator<NodeWithFlagWritable> values,
+ OutputCollector<PositionWritable, NodeWithFlagWritable> output, Reporter reporter) throws IOException {
+ sawCurNode = false;
+ updateMsgs.clear();
+
+ byte inDir;
+ while (values.hasNext()) {
+ inputValue.set(values.next());
+ inFlag = inputValue.getFlag();
+ inDir = (byte) (inFlag & MessageFlag.MSG_MASK);
+
+ switch (inDir) {
+ case MessageFlag.MSG_UPDATE_MERGE:
+ case MessageFlag.MSG_SELF:
+ if (sawCurNode)
+ throw new IOException("Saw more than one MSG_SELF! previously seen self: " + curNode
+ + " current self: " + inputValue.getNode());
+ curNode.set(inputValue.getNode());
+ outFlag = inFlag;
+ sawCurNode = true;
+ if (inDir == MessageFlag.MSG_SELF) {
+ outPosn.set(curNode.getNodeID());
+ } else { // MSG_UPDATE_MERGE
+                    // merge messages are sent to their merge recipient. NOTE(review): getListFromDir expects a direction flag, but inDir holds the MSG_MASK bits — confirm the DIR bits survive this mask
+ outPosn.set(curNode.getListFromDir(inDir).getPosition(0));
+ }
+ break;
+ case MessageFlag.MSG_UPDATE_EDGE:
+ updateMsgs.add(new NodeWithFlagWritable(inputValue)); // make a copy of inputValue-- not a reference!
+ break;
+ default:
+ throw new IOException("Unrecognized message type: " + (inFlag & MessageFlag.MSG_MASK));
+ }
+ }
+
+            // process all the update messages for this node
+            // TODO: consider a more efficient approach than buffering every update message before applying them
+ for (NodeWithFlagWritable updateMsg : updateMsgs) {
+ NodeWithFlagWritable.processUpdates(curNode, updateMsg, KMER_SIZE);
+ }
+ outputValue.set(outFlag, curNode);
+ output.collect(outPosn, outputValue);
+ }
+ }
+
+
+    /*
+     * Mapper class: sends the merge messages to their (already decided) destination
+     */
+ public static class H4MergeMapper extends MapReduceBase implements
+ Mapper<PositionWritable, NodeWithFlagWritable, PositionWritable, NodeWithFlagWritable> {
+ private static long randSeed;
+ private Random randGenerator;
+ private float probBeingRandomHead;
+
+ private int KMER_SIZE;
+ private NodeWithFlagWritable outputValue;
+ private NodeWithFlagWritable mergeMsgValue;
+ private NodeWithFlagWritable updateMsgValue;
+
+ private NodeWritable curNode;
+ private PositionWritable curID;
+ private PositionWritable nextID;
+ private PositionWritable prevID;
+ private boolean hasNext;
+ private boolean hasPrev;
+ private boolean curHead;
+ private boolean nextHead;
+ private boolean prevHead;
+ private MergeDir mergeDir;
+ private byte inFlag;
+ private byte headFlag;
+ private byte tailFlag;
+ private byte mergeMsgFlag;
+ private byte nextDir;
+ private byte prevDir;
+
+ public void configure(JobConf conf) {
+
+ randSeed = conf.getLong("randomSeed", 0);
+ randGenerator = new Random(randSeed);
+ probBeingRandomHead = conf.getFloat("probBeingRandomHead", 0.5f);
+
+ KMER_SIZE = conf.getInt("sizeKmer", 0);
+ outputValue = new NodeWithFlagWritable(KMER_SIZE);
+
+ mergeMsgValue = new NodeWithFlagWritable(KMER_SIZE);
+ updateMsgValue = new NodeWithFlagWritable(KMER_SIZE);
+
+ curNode = new NodeWritable(KMER_SIZE);
+ curID = new PositionWritable();
+ nextID = new PositionWritable();
+ prevID = new PositionWritable();
+ }
+
+ @Override
+ public void map(PositionWritable key, NodeWithFlagWritable value,
+ OutputCollector<PositionWritable, NodeWithFlagWritable> output, Reporter reporter) throws IOException {
+ inFlag = value.getFlag();
+ curNode.set(value.getNode());
+ curID.set(curNode.getNodeID());
+
+ }
+
+ }
+
+
+
+
+
+    /*
+     * Reducer class: processes the merge messages from H4MergeMapper
+     */
+ private static class H4MergeReducer2 extends MapReduceBase implements
Reducer<PositionWritable, NodeWithFlagWritable, PositionWritable, NodeWithFlagWritable> {
private MultipleOutputs mos;
private static final String TO_MERGE_OUTPUT = "toMerge";
@@ -428,13 +564,13 @@
conf.setOutputValueClass(NodeWithFlagWritable.class);
conf.setMapperClass(H4UpdatesMapper.class);
- conf.setReducerClass(MergePathsH4Reducer.class);
+ conf.setReducerClass(H4UpdatesReducer.class);
- MultipleOutputs.addNamedOutput(conf, MergePathsH4Reducer.TO_MERGE_OUTPUT, MergePathMultiSeqOutputFormat.class,
+ MultipleOutputs.addNamedOutput(conf, H4UpdatesReducer.TO_MERGE_OUTPUT, MergePathMultiSeqOutputFormat.class,
PositionWritable.class, NodeWithFlagWritable.class);
- MultipleOutputs.addNamedOutput(conf, MergePathsH4Reducer.COMPLETE_OUTPUT, MergePathMultiSeqOutputFormat.class,
+ MultipleOutputs.addNamedOutput(conf, H4UpdatesReducer.COMPLETE_OUTPUT, MergePathMultiSeqOutputFormat.class,
PositionWritable.class, NodeWithFlagWritable.class);
- MultipleOutputs.addNamedOutput(conf, MergePathsH4Reducer.UPDATES_OUTPUT, MergePathMultiSeqOutputFormat.class,
+ MultipleOutputs.addNamedOutput(conf, H4UpdatesReducer.UPDATES_OUTPUT, MergePathMultiSeqOutputFormat.class,
PositionWritable.class, NodeWithFlagWritable.class);
FileSystem dfs = FileSystem.get(conf);
@@ -447,16 +583,16 @@
RunningJob job = JobClient.runJob(conf);
// move the tmp outputs to the arg-spec'ed dirs. If there is no such dir, create an empty one to simplify downstream processing
- if (!dfs.rename(new Path(outputPath + File.separator + MergePathsH4Reducer.TO_MERGE_OUTPUT), new Path(
+ if (!dfs.rename(new Path(outputPath + File.separator + H4UpdatesReducer.TO_MERGE_OUTPUT), new Path(
toMergeOutput))) {
dfs.mkdirs(new Path(toMergeOutput));
}
- if (!dfs.rename(new Path(outputPath + File.separator + MergePathsH4Reducer.COMPLETE_OUTPUT), new Path(
+ if (!dfs.rename(new Path(outputPath + File.separator + H4UpdatesReducer.COMPLETE_OUTPUT), new Path(
completeOutput))) {
dfs.mkdirs(new Path(completeOutput));
}
- if (!dfs.rename(new Path(outputPath + File.separator + MergePathsH4Reducer.UPDATES_OUTPUT), new Path(
- updatesOutput))) {
+ if (!dfs.rename(new Path(outputPath + File.separator + H4UpdatesReducer.UPDATES_OUTPUT),
+ new Path(updatesOutput))) {
dfs.mkdirs(new Path(updatesOutput));
}
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/NodeWithFlagWritable.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/NodeWithFlagWritable.java
index 304cf0c..a7e8157 100644
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/NodeWithFlagWritable.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/NodeWithFlagWritable.java
@@ -143,10 +143,12 @@
} else if ((updateFlag & MessageFlag.MSG_UPDATE_MERGE) == MessageFlag.MSG_UPDATE_MERGE) {
// this message wants to merge node with updateNode.
// the direction flag indicates how the merge should take place.
+            // TODO: send an update for (or remove) the edge of the node being merged with
switch (updateFlag & MessageFlag.DIR_MASK) {
case MessageFlag.DIR_FF:
node.getKmer().mergeWithFFKmer(kmerSize, updateNode.getKmer());
node.getFFList().set(updateNode.getFFList());
+                // TODO: handle the FR list here as well, not just the FF list
break;
case MessageFlag.DIR_FR:
// FIXME not sure if this should be reverse-complement or just reverse...
@@ -185,6 +187,10 @@
set(flag, node);
}
+ public NodeWithFlagWritable(NodeWithFlagWritable other) {
+ this(other.flag, other.node);
+ }
+
public void set(NodeWithFlagWritable right) {
set(right.getFlag(), right.getNode());
}
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/PathNodeInitial.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/PathNodeInitial.java
index 0523812..3c46dc7 100644
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/PathNodeInitial.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/PathNodeInitial.java
@@ -61,7 +61,7 @@
private static byte NEAR_PATH = MessageFlag.EXTRA_FLAG; // special-case extra flag for us
- private static void sendOutputToNextNeighbors(NodeWritable node, NodeWithFlagWritable outputValue,
+ public static void sendOutputToNextNeighbors(NodeWritable node, NodeWithFlagWritable outputValue,
OutputCollector<PositionWritable, NodeWithFlagWritable> collector) throws IOException {
Iterator<PositionWritable> posIterator = node.getFFList().iterator(); // FFList
while (posIterator.hasNext()) {
@@ -73,7 +73,7 @@
}
}
- private static void sendOutputToPreviousNeighbors(NodeWritable node, NodeWithFlagWritable outputValue,
+ public static void sendOutputToPreviousNeighbors(NodeWritable node, NodeWithFlagWritable outputValue,
OutputCollector<PositionWritable, NodeWithFlagWritable> collector) throws IOException {
Iterator<PositionWritable> posIterator = node.getRRList().iterator(); // RRList
while (posIterator.hasNext()) {
@@ -179,7 +179,7 @@
/*
* Segregate nodes into three bins:
- * 1. mergeable nodes (marked as H/T)
+ * 1. mergeable nodes (maybe marked H or T)
* 2. non-mergeable nodes that are candidates for updates
* 3. non-mergeable nodes that are not path neighbors and won't be updated
*