add test cases; split MergePathsH4 into separate update and merge passes and emit completed paths as (NodeWritable, NullWritable)
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
index 291839c..6870ff1 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
@@ -254,4 +254,8 @@
return inDegree() == 1 && outDegree() == 1;
}
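+ /**
+ * True for simple path nodes (inDegree == outDegree == 1) and for terminal
+ * path nodes at either end of a simple path (exactly one neighbor).
+ */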
+ public boolean isSimpleOrTerminalPath() {
+ return isPathNode() || (inDegree() == 0 && outDegree() == 1) || (inDegree() == 1 && outDegree() == 0);
+ }
+
}
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4.java
index ae921c4..3d93baa 100644
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4.java
@@ -25,6 +25,7 @@
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
@@ -37,18 +38,14 @@
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.SequenceFileInputFormat;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.mapred.lib.IdentityMapper;
import org.apache.hadoop.mapred.lib.MultipleOutputs;
-import org.apache.hadoop.mapred.lib.NullOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
-import edu.uci.ics.genomix.hadoop.oldtype.VKmerBytesWritable;
import edu.uci.ics.genomix.hadoop.pmcommon.MergePathMultiSeqOutputFormat;
-import edu.uci.ics.genomix.hadoop.pmcommon.MergePathValueWritable;
import edu.uci.ics.genomix.hadoop.pmcommon.NodeWithFlagWritable;
import edu.uci.ics.genomix.hadoop.pmcommon.NodeWithFlagWritable.MessageFlag;
-import edu.uci.ics.genomix.hadoop.pmcommon.PathNodeInitial;
-import edu.uci.ics.genomix.hadoop.pmcommon.PathNodeInitial.PathNodeInitialReducer;
import edu.uci.ics.genomix.type.NodeWritable;
import edu.uci.ics.genomix.type.PositionWritable;
@@ -90,8 +87,8 @@
private PositionWritable curID;
private PositionWritable nextID;
private PositionWritable prevID;
- private boolean hasNext;
- private boolean hasPrev;
+ private boolean mergeableNext;
+ private boolean mergeablePrev;
private boolean curHead;
private boolean nextHead;
private boolean prevHead;
@@ -124,6 +121,11 @@
protected boolean isNodeRandomHead(PositionWritable nodeID) {
// "deterministically random", based on node id
randGenerator.setSeed(randSeed ^ nodeID.hashCode());
+
+ // similar hashcodes will produce similar initial random values. Burn through a few to increase spread
+ for (int i = 0; i < 100; i++) {
+ randGenerator.nextFloat();
+ }
return randGenerator.nextFloat() < probBeingRandomHead;
}
@@ -171,7 +173,7 @@
inFlag = value.getFlag();
curNode.set(value.getNode());
curID.set(curNode.getNodeID());
-
+ mergeDir = MergeDir.NO_MERGE; // no merge to happen
headFlag = (byte) (MessageFlag.IS_HEAD & inFlag);
tailFlag = (byte) (MessageFlag.IS_TAIL & inFlag);
mergeMsgFlag = (byte) (headFlag | tailFlag);
@@ -179,41 +181,41 @@
curHead = isNodeRandomHead(curID);
// the headFlag and tailFlag's indicate if the node is at the beginning or end of a simple path.
// We prevent merging towards non-path nodes
- hasNext = setNextInfo(curNode) && tailFlag == 0;
- hasPrev = setPrevInfo(curNode) && headFlag == 0;
- mergeDir = MergeDir.NO_MERGE; // no merge to happen
+ boolean isPath = curNode.isSimpleOrTerminalPath();
+ mergeableNext = setNextInfo(curNode) && tailFlag == 0;
+ mergeablePrev = setPrevInfo(curNode) && headFlag == 0;
// decide where we're going to merge to
- if (hasNext || hasPrev) {
+ if (isPath && (mergeableNext || mergeablePrev)) {
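+ // only simple/terminal path nodes participate; heads merge into a non-head neighbor, and non-heads merge only toward a smaller-ID neighbor so two nodes never merge into each other in the same round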
if (curHead) {
- if (hasNext && !nextHead) {
+ if (mergeableNext && !nextHead) {
// merge forward
mergeMsgFlag |= nextDir;
mergeDir = MergeDir.FORWARD;
- } else if (hasPrev && !prevHead) {
+ } else if (mergeablePrev && !prevHead) {
// merge backwards
mergeMsgFlag |= prevDir;
mergeDir = MergeDir.BACKWARD;
}
} else {
// I'm a tail
- if (hasNext && hasPrev) {
- if ((!nextHead && !prevHead) && (curID.compareTo(nextID) < 0 && curID.compareTo(prevID) < 0)) {
+ if (mergeableNext && mergeablePrev) {
+ if ((!nextHead && !prevHead) && (curID.compareTo(nextID) > 0 && curID.compareTo(prevID) > 0)) {
// tails on both sides, and I'm the "local maximum"
// compress me towards the tail in forward dir
mergeMsgFlag |= nextDir;
mergeDir = MergeDir.FORWARD;
}
- } else if (!hasPrev) {
+ } else if (!mergeablePrev) {
// no previous node
- if (!nextHead && curID.compareTo(nextID) < 0) {
+ if (!nextHead && curID.compareTo(nextID) > 0) {
// merge towards tail in forward dir
mergeMsgFlag |= nextDir;
mergeDir = MergeDir.FORWARD;
}
- } else if (!hasNext) {
+ } else if (!mergeableNext) {
// no next node
- if (!prevHead && curID.compareTo(prevID) < 0) {
+ if (!prevHead && curID.compareTo(prevID) > 0) {
// merge towards tail in reverse dir
mergeMsgFlag |= prevDir;
mergeDir = MergeDir.BACKWARD;
@@ -287,10 +289,8 @@
private int KMER_SIZE;
private NodeWithFlagWritable inputValue;
private NodeWithFlagWritable outputValue;
- private NodeWritable curNode;
private PositionWritable outPosn;
private boolean sawCurNode;
- private byte outFlag;
private byte inFlag;
// to prevent GC on update messages, we keep them all in one list and use the Node set method rather than creating new Node's
@@ -302,19 +302,26 @@
KMER_SIZE = conf.getInt("sizeKmer", 0);
inputValue = new NodeWithFlagWritable(KMER_SIZE);
outputValue = new NodeWithFlagWritable(KMER_SIZE);
- curNode = new NodeWritable(KMER_SIZE);
outPosn = new PositionWritable();
updateMsgs = new ArrayList<NodeWithFlagWritable>();
updateMsgsSize = updateMsgs.size();
}
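+ // Hadoop reuses the Writable returned by values.next(), so buffered messages must be deep copies, not references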
+ private void addUpdateMessage(NodeWithFlagWritable myInputValue) {
+ updateMsgsCount++;
+ if (updateMsgsCount > updateMsgsSize) {
+ updateMsgs.add(new NodeWithFlagWritable(myInputValue)); // make a copy of myInputValue-- not a reference!
+ updateMsgsSize = updateMsgs.size(); // grow the pool so this slot can be reused next time
+ } else {
+ updateMsgs.get(updateMsgsCount - 1).set(myInputValue); // reuse existing pooled copy
+ }
+ }
+
/*
* Process updates from mapper
*
* (non-Javadoc)
* @see org.apache.hadoop.mapred.Reducer#reduce(java.lang.Object, java.util.Iterator, org.apache.hadoop.mapred.OutputCollector, org.apache.hadoop.mapred.Reporter)
*/
- @SuppressWarnings("unchecked")
@Override
public void reduce(PositionWritable key, Iterator<NodeWithFlagWritable> values,
OutputCollector<PositionWritable, NodeWithFlagWritable> output, Reporter reporter) throws IOException {
@@ -326,22 +333,23 @@
inputValue.set(values.next());
inFlag = inputValue.getFlag();
inMsg = (byte) (inFlag & MessageFlag.MSG_MASK);
-
+
switch (inMsg) {
case MessageFlag.MSG_UPDATE_MERGE:
case MessageFlag.MSG_SELF:
if (sawCurNode)
- throw new IOException("Saw more than one MSG_SELF! previously seen self: " + curNode
- + " current self: " + inputValue.getNode());
- curNode.set(inputValue.getNode());
- outFlag = inFlag;
- sawCurNode = true;
+ throw new IOException("Saw more than one MSG_SELF! previously seen self: "
+ + outputValue.getNode() + " current self: " + inputValue.getNode());
if (inMsg == MessageFlag.MSG_SELF) {
- outPosn.set(curNode.getNodeID());
- } else { // MSG_UPDATE_MERGE
+ outPosn.set(inputValue.getNode().getNodeID()); // use the node we just received, not the stale outputValue
+ } else if (inMsg == MessageFlag.MSG_UPDATE_MERGE) {
// merge messages are sent to their merge recipient
- outPosn.set(curNode.getListFromDir(inMsg).getPosition(0));
+ outPosn.set(inputValue.getNode().getListFromDir(inMsg).getPosition(0));
+ } else {
+ throw new IOException("Unrecognized MessageFlag MSG: " + inMsg);
}
+ outputValue.set(inFlag, inputValue.getNode());
+ sawCurNode = true;
break;
case MessageFlag.MSG_UPDATE_EDGE:
addUpdateMessage(inputValue);
@@ -355,196 +363,111 @@
}
// process all the update messages for this node
- // I have no idea how to make this more efficient...
- for (int i=0; i < updateMsgsCount; i++) {
- NodeWithFlagWritable.processUpdates(curNode, updateMsgs.get(i), KMER_SIZE);
+ for (int i = 0; i < updateMsgsCount; i++) {
+ outputValue.processUpdates(updateMsgs.get(i), KMER_SIZE);
}
- outputValue.set(outFlag, curNode);
output.collect(outPosn, outputValue);
}
-
- private void addUpdateMessage(NodeWithFlagWritable myInputValue) {
- updateMsgsCount++;
- if (updateMsgsCount >= updateMsgsSize) {
- updateMsgs.add(new NodeWithFlagWritable(inputValue)); // make a copy of inputValue-- not a reference!
- } else {
- updateMsgs.get(updateMsgsCount - 1).set(myInputValue); // update existing reference
- }
- }
}
/*
- * Mapper class: sends the update messages to their (already decided) destination
+ * Reducer class: processes merge messages
*/
- public static class H4MergeMapper extends MapReduceBase implements
- Mapper<PositionWritable, NodeWithFlagWritable, PositionWritable, NodeWithFlagWritable> {
- private static long randSeed;
- private Random randGenerator;
- private float probBeingRandomHead;
-
- private int KMER_SIZE;
- private NodeWithFlagWritable outputValue;
- private NodeWithFlagWritable mergeMsgValue;
- private NodeWithFlagWritable updateMsgValue;
-
- private NodeWritable curNode;
- private PositionWritable curID;
- private PositionWritable nextID;
- private PositionWritable prevID;
- private boolean hasNext;
- private boolean hasPrev;
- private boolean curHead;
- private boolean nextHead;
- private boolean prevHead;
- private MergeDir mergeDir;
- private byte inFlag;
- private byte headFlag;
- private byte tailFlag;
- private byte mergeMsgFlag;
- private byte nextDir;
- private byte prevDir;
-
- public void configure(JobConf conf) {
-
- randSeed = conf.getLong("randomSeed", 0);
- randGenerator = new Random(randSeed);
- probBeingRandomHead = conf.getFloat("probBeingRandomHead", 0.5f);
-
- KMER_SIZE = conf.getInt("sizeKmer", 0);
- outputValue = new NodeWithFlagWritable(KMER_SIZE);
-
- mergeMsgValue = new NodeWithFlagWritable(KMER_SIZE);
- updateMsgValue = new NodeWithFlagWritable(KMER_SIZE);
-
- curNode = new NodeWritable(KMER_SIZE);
- curID = new PositionWritable();
- nextID = new PositionWritable();
- prevID = new PositionWritable();
- }
-
- @Override
- public void map(PositionWritable key, NodeWithFlagWritable value,
- OutputCollector<PositionWritable, NodeWithFlagWritable> output, Reporter reporter) throws IOException {
- inFlag = value.getFlag();
- curNode.set(value.getNode());
- curID.set(curNode.getNodeID());
-
- }
-
- }
-
- /*
- * Reducer class: processes the update messages from updateMapper
- */
- private static class H4MergeReducer2 extends MapReduceBase implements
+ private static class H4MergeReducer extends MapReduceBase implements
Reducer<PositionWritable, NodeWithFlagWritable, PositionWritable, NodeWithFlagWritable> {
private MultipleOutputs mos;
- private static final String TO_MERGE_OUTPUT = "toMerge";
- private static final String COMPLETE_OUTPUT = "complete";
- private static final String UPDATES_OUTPUT = "update";
- private OutputCollector<PositionWritable, NodeWithFlagWritable> toMergeCollector;
- private OutputCollector<PositionWritable, NodeWithFlagWritable> completeCollector;
- private OutputCollector<PositionWritable, NodeWithFlagWritable> updatesCollector;
+ public static final String TO_UPDATE_OUTPUT = "toUpdate";
+ public static final String COMPLETE_OUTPUT = "complete";
+ private OutputCollector<PositionWritable, NodeWithFlagWritable> toUpdateCollector;
+ private OutputCollector<NodeWritable, NullWritable> completeCollector;
private int KMER_SIZE;
private NodeWithFlagWritable inputValue;
private NodeWithFlagWritable outputValue;
- private NodeWritable curNode;
- private NodeWritable prevNode;
- private NodeWritable nextNode;
+ private PositionWritable outputKey;
private boolean sawCurNode;
- private boolean sawPrevNode;
- private boolean sawNextNode;
- private int count;
- private byte outFlag;
+ private byte inFlag;
+
+ // to prevent GC on update messages, we keep them all in one list and use the Node set method rather than creating new Node's
+ private ArrayList<NodeWithFlagWritable> mergeMsgs;
+ private int mergeMsgsSize;
+ private int mergeMsgsCount;
public void configure(JobConf conf) {
mos = new MultipleOutputs(conf);
KMER_SIZE = conf.getInt("sizeKmer", 0);
inputValue = new NodeWithFlagWritable(KMER_SIZE);
outputValue = new NodeWithFlagWritable(KMER_SIZE);
- curNode = new NodeWritable(KMER_SIZE);
- prevNode = new NodeWritable(KMER_SIZE);
- nextNode = new NodeWritable(KMER_SIZE);
+ outputKey = new PositionWritable();
+ mergeMsgs = new ArrayList<NodeWithFlagWritable>();
+ mergeMsgsSize = mergeMsgs.size();
}
+ private void addMergeMessage(NodeWithFlagWritable myInputValue) {
+ mergeMsgsCount++;
+ if (mergeMsgsCount > mergeMsgsSize) {
+ mergeMsgs.add(new NodeWithFlagWritable(myInputValue)); // make a copy of myInputValue-- not a reference!
+ mergeMsgsSize = mergeMsgs.size(); // grow the pool so this slot can be reused next time
+ } else {
+ mergeMsgs.get(mergeMsgsCount - 1).set(myInputValue); // reuse existing pooled copy
+ }
+ }
+
+ /*
+ * Process merges
+ *
+ * (non-Javadoc)
+ * @see org.apache.hadoop.mapred.Reducer#reduce(java.lang.Object, java.util.Iterator, org.apache.hadoop.mapred.OutputCollector, org.apache.hadoop.mapred.Reporter)
+ */
@SuppressWarnings("unchecked")
@Override
public void reduce(PositionWritable key, Iterator<NodeWithFlagWritable> values,
- OutputCollector<PositionWritable, NodeWithFlagWritable> output, Reporter reporter) throws IOException {
- toMergeCollector = mos.getCollector(TO_MERGE_OUTPUT, reporter);
+ OutputCollector<PositionWritable, NodeWithFlagWritable> toMergeCollector, Reporter reporter)
+ throws IOException {
+ toUpdateCollector = mos.getCollector(TO_UPDATE_OUTPUT, reporter);
completeCollector = mos.getCollector(COMPLETE_OUTPUT, reporter);
- updatesCollector = mos.getCollector(UPDATES_OUTPUT, reporter);
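+ // reset per-key state: this reducer instance is reused across keys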
+ sawCurNode = false;
+ mergeMsgsCount = 0;
- inputValue.set(values.next());
- if (!values.hasNext()) {
- if ((inputValue.getFlag() & MessageFlag.MSG_SELF) > 0) {
- if ((inputValue.getFlag() & MessageFlag.IS_HEAD) > 0
- && (inputValue.getFlag() & MessageFlag.IS_TAIL) > 0) {
- // complete path (H & T meet in this node)
- completeCollector.collect(key, inputValue);
- } else {
- // FROM_SELF => no merging this round. remap self
- toMergeCollector.collect(key, inputValue);
- }
- } else if ((inputValue.getFlag() & (MessageFlag.FROM_PREDECESSOR | MessageFlag.FROM_SUCCESSOR)) > 0) {
- // FROM_PREDECESSOR | FROM_SUCCESSOR, but singleton? error here!
- throw new IOException("Only one value recieved in merge, but it wasn't from self!");
- }
- } else {
- // multiple inputs => a merge will take place. Aggregate all, then collect the merged path
- count = 0;
- outFlag = MessageFlag.EMPTY_MESSAGE;
- sawCurNode = false;
- sawPrevNode = false;
- sawNextNode = false;
- while (true) { // process values; break when no more
- count++;
- outFlag |= (inputValue.getFlag() & (MessageFlag.IS_HEAD | MessageFlag.IS_TAIL)); // merged node may become HEAD or TAIL
- if ((inputValue.getFlag() & MessageFlag.FROM_PREDECESSOR) > 0) {
- prevNode.set(inputValue.getNode());
- sawPrevNode = true;
- } else if ((inputValue.getFlag() & MessageFlag.FROM_SUCCESSOR) > 0) {
- nextNode.set(inputValue.getNode());
- sawNextNode = true;
- } else if ((inputValue.getFlag() & MessageFlag.MSG_SELF) > 0) {
- curNode.set(inputValue.getNode());
+ while (values.hasNext()) {
+ inputValue.set(values.next());
+ inFlag = inputValue.getFlag();
+ switch (inFlag & MessageFlag.MSG_MASK) {
+ case MessageFlag.MSG_SELF:
+ if (sawCurNode)
+ throw new IOException("Saw more than one MSG_SELF! previously seen self: "
+ + outputValue.getNode() + " current self: " + inputValue.getNode());
+ outputKey.set(inputValue.getNode().getNodeID()); // key by the node we just received, not the stale outputValue
+ outputValue.set(inFlag, inputValue.getNode());
sawCurNode = true;
- } else {
- throw new IOException("Unknown origin for merging node");
- }
- if (!values.hasNext()) {
break;
- } else {
- inputValue.set(values.next());
- }
+ case MessageFlag.MSG_UPDATE_MERGE:
+ addMergeMessage(inputValue);
+ break;
+ case MessageFlag.MSG_UPDATE_EDGE:
+ throw new IOException("Error: update message recieved during merge phase!" + inputValue);
+ default:
+ throw new IOException("Unrecognized message type: " + (inFlag & MessageFlag.MSG_MASK));
}
- if (count != 2 && count != 3) {
- throw new IOException("Expected two or three nodes in MergePathsH4 reduce; saw "
- + String.valueOf(count));
- }
- if (!sawCurNode) {
- throw new IOException("Didn't see node from self in MergePathsH4 reduce!");
- }
+ }
+ if (!sawCurNode) {
+ throw new IOException("Never saw self in recieve update messages!");
+ }
- // merge any received nodes
- if (sawNextNode) {
- curNode.mergeForwardNext(nextNode, KMER_SIZE);
- reporter.incrCounter("genomix", "num_merged", 1);
- }
- if (sawPrevNode) {
- // TODO: fix this merge command! which one is the right one?
- curNode.mergeForwardPre(prevNode, KMER_SIZE);
- reporter.incrCounter("genomix", "num_merged", 1);
- }
+ // process all the merge messages for this node
+ for (int i = 0; i < mergeMsgsCount; i++) {
+ outputValue.processUpdates(mergeMsgs.get(i), KMER_SIZE);
+ }
- outputValue.set(outFlag, curNode);
- if ((outFlag & MessageFlag.IS_HEAD) > 0 && (outFlag & MessageFlag.IS_TAIL) > 0) {
- // True heads meeting tails => merge is complete for this node
- completeCollector.collect(key, outputValue);
- } else {
- toMergeCollector.collect(key, outputValue);
- }
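+ // route each result: non-path nodes go to toUpdate, finished H+T paths to complete, the rest to another merge round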
+ if (!outputValue.getNode().isSimpleOrTerminalPath()) {
+ // not a mergeable path, can't tell if it still needs updates!
+ toUpdateCollector.collect(outputKey, outputValue);
+ } else if ((outputValue.getFlag() & MessageFlag.IS_HEAD) > 0
+ && ((outputValue.getFlag() & MessageFlag.IS_TAIL) > 0)) {
+ // H + T indicates a complete path
+ completeCollector.collect(outputValue.getNode(), NullWritable.get());
+ } else {
+ // not finished merging yet
+ toMergeCollector.collect(outputKey, outputValue);
}
}
@@ -556,11 +479,13 @@
/*
* Run one iteration of the mergePaths algorithm
*/
- public RunningJob run(String inputPath, String toMergeOutput, String completeOutput, String updatesOutput,
- JobConf baseConf) throws IOException {
+ public RunningJob run(String inputPath, String toMergeOutput, String toUpdateOutput, String completeOutput, JobConf baseConf)
+ throws IOException {
JobConf conf = new JobConf(baseConf);
+ FileSystem dfs = FileSystem.get(conf);
conf.setJarByClass(MergePathsH4.class);
conf.setJobName("MergePathsH4 " + inputPath);
+<<<<<<< HEAD
//another comment
@@ -568,45 +493,51 @@
- Path outputPath = new Path(inputPath + ".h4merge.tmp");
- FileOutputFormat.setOutputPath(conf, outputPath);
conf.setInputFormat(SequenceFileInputFormat.class);
- conf.setOutputFormat(NullOutputFormat.class);
-
+ conf.setOutputFormat(SequenceFileOutputFormat.class);
conf.setMapOutputKeyClass(PositionWritable.class);
conf.setMapOutputValueClass(NodeWithFlagWritable.class);
conf.setOutputKeyClass(PositionWritable.class);
conf.setOutputValueClass(NodeWithFlagWritable.class);
+ // step 1: decide merge dir and send updates
+ FileInputFormat.addInputPaths(conf, inputPath);
+ String outputUpdatesTmp = "h4.updatesProcessed." + new Random().nextDouble() + ".tmp"; // random filename
+ FileOutputFormat.setOutputPath(conf, new Path(outputUpdatesTmp));
+ dfs.delete(new Path(outputUpdatesTmp), true);
conf.setMapperClass(H4UpdatesMapper.class);
conf.setReducerClass(H4UpdatesReducer.class);
-
- MultipleOutputs.addNamedOutput(conf, H4UpdatesReducer.TO_MERGE_OUTPUT, MergePathMultiSeqOutputFormat.class,
- PositionWritable.class, NodeWithFlagWritable.class);
- MultipleOutputs.addNamedOutput(conf, H4UpdatesReducer.COMPLETE_OUTPUT, MergePathMultiSeqOutputFormat.class,
- PositionWritable.class, NodeWithFlagWritable.class);
- MultipleOutputs.addNamedOutput(conf, H4UpdatesReducer.UPDATES_OUTPUT, MergePathMultiSeqOutputFormat.class,
- PositionWritable.class, NodeWithFlagWritable.class);
-
- FileSystem dfs = FileSystem.get(conf);
- // clean output dirs
- dfs.delete(outputPath, true);
- dfs.delete(new Path(toMergeOutput), true);
- dfs.delete(new Path(completeOutput), true);
- dfs.delete(new Path(updatesOutput), true);
-
RunningJob job = JobClient.runJob(conf);
- // move the tmp outputs to the arg-spec'ed dirs. If there is no such dir, create an empty one to simplify downstream processing
- if (!dfs.rename(new Path(outputPath + File.separator + H4UpdatesReducer.TO_MERGE_OUTPUT), new Path(
- toMergeOutput))) {
- dfs.mkdirs(new Path(toMergeOutput));
+ // step 2: process merges
+ FileInputFormat.setInputPaths(conf, new Path(outputUpdatesTmp)); // replace step 1's inputs; addInputPaths would append to them
- if (!dfs.rename(new Path(outputPath + File.separator + H4UpdatesReducer.COMPLETE_OUTPUT), new Path(
+ Path outputMergeTmp = new Path("h4.mergeProcessed." + new Random().nextDouble() + ".tmp"); // random filename
+ FileOutputFormat.setOutputPath(conf, outputMergeTmp);
+ MultipleOutputs.addNamedOutput(conf, H4MergeReducer.TO_UPDATE_OUTPUT, MergePathMultiSeqOutputFormat.class,
+ PositionWritable.class, NodeWithFlagWritable.class);
+ MultipleOutputs.addNamedOutput(conf, H4MergeReducer.COMPLETE_OUTPUT, MergePathMultiSeqOutputFormat.class,
+ NodeWritable.class, NullWritable.class);
+ dfs.delete(outputMergeTmp, true);
+ conf.setMapperClass(IdentityMapper.class);
+ conf.setReducerClass(H4MergeReducer.class);
+ job = JobClient.runJob(conf);
+
+ // move the tmp outputs to the arg-spec'ed dirs. If there is no such dir, create an empty one to simplify downstream processing
+ if (!dfs.rename(new Path(outputMergeTmp + File.separator + H4MergeReducer.TO_UPDATE_OUTPUT), new Path(
+ toUpdateOutput))) {
+ dfs.mkdirs(new Path(toUpdateOutput));
+ }
+ if (!dfs.rename(new Path(outputMergeTmp + File.separator + H4MergeReducer.COMPLETE_OUTPUT), new Path(
completeOutput))) {
dfs.mkdirs(new Path(completeOutput));
}
- if (!dfs.rename(new Path(outputPath + File.separator + H4UpdatesReducer.UPDATES_OUTPUT),
- new Path(updatesOutput))) {
- dfs.mkdirs(new Path(updatesOutput));
+ if (!dfs.rename(outputMergeTmp, new Path(toMergeOutput))) {
+ dfs.mkdirs(new Path(toMergeOutput));
}
return job;
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4Driver.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4Driver.java
index a3ea666..2765920 100644
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4Driver.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4Driver.java
@@ -15,6 +15,7 @@
package edu.uci.ics.genomix.hadoop.graphclean.mergepaths.h4;
import java.io.IOException;
+import java.util.ArrayList;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -25,32 +26,30 @@
import org.kohsuke.args4j.CmdLineParser;
import org.kohsuke.args4j.Option;
+import edu.uci.ics.genomix.hadoop.pmcommon.ConvertGraphFromNodeWithFlagToNodeWritable;
import edu.uci.ics.genomix.hadoop.pmcommon.PathNodeInitial;
@SuppressWarnings("deprecation")
public class MergePathsH4Driver {
private static final String TO_MERGE = "toMerge";
+ private static final String TO_UPDATE = "toUpdate";
private static final String COMPLETE = "complete";
- private static final String UPDATES = "updates";
private String mergeOutput;
+ private String toUpdateOutput;
private String completeOutput;
- private String updatesOutput;
private void setOutputPaths(String basePath, int mergeIteration) {
basePath = basePath.replaceAll("/$", ""); // strip trailing slash
mergeOutput = basePath + "_" + TO_MERGE + "_i" + mergeIteration;
+ toUpdateOutput = basePath + "_" + TO_UPDATE + "_i" + mergeIteration;
completeOutput = basePath + "_" + COMPLETE + "_i" + mergeIteration;
- updatesOutput = basePath + "_" + UPDATES + "_i" + mergeIteration;
}
private static class Options {
@Option(name = "-inputpath", usage = "the input path", required = true)
public String inputPath;
- @Option(name = "-outputpath", usage = "the output path", required = true)
- public String outputPath;
-
@Option(name = "-mergeresultpath", usage = "the merging results path", required = true)
public String mergeResultPath;
@@ -74,8 +73,8 @@
* iterations of path merging. Updates during the merge are batch-processed
* at the end in a final update job.
*/
- public void run(String inputGraphPath, String outputGraphPath, int numReducers, int sizeKmer, int mergeRound,
- String defaultConfPath, JobConf defaultConf) throws IOException {
+ public String run(String inputGraphPath, int numReducers, int sizeKmer, int mergeRound, String defaultConfPath,
+ JobConf defaultConf) throws IOException {
JobConf baseConf = defaultConf == null ? new JobConf() : defaultConf;
if (defaultConfPath != null) {
baseConf.addResource(new Path(defaultConfPath));
@@ -85,31 +84,35 @@
FileSystem dfs = FileSystem.get(baseConf);
int iMerge = 0;
+ boolean mergeComplete = false;
+ String prevToMergeOutput = inputGraphPath;
+ ArrayList<String> completeOutputs = new ArrayList<String>();
// identify head and tail nodes with pathnode initial
PathNodeInitial inith4 = new PathNodeInitial();
setOutputPaths(inputGraphPath, iMerge);
- String prevToMergeOutput = inputGraphPath;
- System.out.println("initial run. toMerge: " + mergeOutput + ", complete: " + completeOutput);
- inith4.run(prevToMergeOutput, mergeOutput, completeOutput, baseConf);
- dfs.copyToLocalFile(new Path(mergeOutput), new Path("initial-toMerge"));
- dfs.copyToLocalFile(new Path(completeOutput), new Path("initial-complete"));
+ inith4.run(prevToMergeOutput, mergeOutput, toUpdateOutput, completeOutput, baseConf);
+ completeOutputs.add(completeOutput);
+ // dfs.copyToLocalFile(new Path(mergeOutput), new Path("initial-toMerge"));
+ // dfs.copyToLocalFile(new Path(completeOutput), new Path("initial-complete"));
// several iterations of merging
MergePathsH4 merger = new MergePathsH4();
for (iMerge = 1; iMerge <= mergeRound; iMerge++) {
prevToMergeOutput = mergeOutput;
setOutputPaths(inputGraphPath, iMerge);
- merger.run(prevToMergeOutput, mergeOutput, completeOutput, updatesOutput, baseConf);
-// dfs.copyToLocalFile(new Path(mergeOutput), new Path("i" + iMerge +"-toMerge"));
-// dfs.copyToLocalFile(new Path(completeOutput), new Path("i" + iMerge +"-complete"));
-// dfs.copyToLocalFile(new Path(updatesOutput), new Path("i" + iMerge +"-updates"));
-
+ merger.run(prevToMergeOutput, mergeOutput, toUpdateOutput, completeOutput, baseConf);
+ completeOutputs.add(completeOutput);
+ // dfs.copyToLocalFile(new Path(mergeOutput), new Path("i" + iMerge +"-toMerge"));
+ // dfs.copyToLocalFile(new Path(completeOutput), new Path("i" + iMerge +"-complete"));
+
if (dfs.listStatus(new Path(mergeOutput)) == null || dfs.listStatus(new Path(mergeOutput)).length == 0) {
// no output from previous run-- we are done!
+ mergeComplete = true;
break;
}
}
+<<<<<<< HEAD
// finally, combine all the completed paths and update messages to
// create a single merged graph output
@@ -130,25 +133,34 @@
};
+ if (!mergeComplete) {
+ // if the merge didn't finish, we have to do one final iteration to convert back into (NodeWritable, NullWritable) pairs
+ ConvertGraphFromNodeWithFlagToNodeWritable converter = new ConvertGraphFromNodeWithFlagToNodeWritable();
+ converter.run(prevToMergeOutput, completeOutput, baseConf);
+ completeOutputs.add(completeOutput);
+ }
+
+ // final output string is a comma-separated list of completeOutputs
+>>>>>>> 0fd527e9535a755b9d0956adb1cdc845f1fc46c2
StringBuilder sb = new StringBuilder();
String delim = "";
- for (FileStatus file : dfs.globStatus(new Path(inputGraphPath.replaceAll("/$", "") + "*"), updateFilter)) {
- sb.append(delim).append(file.getPath());
+ for (String output : completeOutputs) {
+ sb.append(delim).append(output);
delim = ",";
}
String finalInputs = sb.toString();
- System.out.println("This is the final sacrifice: " + finalInputs);
- // TODO run the update iteration
+ return finalInputs;
}
- public void run(String inputPath, String outputGraphPath, int numReducers, int sizeKmer, int mergeRound,
- String defaultConfPath) throws IOException {
- run(inputPath, outputGraphPath, numReducers, sizeKmer, mergeRound, defaultConfPath, null);
+ public String run(String inputPath, int numReducers, int sizeKmer, int mergeRound, String defaultConfPath)
+ throws IOException {
+ return run(inputPath, numReducers, sizeKmer, mergeRound, defaultConfPath, null);
}
- public void run(String inputPath, String outputGraphPath, int numReducers, int sizeKmer, int mergeRound,
- JobConf defaultConf) throws IOException {
- run(inputPath, outputGraphPath, numReducers, sizeKmer, mergeRound, null, defaultConf);
+ public String run(String inputPath, int numReducers, int sizeKmer, int mergeRound, JobConf defaultConf)
+ throws IOException {
+ return run(inputPath, numReducers, sizeKmer, mergeRound, null, defaultConf);
}
public static void main(String[] args) throws Exception {
@@ -156,7 +168,7 @@
CmdLineParser parser = new CmdLineParser(options);
parser.parseArgument(args);
MergePathsH4Driver driver = new MergePathsH4Driver();
- driver.run(options.inputPath, options.outputPath, options.numReducers, options.sizeKmer, options.mergeRound,
- null, null);
+ String outputs = driver.run(options.inputPath, options.numReducers, options.sizeKmer, options.mergeRound, null, null);
+ System.out.println("Job ran. Find outputs in " + outputs);
}
}
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/ConvertGraphFromNodeWithFlagToNodeWritable.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/ConvertGraphFromNodeWithFlagToNodeWritable.java
new file mode 100644
index 0000000..a060d5f
--- /dev/null
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/ConvertGraphFromNodeWithFlagToNodeWritable.java
@@ -0,0 +1,102 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.genomix.hadoop.pmcommon;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.mapred.lib.MultipleOutputs;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+import edu.uci.ics.genomix.hadoop.pmcommon.NodeWithFlagWritable.MessageFlag;
+import edu.uci.ics.genomix.type.NodeWritable;
+import edu.uci.ics.genomix.type.PositionWritable;
+
+/*
+ * Convert the graph from (PositionWritable, NodeWritableWithFlag) to (NodeWritable, NullWritable)
+ */
+@SuppressWarnings("deprecation")
+public class ConvertGraphFromNodeWithFlagToNodeWritable extends Configured implements Tool {
+
+ public static class ConvertGraphMapper extends MapReduceBase implements
+ Mapper<PositionWritable, NodeWithFlagWritable, NodeWritable, NullWritable> {
+
+ /*
+ * Convert the graph
+ */
+ @Override
+ public void map(PositionWritable key, NodeWithFlagWritable value,
+ OutputCollector<NodeWritable, NullWritable> output, Reporter reporter) throws IOException {
+ output.collect(value.getNode(), NullWritable.get());
+ }
+ }
+
+ /*
+ * Convert the graph
+ */
+ public RunningJob run(String inputPath, String outputPath, JobConf baseConf) throws IOException {
+ JobConf conf = new JobConf(baseConf);
+ conf.setJarByClass(ConvertGraphFromNodeWithFlagToNodeWritable.class);
+ conf.setJobName("Convert graph to NodeWritable " + inputPath);
+ conf.setMapOutputKeyClass(NodeWritable.class);
+ conf.setMapOutputValueClass(NullWritable.class);
+ conf.setOutputKeyClass(NodeWritable.class);
+ conf.setOutputValueClass(NullWritable.class);
+
+ conf.setInputFormat(SequenceFileInputFormat.class);
+ conf.setOutputFormat(SequenceFileOutputFormat.class);
+ FileInputFormat.addInputPaths(conf, inputPath);
+ FileOutputFormat.setOutputPath(conf, new Path(outputPath));
+ FileSystem dfs = FileSystem.get(conf);
+ dfs.delete(new Path(outputPath), true); // clean output dir
+
+ conf.setMapperClass(ConvertGraphMapper.class);
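+ // map-only job: with zero reduce tasks, mapper output is written directly by the output format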
+ conf.setNumReduceTasks(0);
+ RunningJob job = JobClient.runJob(conf);
+
+ return job;
+ }
+
+ @Override
+ public int run(String[] args) throws Exception {
+ int res = ToolRunner.run(new Configuration(), new ConvertGraphFromNodeWithFlagToNodeWritable(), args);
+ return res;
+ }
+
+ public static void main(String[] args) throws Exception {
+ int res = new ConvertGraphFromNodeWithFlagToNodeWritable().run(args);
+ System.exit(res);
+ }
+}
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/NodeWithFlagWritable.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/NodeWithFlagWritable.java
index 4148982..3af7069 100644
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/NodeWithFlagWritable.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/NodeWithFlagWritable.java
@@ -43,7 +43,7 @@
byte neighborToMeDir = mirrorDirection(neighborDir);
byte neighborToMergeDir = flipDirection(neighborToMeDir, mergeDir);
- // clear previous kmer and edge data
+ // clear previous kmer and edge data
node.reset(0);
// indicate the node to delete
@@ -73,8 +73,9 @@
}
/*
- * When A->B edge type is @neighborDir and B will merge towards C along a @mergeDir edge,
- * returns the new edge type for A->C
+ * When A->B edge type is @neighborDir and B will merge towards C along a
+ *
+ * @mergeDir edge, returns the new edge type for A->C
*/
public static byte flipDirection(byte neighborDir, byte mergeDir) {
switch (mergeDir) {
@@ -106,10 +107,10 @@
}
/*
- * Process any changes to @node contained in @updateMsg. This includes merges and edge updates
+ * Process any changes to @self contained in @updateMsg. This includes
+ * merges and edge updates.
*/
- public static void processUpdates(NodeWritable node, NodeWithFlagWritable updateMsg, int kmerSize)
- throws IOException {
+ public void processUpdates(NodeWithFlagWritable updateMsg, int kmerSize) throws IOException {
byte updateFlag = updateMsg.getFlag();
NodeWritable updateNode = updateMsg.getNode();
@@ -131,7 +132,25 @@
node.getListFromDir(updateFlag).remove(updateNode.getNodeID()); // remove the node from my edges
node.getKmer().mergeWithKmerInDir(updateFlag, kmerSize, updateNode.getKmer()); // merge with its kmer
- // merge my edges with the incoming node's edges, accounting for if the node flipped in
+ // pass along H/T information from the merging node. flipping H ->T, T -> H
+ switch (updateFlag & MessageFlag.DIR_MASK) {
+ case MessageFlag.DIR_FF:
+ case MessageFlag.DIR_RR:
+ flag |= (byte) (updateFlag & MessageFlag.IS_HEAD);
+ flag |= (byte) (updateFlag & MessageFlag.IS_TAIL);
+ break;
+ case MessageFlag.DIR_FR:
+ case MessageFlag.DIR_RF:
+ if ((updateFlag & MessageFlag.IS_HEAD) > 0)
+ flag |= MessageFlag.IS_TAIL; // flipped: the merging node's HEAD becomes my TAIL
+ if ((updateFlag & MessageFlag.IS_TAIL) > 0)
+ flag |= MessageFlag.IS_HEAD; // flipped: its TAIL becomes my HEAD
+ break;
+ default:
+ throw new IOException("Unrecognized direction in updateFlag: " + updateFlag);
+ }
+
+ // merge my edges with the incoming node's edges, accounting for if the node flipped in
// the merge and if it's my predecessor or successor
switch (updateFlag & MessageFlag.DIR_MASK) {
case MessageFlag.DIR_FF:
@@ -140,27 +159,31 @@
node.getFRList().set(updateNode.getFRList());
// update isn't allowed to have any other successors (mirror & flip)
if (updateNode.getRFList().getCountOfPosition() > 0)
- throw new IOException("Invalid merge detected! Node: " + node + " merging towards " + updateNode + "along" + (updateFlag & MessageFlag.DIR_MASK));
+ throw new IOException("Invalid merge detected! Node: " + node + " merging towards "
+ + updateNode + " along " + (updateFlag & MessageFlag.DIR_MASK));
break;
case MessageFlag.DIR_FR:
// flip edges
node.getFFList().set(updateNode.getRFList());
node.getFRList().set(updateNode.getRRList());
if (updateNode.getFFList().getCountOfPosition() > 0)
- throw new IOException("Invalid merge detected! Node: " + node + " merging towards " + updateNode + "along" + (updateFlag & MessageFlag.DIR_MASK));
+ throw new IOException("Invalid merge detected! Node: " + node + " merging towards "
+ + updateNode + " along " + (updateFlag & MessageFlag.DIR_MASK));
break;
case MessageFlag.DIR_RF:
// flip edges
node.getRFList().set(updateNode.getFFList());
node.getRRList().set(updateNode.getFRList());
if (updateNode.getRRList().getCountOfPosition() > 0)
- throw new IOException("Invalid merge detected! Node: " + node + " merging towards " + updateNode + "along" + (updateFlag & MessageFlag.DIR_MASK));
+ throw new IOException("Invalid merge detected! Node: " + node + " merging towards "
+ + updateNode + " along " + (updateFlag & MessageFlag.DIR_MASK));
break;
case MessageFlag.DIR_RR:
node.getRFList().set(updateNode.getRFList());
node.getRRList().set(updateNode.getRRList());
if (updateNode.getFRList().getCountOfPosition() > 0)
- throw new IOException("Invalid merge detected! Node: " + node + " merging towards " + updateNode + "along" + (updateFlag & MessageFlag.DIR_MASK));
+ throw new IOException("Invalid merge detected! Node: " + node + " merging towards "
+ + updateNode + " along " + (updateFlag & MessageFlag.DIR_MASK));
break;
default:
throw new IOException("Unrecognized direction in updateFlag: " + updateFlag);
@@ -213,10 +236,7 @@
}
public NodeWritable getNode() {
- if (node.getCount() != 0) {
- return node;
- }
- return null;
+ return node;
}
public byte getFlag() {
@@ -246,7 +266,7 @@
@Override
public int hashCode() {
- // return super.hashCode() + flag + node.hashCode();
+ // return super.hashCode() + flag + node.hashCode();
return flag + node.hashCode();
}
@@ -258,4 +278,8 @@
}
return false;
}
+
+ public void setNode(NodeWritable otherNode) {
+ node.set(otherNode);
+ }
}
\ No newline at end of file
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/PathNodeInitial.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/PathNodeInitial.java
index 3c46dc7..d9b7763 100644
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/PathNodeInitial.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/PathNodeInitial.java
@@ -34,8 +34,8 @@
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.hadoop.mapred.lib.MultipleOutputs;
-import org.apache.hadoop.mapred.lib.NullOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
@@ -56,7 +56,6 @@
public class PathNodeInitial extends Configured implements Tool {
public static final String COMPLETE_OUTPUT = "complete";
- public static final String TO_MERGE_OUTPUT = "toMerge";
public static final String TO_UPDATE_OUTPUT = "toUpdate";
private static byte NEAR_PATH = MessageFlag.EXTRA_FLAG; // special-case extra flag for us
@@ -89,7 +88,6 @@
Mapper<NodeWritable, NullWritable, PositionWritable, NodeWithFlagWritable> {
private int KMER_SIZE;
- private PositionWritable outputKey;
private NodeWithFlagWritable outputValue;
private int inDegree;
private int outDegree;
@@ -98,7 +96,6 @@
public void configure(JobConf conf) {
KMER_SIZE = conf.getInt("sizeKmer", 0);
outputValue = new NodeWithFlagWritable(KMER_SIZE);
- outputKey = new PositionWritable();
}
/*
@@ -158,8 +155,8 @@
public static class PathNodeInitialReducer extends MapReduceBase implements
Reducer<PositionWritable, NodeWithFlagWritable, PositionWritable, NodeWithFlagWritable> {
private MultipleOutputs mos;
- private OutputCollector<PositionWritable, NodeWithFlagWritable> completeCollector;
private OutputCollector<PositionWritable, NodeWithFlagWritable> toUpdateCollector;
+ private OutputCollector<NodeWritable, NullWritable> completeCollector;
private int KMER_SIZE;
private NodeWithFlagWritable inputValue;
@@ -169,6 +166,7 @@
private byte inputFlag;
private boolean sawSelf;
+ @Override
public void configure(JobConf conf) {
mos = new MultipleOutputs(conf);
KMER_SIZE = conf.getInt("sizeKmer", 0);
@@ -177,6 +175,11 @@
nodeToKeep = new NodeWritable(KMER_SIZE);
}
+ @Override
+ public void close() throws IOException {
+ mos.close();
+ }
+
/*
* Segregate nodes into three bins:
* 1. mergeable nodes (maybe marked H or T)
@@ -191,13 +194,15 @@
public void reduce(PositionWritable key, Iterator<NodeWithFlagWritable> values,
OutputCollector<PositionWritable, NodeWithFlagWritable> toMergeCollector, Reporter reporter)
throws IOException {
- completeCollector = mos.getCollector(COMPLETE_OUTPUT, reporter);
toUpdateCollector = mos.getCollector(TO_UPDATE_OUTPUT, reporter);
+ completeCollector = mos.getCollector(COMPLETE_OUTPUT, reporter);
outputFlag = MessageFlag.EMPTY_MESSAGE;
sawSelf = false;
while (values.hasNext()) {
inputValue.set(values.next());
inputFlag = inputValue.getFlag();
outputFlag |= inputFlag;
@@ -211,19 +216,23 @@
nodeToKeep.set(inputValue.getNode());
}
}
+ if (!sawSelf) {
+ throw new IOException("Didn't see a self node in PathNodeInitial! flag: " + outputFlag);
+ }
- if ((outputFlag & MessageFlag.MSG_SELF) > 0) {
+ if (!nodeToKeep.isSimpleOrTerminalPath()) {
+ // non-path nodes need updates if adjacent to path nodes
+ if ((outputFlag & NEAR_PATH) > 0) {
+ outputValue.set(MessageFlag.EMPTY_MESSAGE, nodeToKeep);
+ toUpdateCollector.collect(key, outputValue);
+ } else {
+ // not adjacent... store in "complete" output
+ completeCollector.collect(nodeToKeep, NullWritable.get());
+ }
+ } else {
if ((outputFlag & MessageFlag.IS_HEAD) > 0 && (outputFlag & MessageFlag.IS_TAIL) > 0) {
- // non-path or single path nodes
- if ((outputFlag & NEAR_PATH) > 0) {
- // non-path, but an update candidate
- outputValue.set(MessageFlag.EMPTY_MESSAGE, nodeToKeep);
- toUpdateCollector.collect(key, outputValue);
- } else {
- // non-path or single-node path. Store in "complete" output
- outputValue.set(MessageFlag.EMPTY_MESSAGE, nodeToKeep);
- completeCollector.collect(key, outputValue);
- }
+ // path node, but cannot merge in either direction => complete
+ completeCollector.collect(nodeToKeep, NullWritable.get());
} else {
// path nodes that are mergeable
outputFlag &= (MessageFlag.IS_HEAD | MessageFlag.IS_TAIL); // clear flags except H/T
@@ -238,54 +247,41 @@
reporter.incrCounter("genomix", "path_nodes_tails", 1);
}
}
- } else {
- throw new IOException("No SELF node recieved in reduce! key=" + key.toString() + " flag=" + outputFlag);
}
}
-
- public void close() throws IOException {
- mos.close();
- }
}
/*
* Mark the head, tail, and simple path nodes in one map-reduce job.
*/
- public RunningJob run(String inputPath, String toMergeOutput, String completeOutput, String toUpdateOutput,
+ public RunningJob run(String inputPath, String toMergeOutput, String toUpdateOutput, String completeOutput,
JobConf baseConf) throws IOException {
JobConf conf = new JobConf(baseConf);
conf.setJarByClass(PathNodeInitial.class);
conf.setJobName("PathNodeInitial " + inputPath);
-
- FileInputFormat.addInputPaths(conf, inputPath);
- // Path outputPath = new Path(inputPath.replaceAll("/$", "") + ".initialMerge.tmp");
- Path outputPath = new Path(toMergeOutput);
- FileOutputFormat.setOutputPath(conf, outputPath);
-
- conf.setInputFormat(SequenceFileInputFormat.class);
- conf.setOutputFormat(NullOutputFormat.class);
-
conf.setMapOutputKeyClass(PositionWritable.class);
conf.setMapOutputValueClass(NodeWithFlagWritable.class);
conf.setOutputKeyClass(PositionWritable.class);
conf.setOutputValueClass(NodeWithFlagWritable.class);
- conf.setMapperClass(PathNodeInitialMapper.class);
- conf.setReducerClass(PathNodeInitialReducer.class);
-
- MultipleOutputs.addNamedOutput(conf, COMPLETE_OUTPUT, MergePathMultiSeqOutputFormat.class,
- PositionWritable.class, NodeWithFlagWritable.class);
+ conf.setInputFormat(SequenceFileInputFormat.class);
+ conf.setOutputFormat(SequenceFileOutputFormat.class);
+ FileInputFormat.addInputPaths(conf, inputPath);
+ FileOutputFormat.setOutputPath(conf, new Path(toMergeOutput));
MultipleOutputs.addNamedOutput(conf, TO_UPDATE_OUTPUT, MergePathMultiSeqOutputFormat.class,
PositionWritable.class, NodeWithFlagWritable.class);
-
+ MultipleOutputs.addNamedOutput(conf, COMPLETE_OUTPUT, MergePathMultiSeqOutputFormat.class, NodeWritable.class,
+ NullWritable.class);
FileSystem dfs = FileSystem.get(conf);
- dfs.delete(outputPath, true); // clean output dir
+ dfs.delete(new Path(toMergeOutput), true); // clean output dir
+
+ conf.setMapperClass(PathNodeInitialMapper.class);
+ conf.setReducerClass(PathNodeInitialReducer.class);
RunningJob job = JobClient.runJob(conf);
// move the tmp outputs to the arg-spec'ed dirs
- dfs.rename(new Path(outputPath + File.separator + COMPLETE_OUTPUT), new Path(completeOutput));
- dfs.rename(new Path(outputPath + File.separator + TO_UPDATE_OUTPUT), new Path(toUpdateOutput));
- // dfs.rename(outputPath, new Path(toMergeOutput));
+ dfs.rename(new Path(toMergeOutput + File.separator + TO_UPDATE_OUTPUT), new Path(toUpdateOutput));
+ dfs.rename(new Path(toMergeOutput + File.separator + COMPLETE_OUTPUT), new Path(completeOutput));
return job;
}
diff --git a/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h3/TestPathMergeH3.java b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h3/TestPathMergeH3.java
index c04ba46..7a65d59 100644
--- a/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h3/TestPathMergeH3.java
+++ b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h3/TestPathMergeH3.java
@@ -16,7 +16,7 @@
@SuppressWarnings("deprecation")
public class TestPathMergeH3 extends GenomixMiniClusterTest {
- protected String LOCAL_SEQUENCE_FILE = "src/test/resources/data/webmap/text.txt";
+ protected String LOCAL_SEQUENCE_FILE = "src/test/resources/data/sequence/bubble_test1.txt";
protected String HDFS_SEQUENCE = "/00-sequence/";
protected String HDFS_GRAPHBUILD = "/01-graphbuild/";
protected String HDFS_MARKPATHS = "/02-pathmark/";
@@ -42,7 +42,7 @@
buildGraph();
}
- @Test
+// @Test
public void TestMergeOneIteration() throws Exception {
cleanUpOutput();
if (regenerateGraph) {
@@ -53,14 +53,14 @@
copyLocalToDFS(EXPECTED_ROOT + GRAPHBUILD_FILE + ".binmerge", HDFS_GRAPHBUILD);
}
- PathNodeInitial inith3 = new PathNodeInitial();
- inith3.run(HDFS_GRAPHBUILD, HDFS_MARKPATHS + "toMerge", HDFS_MARKPATHS + "complete", conf);
- copyResultsToLocal(HDFS_MARKPATHS + "toMerge", ACTUAL_ROOT + PATHMARKS_FILE, false, conf);
- copyResultsToLocal(HDFS_MARKPATHS + "complete", ACTUAL_ROOT + PATHMARKS_FILE, false, conf);
-
- MergePathsH3Driver h3 = new MergePathsH3Driver();
- h3.run(HDFS_MARKPATHS + "toMerge", HDFS_MERGED, 2, KMER_LENGTH, 1, conf);
- copyResultsToLocal(HDFS_MERGED, ACTUAL_ROOT + PATHMERGE_FILE, false, conf);
+// PathNodeInitial inith3 = new PathNodeInitial();
+// inith3.run(HDFS_GRAPHBUILD, HDFS_MARKPATHS + "toMerge", HDFS_MARKPATHS + "complete", conf);
+// copyResultsToLocal(HDFS_MARKPATHS + "toMerge", ACTUAL_ROOT + PATHMARKS_FILE, false, conf);
+// copyResultsToLocal(HDFS_MARKPATHS + "complete", ACTUAL_ROOT + PATHMARKS_FILE, false, conf);
+//
+// MergePathsH3Driver h3 = new MergePathsH3Driver();
+// h3.run(HDFS_MARKPATHS + "toMerge", HDFS_MERGED, 2, KMER_LENGTH, 1, conf);
+// copyResultsToLocal(HDFS_MERGED, ACTUAL_ROOT + PATHMERGE_FILE, false, conf);
}
diff --git a/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/TestPathMergeH4.java b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/TestPathMergeH4.java
index 45a2d0a..249af29 100644
--- a/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/TestPathMergeH4.java
+++ b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/TestPathMergeH4.java
@@ -10,55 +10,69 @@
import org.apache.hadoop.mapred.JobConf;
import org.junit.Test;
-import edu.uci.ics.genomix.hadoop.graphclean.mergepaths.h3.MergePathsH3Driver;
-import edu.uci.ics.genomix.hadoop.pmcommon.GenomixMiniClusterTest;
import edu.uci.ics.genomix.hadoop.pmcommon.HadoopMiniClusterTest;
import edu.uci.ics.genomix.hadoop.pmcommon.PathNodeInitial;
import edu.uci.ics.genomix.hadoop.velvetgraphbuilding.GraphBuildingDriver;
-import edu.uci.ics.genomix.hyracks.driver.Driver.Plan;
-import edu.uci.ics.genomix.hyracks.job.GenomixJobConf;
@SuppressWarnings("deprecation")
public class TestPathMergeH4 extends HadoopMiniClusterTest {
- protected final String LOCAL_SEQUENCE_FILE = "src/test/resources/data/webmap/text.txt";
- protected final String SEQUENCE = "/00-sequence/";
- protected final String GRAPHBUILD = "/01-graphbuild/";
- protected final String MERGED = "/02-pathmerge/";
-
- protected final String ACTUAL = "src/test/resources/actual/";
-
- protected final boolean regenerateGraph = true;
-
+ protected String SEQUENCE = "/sequence/";
+ protected String GRAPHBUILD = "/graphbuild-unmerged/";
+ protected String MERGED = "/pathmerge/";
+ protected String LOCAL_SEQUENCE_FILE;
+ protected String readsFile;
+ protected boolean regenerateGraph;
{
- KMER_LENGTH = 5;
- READ_LENGTH = 8;
HDFS_PATHS = new ArrayList<String>(Arrays.asList(SEQUENCE, GRAPHBUILD, MERGED));
- conf.setInt(GenomixJobConf.KMER_LENGTH, KMER_LENGTH);
- conf.setInt(GenomixJobConf.READ_LENGTH, READ_LENGTH);
}
-
+
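+ // per-test setup: choose kmer/read lengths and the reads file; regenerateGraph controls whether the graph is rebuilt or loaded from expected results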
+ public void setupTestConf(int kmerLength, int readLength, boolean regenerateGraph, String readsFile) {
+ KMER_LENGTH = kmerLength;
+ READ_LENGTH = readLength;
+ this.readsFile = readsFile;
+ this.regenerateGraph = regenerateGraph;
+ LOCAL_SEQUENCE_FILE = DATA_ROOT + SEQUENCE + readsFile;
+ }
+
@Test
- public void TestMergeOneIteration() throws Exception {
- cleanUpOutput();
- prepareGraph();
-
- MergePathsH4Driver h4 = new MergePathsH4Driver();
- h4.run(GRAPHBUILD, MERGED, 2, KMER_LENGTH, 5, conf);
- copyResultsToLocal(MERGED, ACTUAL_ROOT + MERGED, false, conf);
+ public void testTwoReads() throws Exception {
+ setupTestConf(5, 8, false, "tworeads.txt");
+// testPathNode();
+ testMergeOneIteration();
+// testMergeToCompletion();
}
-
+
// @Test
+ public void testSimpleText() throws Exception {
+ setupTestConf(5, 8, false, "text.txt");
+ testPathNode();
+ testMergeOneIteration();
+// testMergeToCompletion();
+ }
+
public void testPathNode() throws IOException {
cleanUpOutput();
prepareGraph();
// identify head and tail nodes with pathnode initial
PathNodeInitial inith4 = new PathNodeInitial();
- inith4.run(GRAPHBUILD, "/toMerge", "/completed", conf);
+ inith4.run(GRAPHBUILD, "/toMerge", "/toUpdate", "/completed", conf);
+ copyResultsToLocal("/toMerge", ACTUAL_ROOT + "path_toMerge", false, conf);
+ copyResultsToLocal("/toUpdate", ACTUAL_ROOT + "path_toUpdate", false, conf);
+ copyResultsToLocal("/completed", ACTUAL_ROOT + "path_completed", false, conf);
}
-
-
+ public void testMergeOneIteration() throws Exception {
+ cleanUpOutput();
+ prepareGraph();
+
+ MergePathsH4Driver h4 = new MergePathsH4Driver();
+ String outputs = h4.run(GRAPHBUILD, 2, KMER_LENGTH, 1, conf);
+ int i=0;
+ for (String out : outputs.split(",")) {
+ copyResultsToLocal(out, ACTUAL_ROOT + MERGED + i++, false, conf);
+ }
+ }
public void buildGraph() throws IOException {
JobConf buildConf = new JobConf(conf); // use a separate conf so we don't interfere with other jobs
@@ -76,9 +90,9 @@
if (regenerateGraph) {
copyLocalToDFS(LOCAL_SEQUENCE_FILE, SEQUENCE);
buildGraph();
- copyLocalToDFS(ACTUAL_ROOT + GRAPHBUILD + ".bindir", GRAPHBUILD);
+ copyLocalToDFS(ACTUAL_ROOT + GRAPHBUILD + readsFile + ".binmerge", GRAPHBUILD);
} else {
- copyLocalToDFS(EXPECTED_ROOT + GRAPHBUILD + ".bindir", GRAPHBUILD);
+ copyLocalToDFS(EXPECTED_ROOT + GRAPHBUILD + readsFile + ".binmerge", GRAPHBUILD);
}
}
}
diff --git a/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/pmcommon/GenomixMiniClusterTest.java b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/pmcommon/GenomixMiniClusterTest.java
index 7f1e1d7..1f4a2d2 100644
--- a/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/pmcommon/GenomixMiniClusterTest.java
+++ b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/pmcommon/GenomixMiniClusterTest.java
@@ -76,9 +76,9 @@
new Path(localDestFile), false, conf, null);
} else {
// file is binary
- // save the entire binary output dir
- FileUtil.copy(FileSystem.get(conf), new Path(hdfsSrcDir), FileSystem.getLocal(new Configuration()),
- new Path(localDestFile + ".bindir"), false, conf);
+// // save the entire binary output dir
+// FileUtil.copy(FileSystem.get(conf), new Path(hdfsSrcDir), FileSystem.getLocal(new Configuration()),
+// new Path(localDestFile + ".bindir"), false, conf);
// also load the Nodes and write them out as text locally.
FileSystem lfs = FileSystem.getLocal(new Configuration());
diff --git a/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/pmcommon/HadoopMiniClusterTest.java b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/pmcommon/HadoopMiniClusterTest.java
index 9a113f1..89f9e96 100644
--- a/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/pmcommon/HadoopMiniClusterTest.java
+++ b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/pmcommon/HadoopMiniClusterTest.java
@@ -40,8 +40,9 @@
// subclass should modify this to include the HDFS directories that should be cleaned up
protected ArrayList<String> HDFS_PATHS = new ArrayList<String>();
- protected static String EXPECTED_ROOT = "src/test/resources/expected/";
- protected static String ACTUAL_ROOT = "src/test/resources/actual/";
+ protected static final String EXPECTED_ROOT = "src/test/resources/expected/";
+ protected static final String ACTUAL_ROOT = "src/test/resources/actual/";
+ protected static final String DATA_ROOT = "src/test/resources/data/";
protected static String HADOOP_CONF_ROOT = "src/test/resources/hadoop/conf/";
protected static String HADOOP_CONF = HADOOP_CONF_ROOT + "conf.xml";
diff --git a/genomix/genomix-hadoop/src/test/resources/data/sequence/fr_test.txt b/genomix/genomix-hadoop/src/test/resources/data/sequence/fr_test.txt
new file mode 100644
index 0000000..b450bd5
--- /dev/null
+++ b/genomix/genomix-hadoop/src/test/resources/data/sequence/fr_test.txt
@@ -0,0 +1,3 @@
+1 ACGTA
+2 AAATA
+
diff --git a/genomix/genomix-hadoop/src/test/resources/data/sequence/fr_test2.txt b/genomix/genomix-hadoop/src/test/resources/data/sequence/fr_test2.txt
new file mode 100644
index 0000000..ee05672
--- /dev/null
+++ b/genomix/genomix-hadoop/src/test/resources/data/sequence/fr_test2.txt
@@ -0,0 +1,4 @@
+1 AAAACGTAT
+2 GGGAATACG
+3 CGTATTCCC
+
diff --git a/genomix/genomix-hadoop/src/test/resources/data/sequence/rf_test.txt b/genomix/genomix-hadoop/src/test/resources/data/sequence/rf_test.txt
new file mode 100644
index 0000000..154dc8c
--- /dev/null
+++ b/genomix/genomix-hadoop/src/test/resources/data/sequence/rf_test.txt
@@ -0,0 +1,3 @@
+1 ACGGTGTA
+2 ACCGTGGT
+
diff --git a/genomix/genomix-hadoop/src/test/resources/data/sequence/singleread.txt b/genomix/genomix-hadoop/src/test/resources/data/sequence/singleread.txt
new file mode 100644
index 0000000..63a95ad
--- /dev/null
+++ b/genomix/genomix-hadoop/src/test/resources/data/sequence/singleread.txt
@@ -0,0 +1 @@
+1 AATAGAAG
diff --git a/genomix/genomix-hadoop/src/test/resources/data/webmap/text.txt b/genomix/genomix-hadoop/src/test/resources/data/sequence/text.txt
old mode 100755
new mode 100644
similarity index 100%
rename from genomix/genomix-hadoop/src/test/resources/data/webmap/text.txt
rename to genomix/genomix-hadoop/src/test/resources/data/sequence/text.txt
diff --git a/genomix/genomix-hadoop/src/test/resources/data/sequence/tworeads.txt b/genomix/genomix-hadoop/src/test/resources/data/sequence/tworeads.txt
new file mode 100644
index 0000000..3e3bf7b
--- /dev/null
+++ b/genomix/genomix-hadoop/src/test/resources/data/sequence/tworeads.txt
@@ -0,0 +1,2 @@
+1 AATAGAAG
+2 AGAAGCCC
diff --git a/genomix/genomix-hadoop/src/test/resources/expected/graphbuild-merged/tworeads.txt b/genomix/genomix-hadoop/src/test/resources/expected/graphbuild-merged/tworeads.txt
new file mode 100644
index 0000000..a2988b8
--- /dev/null
+++ b/genomix/genomix-hadoop/src/test/resources/expected/graphbuild-merged/tworeads.txt
@@ -0,0 +1,6 @@
+((1,1) [(1,3)] [] [] [] AATAGA) (null)
+((1,3) [(2,1),(1,4)] [] [] [(1,1)] TAGAA) (null)
+((1,4) [(2,2)] [] [] [(1,3)] AGAAG) (null)
+((2,1) [(2,2)] [] [] [(1,3)] AGAAG) (null)
+((2,2) [(2,3)] [] [] [(1,4),(2,1)] GAAGC) (null)
+((2,3) [] [] [] [(2,2)] AAGCCC) (null)
diff --git a/genomix/genomix-hadoop/src/test/resources/expected/graphbuild-unmerged/fr_test.txt b/genomix/genomix-hadoop/src/test/resources/expected/graphbuild-unmerged/fr_test.txt
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/genomix/genomix-hadoop/src/test/resources/expected/graphbuild-unmerged/fr_test.txt
diff --git a/genomix/genomix-hadoop/src/test/resources/expected/graphbuild-unmerged/fr_test2.txt b/genomix/genomix-hadoop/src/test/resources/expected/graphbuild-unmerged/fr_test2.txt
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/genomix/genomix-hadoop/src/test/resources/expected/graphbuild-unmerged/fr_test2.txt
diff --git a/genomix/genomix-hadoop/src/test/resources/expected/graphbuild-unmerged/text.txt b/genomix/genomix-hadoop/src/test/resources/expected/graphbuild-unmerged/text.txt
new file mode 100644
index 0000000..036bbfb
--- /dev/null
+++ b/genomix/genomix-hadoop/src/test/resources/expected/graphbuild-unmerged/text.txt
@@ -0,0 +1,24 @@
+((2,1) [(2,2)] [] [] [] AATAG) (null)
+((2,2) [(2,3)] [] [] [(2,1)] ATAGA) (null)
+((2,3) [(6,1),(2,4)] [] [] [(2,2)] TAGAA) (null)
+((2,4) [(6,2)] [] [] [(2,3)] AGAAG) (null)
+((4,1) [(4,2)] [] [] [] AATAG) (null)
+((4,2) [(4,3)] [] [] [(4,1)] ATAGA) (null)
+((4,3) [(6,1),(4,4)] [] [] [(4,2)] TAGAA) (null)
+((4,4) [(6,2)] [] [] [(4,3)] AGAAG) (null)
+((6,1) [(6,2)] [] [] [(2,3),(1,3),(3,3),(4,3),(5,3)] AGAAG) (null)
+((6,2) [(6,3)] [] [] [(2,4),(3,4),(1,4),(4,4),(5,4),(6,1)] GAAGA) (null)
+((6,3) [(6,4)] [] [] [(6,2)] AAGAA) (null)
+((6,4) [] [] [] [(6,3)] AGAAG) (null)
+((1,1) [(1,2)] [] [] [] AATAG) (null)
+((1,2) [(1,3)] [] [] [(1,1)] ATAGA) (null)
+((1,3) [(6,1),(1,4)] [] [] [(1,2)] TAGAA) (null)
+((1,4) [(6,2)] [] [] [(1,3)] AGAAG) (null)
+((3,1) [(3,2)] [] [] [] AATAG) (null)
+((3,2) [(3,3)] [] [] [(3,1)] ATAGA) (null)
+((3,3) [(6,1),(3,4)] [] [] [(3,2)] TAGAA) (null)
+((3,4) [(6,2)] [] [] [(3,3)] AGAAG) (null)
+((5,1) [(5,2)] [] [] [] AATAG) (null)
+((5,2) [(5,3)] [] [] [(5,1)] ATAGA) (null)
+((5,3) [(6,1),(5,4)] [] [] [(5,2)] TAGAA) (null)
+((5,4) [(6,2)] [] [] [(5,3)] AGAAG) (null)
diff --git a/genomix/genomix-hadoop/src/test/resources/expected/graphbuild-unmerged/tworeads.txt b/genomix/genomix-hadoop/src/test/resources/expected/graphbuild-unmerged/tworeads.txt
new file mode 100644
index 0000000..036bbfb
--- /dev/null
+++ b/genomix/genomix-hadoop/src/test/resources/expected/graphbuild-unmerged/tworeads.txt
@@ -0,0 +1,24 @@
+((2,1) [(2,2)] [] [] [] AATAG) (null)
+((2,2) [(2,3)] [] [] [(2,1)] ATAGA) (null)
+((2,3) [(6,1),(2,4)] [] [] [(2,2)] TAGAA) (null)
+((2,4) [(6,2)] [] [] [(2,3)] AGAAG) (null)
+((4,1) [(4,2)] [] [] [] AATAG) (null)
+((4,2) [(4,3)] [] [] [(4,1)] ATAGA) (null)
+((4,3) [(6,1),(4,4)] [] [] [(4,2)] TAGAA) (null)
+((4,4) [(6,2)] [] [] [(4,3)] AGAAG) (null)
+((6,1) [(6,2)] [] [] [(2,3),(1,3),(3,3),(4,3),(5,3)] AGAAG) (null)
+((6,2) [(6,3)] [] [] [(2,4),(3,4),(1,4),(4,4),(5,4),(6,1)] GAAGA) (null)
+((6,3) [(6,4)] [] [] [(6,2)] AAGAA) (null)
+((6,4) [] [] [] [(6,3)] AGAAG) (null)
+((1,1) [(1,2)] [] [] [] AATAG) (null)
+((1,2) [(1,3)] [] [] [(1,1)] ATAGA) (null)
+((1,3) [(6,1),(1,4)] [] [] [(1,2)] TAGAA) (null)
+((1,4) [(6,2)] [] [] [(1,3)] AGAAG) (null)
+((3,1) [(3,2)] [] [] [] AATAG) (null)
+((3,2) [(3,3)] [] [] [(3,1)] ATAGA) (null)
+((3,3) [(6,1),(3,4)] [] [] [(3,2)] TAGAA) (null)
+((3,4) [(6,2)] [] [] [(3,3)] AGAAG) (null)
+((5,1) [(5,2)] [] [] [] AATAG) (null)
+((5,2) [(5,3)] [] [] [(5,1)] ATAGA) (null)
+((5,3) [(6,1),(5,4)] [] [] [(5,2)] TAGAA) (null)
+((5,4) [(6,2)] [] [] [(5,3)] AGAAG) (null)