Merge branch 'anbangx/fullstack_genomix' of https://code.google.com/p/hyracks into anbangx/fullstack_genomix
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
index 65931a2..844e2e8 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
@@ -81,7 +81,7 @@
return kmer.getKmerLength();
}
- public void mergeForwadNext(NodeWritable nextNode, int initialKmerSize) {
+ public void mergeForwardNext(NodeWritable nextNode, int initialKmerSize) {
this.forwardForwardList.set(nextNode.forwardForwardList);
this.forwardReverseList.set(nextNode.forwardReverseList);
kmer.mergeNextKmer(initialKmerSize, nextNode.getKmer());
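
Note on the renamed mergeForwardNext: merging a node into its forward neighbor appends the neighbor's k-mer minus the shared (k-1)-base overlap. A string-based sketch of that arithmetic, assuming the usual de Bruijn overlap convention (KmerBytesWritable does this on packed bytes; the plain Strings here are purely illustrative):

    public class KmerMergeSketch {
        // Merge two overlapping k-mers: the last k-1 bases of cur must equal
        // the first k-1 bases of next, so only next's trailing bases are appended.
        static String mergeNextKmer(int k, String cur, String next) {
            String curTail = cur.substring(cur.length() - (k - 1));
            String nextHead = next.substring(0, k - 1);
            if (!curTail.equals(nextHead)) {
                throw new IllegalArgumentException("k-mers do not overlap");
            }
            return cur + next.substring(k - 1);
        }

        public static void main(String[] args) {
            // k=5: AGCTA then GCTAC overlap on GCTA, producing AGCTAC
            System.out.println(mergeNextKmer(5, "AGCTA", "GCTAC")); // AGCTAC
        }
    }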
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h3/MergePathsH3Driver.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h3/MergePathsH3Driver.java
index bd08a78..28c4108 100644
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h3/MergePathsH3Driver.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h3/MergePathsH3Driver.java
@@ -42,8 +42,11 @@
@Option(name = "-kmer-size", usage = "the size of kmer", required = true)
public int sizeKmer;
- @Option(name = "-merge-rounds", usage = "the while rounds of merging", required = true)
+ @Option(name = "-merge-rounds", usage = "the maximum number of rounds to merge", required = false)
public int mergeRound;
+
+ @Option(name = "-hadoop-conf", usage = "an (optional) hadoop configuration xml", required = false)
+ public String hadoopConf;
}
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4.java
new file mode 100644
index 0000000..2ced8dd
--- /dev/null
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4.java
@@ -0,0 +1,353 @@
+package edu.uci.ics.genomix.hadoop.graphclean.mergepaths.h4;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.mapred.lib.MultipleOutputs;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+import edu.uci.ics.genomix.hadoop.pmcommon.MessageWritableNodeWithFlag;
+import edu.uci.ics.genomix.type.NodeWritable;
+import edu.uci.ics.genomix.type.PositionWritable;
+import edu.uci.ics.genomix.hadoop.graphclean.mergepaths.h3.MergePathsH3.MessageFlag;
+
+@SuppressWarnings("deprecation")
+public class MergePathsH4 extends Configured implements Tool {
+
+ /*
+ * Mapper class: partition the graph using randomly selected pseudoheads.
+ * Heads send themselves to a non-head neighbor; all other nodes re-emit themselves under their own key.
+ */
+ private static class MergePathsH4Mapper extends MapReduceBase implements
+ Mapper<PositionWritable, MessageWritableNodeWithFlag, PositionWritable, MessageWritableNodeWithFlag> {
+ private static long randSeed;
+ private Random randGenerator;
+ private float probBeingRandomHead;
+
+ private int KMER_SIZE;
+ private PositionWritable outputKey;
+ private MessageWritableNodeWithFlag outputValue;
+ private NodeWritable curNode;
+ private PositionWritable curID;
+ private PositionWritable nextID;
+ private PositionWritable prevID;
+ private boolean hasNext;
+ private boolean hasPrev;
+ private boolean curHead;
+ private boolean nextHead;
+ private boolean prevHead;
+ private boolean willMerge;
+ private byte headFlag;
+ private byte tailFlag;
+ private byte outFlag;
+
+ public void configure(JobConf conf) {
+
+ randSeed = conf.getLong("randomSeed", 0);
+ randGenerator = new Random(randSeed);
+ probBeingRandomHead = conf.getFloat("probBeingRandomHead", 0.5f);
+
+ KMER_SIZE = conf.getInt("sizeKmer", 0);
+ outputValue = new MessageWritableNodeWithFlag(KMER_SIZE);
+ outputKey = new PositionWritable();
+ curNode = new NodeWritable(KMER_SIZE);
+ curID = new PositionWritable();
+ nextID = new PositionWritable();
+ prevID = new PositionWritable();
+ }
+
+ protected boolean isNodeRandomHead(PositionWritable nodeID) {
+ // "deterministically random", based on node id
+ randGenerator.setSeed(randSeed ^ nodeID.hashCode());
+ return randGenerator.nextFloat() < probBeingRandomHead;
+ }
+
+ /*
+ * set nextID to the element that's next (in the node's FF or FR list), returning true when there is a next neighbor
+ */
+ protected boolean setNextInfo(NodeWritable node) {
+ if (node.getFFList().getCountOfPosition() > 0) {
+ nextID.set(node.getFFList().getPosition(0));
+ nextHead = isNodeRandomHead(nextID);
+ return true;
+ }
+ if (node.getFRList().getCountOfPosition() > 0) {
+ nextID.set(node.getFRList().getPosition(0));
+ nextHead = isNodeRandomHead(nextID);
+ return true;
+ }
+ return false;
+ }
+
+ /*
+ * set prevID to the element that's previous (in the node's RR or RF list), returning true when there is a previous neighbor
+ */
+ protected boolean setPrevInfo(NodeWritable node) {
+ if (node.getRRList().getCountOfPosition() > 0) {
+ prevID.set(node.getRRList().getPosition(0));
+ prevHead = isNodeRandomHead(prevID);
+ return true;
+ }
+ if (node.getRFList().getCountOfPosition() > 0) {
+ prevID.set(node.getRFList().getPosition(0));
+ prevHead = isNodeRandomHead(prevID);
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public void map(PositionWritable key, MessageWritableNodeWithFlag value,
+ OutputCollector<PositionWritable, MessageWritableNodeWithFlag> output, Reporter reporter)
+ throws IOException {
+ // Node may be marked as head because it's a real head or a real tail
+ headFlag = (byte) (MessageFlag.IS_HEAD & value.getFlag());
+ tailFlag = (byte) (MessageFlag.IS_TAIL & value.getFlag());
+ outFlag = (byte) (headFlag | tailFlag);
+
+ // only PATH vertices are present. Find the ID's for my neighbors
+ curNode.set(value.getNode());
+ curID.set(curNode.getNodeID());
+
+ curHead = isNodeRandomHead(curID);
+ // headFlag and tailFlag indicate whether the node is at the beginning or end of a simple path.
+ // We prevent merging towards non-path nodes
+ hasNext = setNextInfo(curNode) && tailFlag == 0;
+ hasPrev = setPrevInfo(curNode) && headFlag == 0;
+ willMerge = false;
+
+ reporter.setStatus("CHECK ME OUT");
+ System.err.println("mapping node " + curNode.toString() + " next:" + String.valueOf(hasNext) + " prev:" + String.valueOf(hasPrev));
+
+ // TODO: need to update edges in neighboring nodes
+
+ if ((outFlag & MessageFlag.IS_HEAD) > 0 && (outFlag & MessageFlag.IS_TAIL) > 0) {
+ // true HEAD met true TAIL. this path is complete
+ outFlag |= MessageFlag.FROM_SELF;
+ outputValue.set(outFlag, curNode);
+ output.collect(curID, outputValue);
+ return;
+ }
+ if (hasNext || hasPrev) {
+ if (curHead) {
+ if (hasNext && !nextHead) {
+ // compress this head to the forward tail
+ outFlag |= MessageFlag.FROM_PREDECESSOR;
+ outputValue.set(outFlag, curNode);
+ output.collect(nextID, outputValue);
+ willMerge = true;
+ } else if (hasPrev && !prevHead) {
+ // compress this head to the reverse tail
+ outFlag |= MessageFlag.FROM_SUCCESSOR;
+ outputValue.set(outFlag, curNode);
+ output.collect(prevID, outputValue);
+ willMerge = true;
+ }
+ } else {
+ // I'm a tail
+ if (hasNext && hasPrev) {
+ if ((!nextHead && !prevHead) && (curID.compareTo(nextID) < 0 && curID.compareTo(prevID) < 0)) {
+ // tails on both sides, and I'm the "local minimum"
+ // compress me towards the tail in forward dir
+ outFlag |= MessageFlag.FROM_PREDECESSOR;
+ outputValue.set(outFlag, curNode);
+ output.collect(nextID, outputValue);
+ willMerge = true;
+ }
+ } else if (!hasPrev) {
+ // no previous node
+ if (!nextHead && curID.compareTo(nextID) < 0) {
+ // merge towards tail in forward dir
+ outFlag |= MessageFlag.FROM_PREDECESSOR;
+ outputValue.set(outFlag, curNode);
+ output.collect(nextID, outputValue);
+ willMerge = true;
+ }
+ } else if (!hasNext) {
+ // no next node
+ if (!prevHead && curID.compareTo(prevID) < 0) {
+ // merge towards tail in reverse dir
+ outFlag |= MessageFlag.FROM_SUCCESSOR;
+ outputValue.set(outFlag, curNode);
+ output.collect(prevID, outputValue);
+ willMerge = true;
+ }
+ }
+ }
+ }
+
+ // if we didn't send ourselves to some other node, remap ourselves for the next round
+ if (!willMerge) {
+ outFlag |= MessageFlag.FROM_SELF;
+ outputValue.set(outFlag, curNode);
+ output.collect(curID, outputValue);
+ }
+ }
+ }
+
+ /*
+ * Reducer class: merge nodes that co-occur; for singletons, remap the original nodes
+ */
+ private static class MergePathsH4Reducer extends MapReduceBase implements
+ Reducer<PositionWritable, MessageWritableNodeWithFlag, PositionWritable, MessageWritableNodeWithFlag> {
+ private MultipleOutputs mos;
+ public static final String COMPLETE_OUTPUT = "complete";
+
+ private int KMER_SIZE;
+ private MessageWritableNodeWithFlag inputValue;
+ private MessageWritableNodeWithFlag outputValue;
+ private NodeWritable curNode;
+ private NodeWritable prevNode;
+ private NodeWritable nextNode;
+ private boolean sawCurNode;
+ private boolean sawPrevNode;
+ private boolean sawNextNode;
+ private int count;
+ private byte outFlag;
+
+ public void configure(JobConf conf) {
+ mos = new MultipleOutputs(conf);
+ KMER_SIZE = conf.getInt("sizeKmer", 0);
+ inputValue = new MessageWritableNodeWithFlag(KMER_SIZE);
+ outputValue = new MessageWritableNodeWithFlag(KMER_SIZE);
+ curNode = new NodeWritable(KMER_SIZE);
+ prevNode = new NodeWritable(KMER_SIZE);
+ nextNode = new NodeWritable(KMER_SIZE);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void reduce(PositionWritable key, Iterator<MessageWritableNodeWithFlag> values,
+ OutputCollector<PositionWritable, MessageWritableNodeWithFlag> output, Reporter reporter)
+ throws IOException {
+
+ inputValue.set(values.next());
+ if (!values.hasNext()) {
+ if ((inputValue.getFlag() & MessageFlag.FROM_SELF) > 0) {
+ if ((inputValue.getFlag() & MessageFlag.IS_HEAD) > 0 && (inputValue.getFlag() & MessageFlag.IS_TAIL) > 0) {
+ // complete path (H & T meet in this node)
+ mos.getCollector(COMPLETE_OUTPUT, reporter).collect(key, inputValue);
+ } else {
+ // FROM_SELF => no merging this round. remap self
+ output.collect(key, inputValue);
+ }
+ } else if ((inputValue.getFlag() & (MessageFlag.FROM_PREDECESSOR | MessageFlag.FROM_SUCCESSOR)) > 0) {
+ // a merge message (FROM_PREDECESSOR or FROM_SUCCESSOR) arrived alone, without the target node's FROM_SELF copy
+ throw new IOException("Only one value received in merge, but it wasn't from self!");
+ }
+ } else {
+ // multiple inputs => a merge will take place. Aggregate all, then collect the merged path
+ count = 0;
+ outFlag = MessageFlag.EMPTY_MESSAGE;
+ sawCurNode = false;
+ sawPrevNode = false;
+ sawNextNode = false;
+ while (true) { // process values; break when no more
+ count++;
+ outFlag |= (inputValue.getFlag() & (MessageFlag.IS_HEAD | MessageFlag.IS_TAIL)); // merged node may become HEAD or TAIL
+ if ((inputValue.getFlag() & MessageFlag.FROM_PREDECESSOR) > 0) {
+ prevNode.set(inputValue.getNode());
+ sawPrevNode = true;
+ } else if ((inputValue.getFlag() & MessageFlag.FROM_SUCCESSOR) > 0) {
+ nextNode.set(inputValue.getNode());
+ sawNextNode = true;
+ } else if ((inputValue.getFlag() & MessageFlag.FROM_SELF) > 0) {
+ curNode.set(inputValue.getNode());
+ sawCurNode = true;
+ } else {
+ throw new IOException("Unknown origin for merging node");
+ }
+ if (!values.hasNext()) {
+ break;
+ } else {
+ inputValue.set(values.next());
+ }
+ }
+ if (count != 2 && count != 3) {
+ throw new IOException("Expected two or three nodes in MergePathsH4 reduce; saw "
+ + String.valueOf(count));
+ }
+ if (!sawCurNode) {
+ throw new IOException("Didn't see node from self in MergePathsH4 reduce!");
+ }
+
+ // merge any received nodes
+ if (sawNextNode) {
+ curNode.mergeForwardNext(nextNode, KMER_SIZE);
+ reporter.incrCounter("genomix", "num_merged", 1);
+ }
+ if (sawPrevNode) {
+ // TODO: fix this merge command! which one is the right one?
+ curNode.mergeForwardPre(prevNode, KMER_SIZE);
+ reporter.incrCounter("genomix", "num_merged", 1);
+ }
+
+ outputValue.set(outFlag, curNode);
+ if ((outFlag & MessageFlag.IS_HEAD) > 0 && (outFlag & MessageFlag.IS_TAIL) > 0) {
+ // True heads meeting tails => merge is complete for this node
+ mos.getCollector(COMPLETE_OUTPUT, reporter).collect(key, outputValue);
+ // TODO send update to this node's neighbors
+ } else {
+ output.collect(key, outputValue);
+ }
+ }
+ }
+ }
+
+ /*
+ * Run one iteration of the mergePaths algorithm
+ */
+ public RunningJob run(String inputPath, String outputPath, JobConf baseConf) throws IOException {
+ JobConf conf = new JobConf(baseConf);
+ conf.setJarByClass(MergePathsH4.class);
+ conf.setJobName("MergePathsH4 " + inputPath);
+
+ FileInputFormat.addInputPath(conf, new Path(inputPath));
+ FileOutputFormat.setOutputPath(conf, new Path(outputPath));
+
+ conf.setInputFormat(SequenceFileInputFormat.class);
+ conf.setOutputFormat(SequenceFileOutputFormat.class);
+
+ conf.setMapOutputKeyClass(PositionWritable.class);
+ conf.setMapOutputValueClass(MessageWritableNodeWithFlag.class);
+ conf.setOutputKeyClass(PositionWritable.class);
+ conf.setOutputValueClass(MessageWritableNodeWithFlag.class);
+
+ conf.setMapperClass(MergePathsH4Mapper.class);
+ conf.setReducerClass(MergePathsH4Reducer.class);
+
+ FileSystem.get(conf).delete(new Path(outputPath), true);
+
+ return JobClient.runJob(conf);
+ }
+
+ @Override
+ public int run(String[] arg0) throws Exception {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
+ public static void main(String[] args) throws Exception {
+ int res = ToolRunner.run(new Configuration(), new MergePathsH4(), args);
+ System.out.println("Ran the job fine!");
+ System.exit(res);
+ }
+}
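
A note on isNodeRandomHead in the mapper above: seeding the Random with (randSeed ^ nodeID.hashCode()) makes the head/non-head decision a pure function of the node ID, so every map task agrees on the partition without coordination. A minimal sketch of the idea (the int id stands in for PositionWritable.hashCode(); seed and probability values are illustrative):

    import java.util.Random;

    public class PseudoheadSketch {
        static final long RAND_SEED = 0L;     // stands in for conf.getLong("randomSeed", 0)
        static final float PROB_HEAD = 0.5f;  // stands in for conf.getFloat("probBeingRandomHead", 0.5f)
        static final Random RAND = new Random();

        // Re-seeding before every draw makes the result depend only on the id,
        // not on how many draws a particular task has already made.
        static boolean isRandomHead(int idHash) {
            RAND.setSeed(RAND_SEED ^ idHash);
            return RAND.nextFloat() < PROB_HEAD;
        }

        public static void main(String[] args) {
            // Independent calls on the same id always agree, which is what lets
            // separate map tasks partition the graph consistently.
            System.out.println(isRandomHead(1234) == isRandomHead(1234)); // true
        }
    }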
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4Driver.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4Driver.java
new file mode 100644
index 0000000..155b999
--- /dev/null
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4Driver.java
@@ -0,0 +1,105 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.genomix.hadoop.graphclean.mergepaths.h4;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RunningJob;
+import org.kohsuke.args4j.CmdLineParser;
+import org.kohsuke.args4j.Option;
+
+@SuppressWarnings("deprecation")
+public class MergePathsH4Driver {
+
+ private static class Options {
+ @Option(name = "-inputpath", usage = "the input path", required = true)
+ public String inputPath;
+
+ @Option(name = "-outputpath", usage = "the output path", required = true)
+ public String outputPath;
+
+ @Option(name = "-mergeresultpath", usage = "the merging results path", required = true)
+ public String mergeResultPath;
+
+ @Option(name = "-num-reducers", usage = "the number of reducers", required = true)
+ public int numReducers;
+
+ @Option(name = "-kmer-size", usage = "the size of kmer", required = true)
+ public int sizeKmer;
+
+ @Option(name = "-merge-rounds", usage = "the maximum number of rounds to merge", required = false)
+ public int mergeRound;
+
+ @Option(name = "-hadoop-conf", usage = "an (optional) hadoop configuration xml", required = false)
+ public String hadoopConf;
+
+ }
+
+ public void run(String inputPath, String outputPath, int numReducers, int sizeKmer, int mergeRound,
+ String defaultConfPath, JobConf defaultConf) throws IOException {
+ JobConf baseConf = defaultConf == null ? new JobConf() : defaultConf;
+ if (defaultConfPath != null) {
+ baseConf.addResource(new Path(defaultConfPath));
+ }
+ baseConf.setNumReduceTasks(numReducers);
+ baseConf.setInt("sizeKmer", sizeKmer);
+
+ FileSystem dfs = FileSystem.get(baseConf);
+ String prevOutput = inputPath;
+ dfs.delete(new Path(outputPath), true); // clear any previous output
+
+ String tmpOutputPath = "NO_JOBS_DONE";
+ boolean finalMerge = false;
+ for (int iMerge = 1; iMerge <= mergeRound; iMerge++) {
+ baseConf.setInt("iMerge", iMerge);
+ baseConf.setBoolean("finalMerge", finalMerge);
+ MergePathsH4 merger = new MergePathsH4();
+ tmpOutputPath = inputPath + ".mergepathsH4." + String.valueOf(iMerge);
+ RunningJob job = merger.run(prevOutput, tmpOutputPath, baseConf);
+ prevOutput = tmpOutputPath; // chain this round's output into the next round
+ if (job.getCounters().findCounter("genomix", "num_merged").getValue() == 0) {
+ if (!finalMerge) {
+ // all of the pseudoheads have found each other. H4 now behaves like H1
+ finalMerge = true;
+ } else {
+ // already in final merge stage and all paths were merged before. We're done!
+ break;
+ }
+ }
+ }
+ dfs.rename(new Path(tmpOutputPath), new Path(outputPath)); // save final results
+ }
+
+ public void run(String inputPath, String outputPath, int numReducers, int sizeKmer, int mergeRound,
+ String defaultConfPath) throws IOException {
+ run(inputPath, outputPath, numReducers, sizeKmer, mergeRound, defaultConfPath, null);
+ }
+
+ public void run(String inputPath, String outputPath, int numReducers, int sizeKmer, int mergeRound,
+ JobConf defaultConf) throws IOException {
+ run(inputPath, outputPath, numReducers, sizeKmer, mergeRound, null, defaultConf);
+ }
+
+ public static void main(String[] args) throws Exception {
+ Options options = new Options();
+ CmdLineParser parser = new CmdLineParser(options);
+ parser.parseArgument(args);
+ MergePathsH4Driver driver = new MergePathsH4Driver();
+ driver.run(options.inputPath, options.outputPath, options.numReducers, options.sizeKmer, options.mergeRound,
+ null, null);
+ }
+}
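
The driver loop above converges in two stages: the first round that merges nothing switches finalMerge on, and the loop exits only when the final pass also merges nothing (or mergeRound is exhausted). A stripped-down sketch of that control flow, with a hypothetical runRound() standing in for launching the job and reading the genomix/num_merged counter:

    public class ConvergenceLoopSketch {
        // Hypothetical stand-in for merger.run(...) followed by
        // job.getCounters().findCounter("genomix", "num_merged").getValue()
        static long runRound(int round, boolean finalMerge) {
            return 0; // pretend nothing merged this round
        }

        public static void main(String[] args) {
            int maxRounds = 10;
            boolean finalMerge = false;
            for (int round = 1; round <= maxRounds; round++) {
                if (runRound(round, finalMerge) == 0) {
                    if (!finalMerge) {
                        finalMerge = true; // pseudoheads have all met; run one H1-style pass
                    } else {
                        break;             // final pass merged nothing: converged
                    }
                }
            }
        }
    }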
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/PathNodeInitial.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/PathNodeInitial.java
index e7bcdf6..c8386ea 100644
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/PathNodeInitial.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/PathNodeInitial.java
@@ -57,6 +57,7 @@
private int inDegree;
private int outDegree;
private NodeWritable emptyNode;
+ private Iterator<PositionWritable> posIterator;
public PathNodeInitialMapper() {
@@ -80,26 +81,51 @@
outputValue.set(MessageFlag.FROM_SELF, key);
output.collect(key.getNodeID(), outputValue);
reporter.incrCounter("genomix", "path_nodes", 1);
- } else if (outDegree == 1) {
- // Not a path myself, but my successor might be one. Map forward successor
+ } else if (inDegree == 0 && outDegree == 1) {
+ // start of a tip. needs to merge & be marked as head
+ outputValue.set(MessageFlag.FROM_SELF, key);
+ output.collect(key.getNodeID(), outputValue);
+ reporter.incrCounter("genomix", "path_nodes", 1);
+
outputValue.set(MessageFlag.FROM_PREDECESSOR, emptyNode);
- if (key.getFFList().getCountOfPosition() > 0) {
- outputKey.set(key.getFFList().getPosition(0));
- } else {
- outputKey.set(key.getFRList().getPosition(0));
- }
- output.collect(outputKey, outputValue);
- } else if (inDegree == 1) {
- // Not a path myself, but my predecessor might be one.
+ output.collect(key.getNodeID(), outputValue);
+ } else if (inDegree == 1 && outDegree == 0) {
+ // end of a tip. needs to merge & be marked as tail
+ outputValue.set(MessageFlag.FROM_SELF, key);
+ output.collect(key.getNodeID(), outputValue);
+ reporter.incrCounter("genomix", "path_nodes", 1);
+
outputValue.set(MessageFlag.FROM_SUCCESSOR, emptyNode);
- if (key.getRRList().getCountOfPosition() > 0) {
- outputKey.set(key.getRRList().getPosition(0));
- } else {
- outputKey.set(key.getRFList().getPosition(0));
- }
- output.collect(outputKey, outputValue);
+ output.collect(key.getNodeID(), outputValue);
} else {
- // TODO: all other nodes will not participate-- should they be collected in a "complete" output?
+ if (outDegree > 0) {
+ // Not a path myself, but my successors might be path nodes. Message each successor so it can be marked as a head
+ outputValue.set(MessageFlag.FROM_PREDECESSOR, emptyNode);
+ posIterator = key.getFFList().iterator();
+ while (posIterator.hasNext()) {
+ outputKey.set(posIterator.next());
+ output.collect(outputKey, outputValue);
+ }
+ posIterator = key.getFRList().iterator();
+ while (posIterator.hasNext()) {
+ outputKey.set(posIterator.next());
+ output.collect(outputKey, outputValue);
+ }
+ }
+ if (inDegree > 0) {
+ // Not a path myself, but my predecessors might be path nodes. Message each predecessor so it can be marked as a tail
+ outputValue.set(MessageFlag.FROM_SUCCESSOR, emptyNode);
+ posIterator = key.getRRList().iterator();
+ while (posIterator.hasNext()) {
+ outputKey.set(posIterator.next());
+ output.collect(outputKey, outputValue);
+ }
+ posIterator = key.getRFList().iterator();
+ while (posIterator.hasNext()) {
+ outputKey.set(posIterator.next());
+ output.collect(outputKey, outputValue);
+ }
+ }
}
}
}
@@ -128,7 +154,7 @@
inputValue.set(values.next());
if (!values.hasNext()) {
- if ((inputValue.getFlag() & MessageFlag.FROM_SELF) == MessageFlag.FROM_SELF) {
+ if ((inputValue.getFlag() & MessageFlag.FROM_SELF) > 0) {
// FROM_SELF => need to keep this PATH node
output.collect(key, inputValue);
}
@@ -138,15 +164,14 @@
flag = MessageFlag.EMPTY_MESSAGE;
while (true) { // process values; break when no more
count++;
- if ((inputValue.getFlag() & MessageFlag.FROM_SELF) == MessageFlag.FROM_SELF) {
+ if ((inputValue.getFlag() & MessageFlag.FROM_SELF) > 0) {
// SELF -> keep this node
+ flag |= MessageFlag.FROM_SELF;
nodeToKeep.set(inputValue.getNode());
- } else if ((inputValue.getFlag() & MessageFlag.FROM_SUCCESSOR) == MessageFlag.FROM_SUCCESSOR) {
+ } else if ((inputValue.getFlag() & MessageFlag.FROM_SUCCESSOR) > 0) {
flag |= MessageFlag.IS_TAIL;
- reporter.incrCounter("genomix", "path_nodes_tails", 1);
- } else if ((inputValue.getFlag() & MessageFlag.FROM_PREDECESSOR) == MessageFlag.FROM_PREDECESSOR) {
+ } else if ((inputValue.getFlag() & MessageFlag.FROM_PREDECESSOR) > 0) {
flag |= MessageFlag.IS_HEAD;
- reporter.incrCounter("genomix", "path_nodes_heads", 1);
}
if (!values.hasNext()) {
break;
@@ -158,11 +183,21 @@
throw new IOException("Expected at least two nodes in PathNodeInitial reduce; saw "
+ String.valueOf(count));
}
- if ((flag & MessageFlag.FROM_SELF) == MessageFlag.FROM_SELF) {
- // only map simple path nodes
+ if ((flag & MessageFlag.FROM_SELF) > 0) {
+ // only keep simple path nodes
outputValue.set(flag, nodeToKeep);
output.collect(key, outputValue);
+
reporter.incrCounter("genomix", "path_nodes", 1);
+ if ((flag & MessageFlag.IS_HEAD) > 0) {
+ reporter.incrCounter("genomix", "path_nodes_heads", 1);
+ }
+ if ((flag & MessageFlag.IS_TAIL) > 0) {
+ reporter.incrCounter("genomix", "path_nodes_tails", 1);
+ }
+ } else {
+ // this is a non-path node.
+ // TODO: keep this node in a "completed" reducer
}
}
}
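
On the flag tests these hunks rewrite: for a single-bit mask, (flag & MASK) > 0 and (flag & MASK) == MASK agree, but only the > 0 form also works as an any-bit test against compound masks like (IS_HEAD | IS_TAIL). A small sketch (the bit positions are assumptions; the real constants live in MergePathsH3.MessageFlag):

    public class FlagSketch {
        // Assumed single-bit layout, mirroring MessageFlag's style.
        static final byte FROM_SELF = 1 << 0;
        static final byte IS_HEAD   = 1 << 1;
        static final byte IS_TAIL   = 1 << 2;

        public static void main(String[] args) {
            byte flag = FROM_SELF | IS_HEAD;
            // Single-bit mask: both forms give the same answer.
            System.out.println((flag & FROM_SELF) > 0);                              // true
            // Compound mask: > 0 asks "any bit set?"; == requires every bit.
            System.out.println((flag & (IS_HEAD | IS_TAIL)) > 0);                    // true
            System.out.println((flag & (IS_HEAD | IS_TAIL)) == (IS_HEAD | IS_TAIL)); // false
        }
    }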
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/velvetgraphbuilding/DeepGraphBuildingReducer.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/velvetgraphbuilding/DeepGraphBuildingReducer.java
index 470b7fa..d28db37 100644
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/velvetgraphbuilding/DeepGraphBuildingReducer.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/velvetgraphbuilding/DeepGraphBuildingReducer.java
@@ -65,7 +65,7 @@
nextNode.set(nextNextNode);
continue;
}
- curNode.mergeForwadNext(nextNode, KMER_SIZE);
+ curNode.mergeForwardNext(nextNode, KMER_SIZE);
nextNode.set(nextNextNode);
}
output.collect(curNode, nullWritable);
diff --git a/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h3/TestPathMergeH3.java b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h3/TestPathMergeH3.java
index 735c968..cc922de 100644
--- a/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h3/TestPathMergeH3.java
+++ b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h3/TestPathMergeH3.java
@@ -22,10 +22,10 @@
protected String HDFS_MARKPATHS = "/02-pathmark/";
protected String HDFS_MERGED = "/03-pathmerge/";
- protected String GRAPHBUILD_FILE = "result.graphbuild.txt";
- protected String PATHMARKS_FILE = "result.markpaths.txt";
- protected String PATHMERGE_FILE = "result.mergepath.txt";
- protected boolean regenerateGraph = false;
+ protected String GRAPHBUILD_FILE = "graphbuild.result";
+ protected String PATHMARKS_FILE = "markpaths.result";
+ protected String PATHMERGE_FILE = "mergepath.result";
+ protected boolean regenerateGraph = true;
{
KMER_LENGTH = 5;
@@ -58,8 +58,7 @@
copyResultsToLocal(HDFS_MARKPATHS, ACTUAL_ROOT + PATHMARKS_FILE, false, conf);
MergePathsH3Driver h3 = new MergePathsH3Driver();
- h3.run(HDFS_MARKPATHS, HDFS_MERGED, 2, KMER_LENGTH, 10, null, conf);
-// h3.run(HDFS_MARKPATHS, HDFS_MERGED, 2, KMER_LENGTH, 10, HADOOP_CONF_ROOT + "conf.xml", null);
+ h3.run(HDFS_MARKPATHS, HDFS_MERGED, 2, KMER_LENGTH, 1, conf);
copyResultsToLocal(HDFS_MERGED, ACTUAL_ROOT + PATHMERGE_FILE, false, conf);
}
diff --git a/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/TestPathMergeH4.java b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/TestPathMergeH4.java
new file mode 100644
index 0000000..9e799f3
--- /dev/null
+++ b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/TestPathMergeH4.java
@@ -0,0 +1,72 @@
+package edu.uci.ics.genomix.hadoop.graphclean.mergepaths.h4;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.junit.Test;
+
+import edu.uci.ics.genomix.hadoop.graphclean.mergepaths.h3.MergePathsH3Driver;
+import edu.uci.ics.genomix.hadoop.pmcommon.GenomixMiniClusterTest;
+import edu.uci.ics.genomix.hadoop.pmcommon.PathNodeInitial;
+import edu.uci.ics.genomix.hyracks.driver.Driver.Plan;
+import edu.uci.ics.genomix.hyracks.job.GenomixJobConf;
+
+@SuppressWarnings("deprecation")
+public class TestPathMergeH4 extends GenomixMiniClusterTest {
+ protected String LOCAL_SEQUENCE_FILE = "src/test/resources/data/webmap/text.txt";
+ protected String HDFS_SEQUENCE = "/00-sequence/";
+ protected String HDFS_GRAPHBUILD = "/01-graphbuild/";
+ protected String HDFS_MARKPATHS = "/02-pathmark/";
+ protected String HDFS_MERGED = "/03-pathmerge/";
+
+ protected String GRAPHBUILD_FILE = "graphbuild.result";
+ protected String PATHMARKS_FILE = "markpaths.result";
+ protected String PATHMERGE_FILE = "h4.mergepath.result";
+ protected boolean regenerateGraph = true;
+
+ {
+ KMER_LENGTH = 5;
+ READ_LENGTH = 8;
+ HDFS_PATHS = new ArrayList<String>(Arrays.asList(HDFS_SEQUENCE, HDFS_GRAPHBUILD, HDFS_MARKPATHS, HDFS_MERGED));
+ conf.setInt(GenomixJobConf.KMER_LENGTH, KMER_LENGTH);
+ conf.setInt(GenomixJobConf.READ_LENGTH, READ_LENGTH);
+ }
+
+ @Test
+ public void TestMergeOneIteration() throws Exception {
+ cleanUpOutput();
+ if (regenerateGraph) {
+ copyLocalToDFS(LOCAL_SEQUENCE_FILE, HDFS_SEQUENCE);
+ buildGraph();
+ copyLocalToDFS(ACTUAL_ROOT + GRAPHBUILD_FILE + ".binmerge", HDFS_GRAPHBUILD);
+ } else {
+ copyLocalToDFS(EXPECTED_ROOT + GRAPHBUILD_FILE + ".binmerge", HDFS_GRAPHBUILD);
+ }
+
+ PathNodeInitial inith4 = new PathNodeInitial();
+ inith4.run(HDFS_GRAPHBUILD, HDFS_MARKPATHS, conf);
+ copyResultsToLocal(HDFS_MARKPATHS, ACTUAL_ROOT + PATHMARKS_FILE, false, conf);
+
+ MergePathsH4Driver h4 = new MergePathsH4Driver();
+ h4.run(HDFS_MARKPATHS, HDFS_MERGED, 2, KMER_LENGTH, 1, conf);
+ copyResultsToLocal(HDFS_MERGED, ACTUAL_ROOT + PATHMERGE_FILE, false, conf);
+ }
+
+
+
+ public void buildGraph() throws Exception {
+ JobConf buildConf = new JobConf(conf); // use a separate conf so we don't interfere with other jobs
+ FileInputFormat.setInputPaths(buildConf, HDFS_SEQUENCE);
+ FileOutputFormat.setOutputPath(buildConf, new Path(HDFS_GRAPHBUILD));
+ buildConf.set(GenomixJobConf.OUTPUT_FORMAT, GenomixJobConf.OUTPUT_FORMAT_BINARY);
+ buildConf.set(GenomixJobConf.GROUPBY_TYPE, GenomixJobConf.GROUPBY_TYPE_PRECLUSTER);
+ driver.runJob(new GenomixJobConf(buildConf), Plan.BUILD_DEBRUJIN_GRAPH, true);
+ String fileFormat = buildConf.get(GenomixJobConf.OUTPUT_FORMAT);
+ boolean resultsAreText = GenomixJobConf.OUTPUT_FORMAT_TEXT.equalsIgnoreCase(fileFormat);
+ copyResultsToLocal(HDFS_GRAPHBUILD, ACTUAL_ROOT + GRAPHBUILD_FILE, resultsAreText, buildConf);
+ }
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/MapReadToNodeOperator.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/MapReadToNodeOperator.java
index 6c26feb..7394b71 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/MapReadToNodeOperator.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/MapReadToNodeOperator.java
@@ -135,7 +135,7 @@
nextNodeEntry.set(nextNextNodeEntry);
continue;
}
- curNodeEntry.mergeForwadNext(nextNodeEntry, kmerSize);
+ curNodeEntry.mergeForwardNext(nextNodeEntry, kmerSize);
nextNodeEntry.set(nextNextNodeEntry);
}
outputNode(curNodeEntry);