Better testing for initial path merge
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4.java
index 2ced8dd..b240833 100644
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4.java
@@ -36,7 +36,7 @@
* Mapper class: Partition the graph using random pseudoheads.
* Heads send themselves to their successors, and all others map themselves.
*/
- private static class MergePathsH4Mapper extends MapReduceBase implements
+ public static class MergePathsH4Mapper extends MapReduceBase implements
Mapper<PositionWritable, MessageWritableNodeWithFlag, PositionWritable, MessageWritableNodeWithFlag> {
private static long randSeed;
private Random randGenerator;
@@ -200,6 +200,10 @@
outputValue.set(outFlag, curNode);
output.collect(curID, outputValue);
}
+ else {
+ // TODO send update to this node's neighbors
+ //mos.getCollector(UPDATES_OUTPUT, reporter).collect(key, outputValue);
+ }
}
}
@@ -210,6 +214,7 @@
Reducer<PositionWritable, MessageWritableNodeWithFlag, PositionWritable, MessageWritableNodeWithFlag> {
private MultipleOutputs mos;
public static final String COMPLETE_OUTPUT = "complete";
+ public static final String UPDATES_OUTPUT = "update";
private int KMER_SIZE;
private MessageWritableNodeWithFlag inputValue;
@@ -304,7 +309,6 @@
if ((outFlag & MessageFlag.IS_HEAD) > 0 && (outFlag & MessageFlag.IS_TAIL) > 0) {
// True heads meeting tails => merge is complete for this node
mos.getCollector(COMPLETE_OUTPUT, reporter).collect(key, outputValue);
- // TODO send update to this node's neighbors
} else {
output.collect(key, outputValue);
}
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/MessageWritableNodeWithFlag.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/MessageWritableNodeWithFlag.java
index 91415ed..f05797e 100644
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/MessageWritableNodeWithFlag.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/MessageWritableNodeWithFlag.java
@@ -7,6 +7,7 @@
import org.apache.hadoop.io.BinaryComparable;
import org.apache.hadoop.io.WritableComparable;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
import edu.uci.ics.genomix.type.NodeWritable;
/*
@@ -20,7 +21,7 @@
public MessageWritableNodeWithFlag() {
this(0);
}
-
+
public MessageWritableNodeWithFlag(int k) {
this.flag = 0;
this.node = new NodeWritable(k);
@@ -30,6 +31,11 @@
this.flag = flag;
this.node = new NodeWritable(kmerSize);
}
+
+ public MessageWritableNodeWithFlag(byte flag, NodeWritable node) { // builds a message from a flag and a non-null node
+ this(node.getKmer().getKmerLength()); // allocate the internal node with the same kmer length as the given node
+ set(flag, node); // then delegate to set(byte, NodeWritable) to store the flag and the node
+ }
public void set(MessageWritableNodeWithFlag right) {
set(right.getFlag(), right.getNode());
@@ -79,4 +85,19 @@
public int getLength() {
return node.getCount();
}
+
+ @Override
+ public int hashCode() {
+// Hashes the same two fields that equals() compares: the flag byte and the node.
+ return flag + node.hashCode();
+ }
+
+    @Override
+    public boolean equals(Object rightObj) { // equal only to another message with the same flag and an equal node
+        if (!(rightObj instanceof MessageWritableNodeWithFlag)) {
+            return false; // also covers null
+        }
+        MessageWritableNodeWithFlag other = (MessageWritableNodeWithFlag) rightObj;
+        return flag == other.flag && node.equals(other.node);
+    }
}
\ No newline at end of file
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/PathNodeInitial.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/PathNodeInitial.java
index 5fbf33f..497e926 100644
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/PathNodeInitial.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/PathNodeInitial.java
@@ -60,10 +60,6 @@
private NodeWritable emptyNode;
private Iterator<PositionWritable> posIterator;
- public PathNodeInitialMapper() {
-
- }
-
public void configure(JobConf conf) {
KMER_SIZE = conf.getInt("sizeKmer", 0);
outputValue = new MessageWritableNodeWithFlag(KMER_SIZE);
diff --git a/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/pmcommon/TestPathNodeInitial.java b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/pmcommon/TestPathNodeInitial.java
new file mode 100644
index 0000000..b142f87
--- /dev/null
+++ b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/pmcommon/TestPathNodeInitial.java
@@ -0,0 +1,60 @@
+package edu.uci.ics.genomix.hadoop.pmcommon;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.mapred.lib.MultipleOutputs;
+import org.apache.hadoop.mrunit.MapDriver;
+import org.apache.hadoop.mrunit.ReduceDriver;
+import org.junit.Test;
+
+import edu.uci.ics.genomix.hadoop.graphclean.mergepaths.h3.MergePathsH3.MessageFlag;
+import edu.uci.ics.genomix.hadoop.pmcommon.MessageWritableNodeWithFlag;
+import edu.uci.ics.genomix.hadoop.pmcommon.PathNodeInitial;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.NodeWritable;
+import edu.uci.ics.genomix.type.PositionListWritable;
+import edu.uci.ics.genomix.type.PositionWritable;
+
+/** MRUnit tests for the PathNodeInitial mapper and reducer, run on a single node that has no neighbors. */
+@SuppressWarnings("deprecation")
+public class TestPathNodeInitial {
+    PositionWritable posn1 = new PositionWritable(0, (byte) 1);
+    PositionWritable posn2 = new PositionWritable(1, (byte) 1);
+    PositionWritable posn3 = new PositionWritable(2, (byte) 1);
+    PositionWritable posn4 = new PositionWritable(3, (byte) 1);
+    PositionWritable posn5 = new PositionWritable(5, (byte) 1);
+    String kmerString = "ATGCA";
+    KmerBytesWritable kmer = new KmerBytesWritable(kmerString.length(), kmerString);
+    JobConf conf = new JobConf();
+
+    {
+        conf.setInt("sizeKmer", kmerString.length()); // read back by PathNodeInitialMapper.configure()
+    }
+
+    @Test
+    public void testNoNeighbors() throws IOException {
+        NodeWritable noNeighborNode = new NodeWritable(posn1, new PositionListWritable(), new PositionListWritable(),
+                new PositionListWritable(), new PositionListWritable(), kmer);
+        MessageWritableNodeWithFlag output = new MessageWritableNodeWithFlag((byte) (MessageFlag.FROM_SELF | MessageFlag.IS_COMPLETE), noNeighborNode);
+        // Mapper: the node is expected to come out keyed by posn1, flagged FROM_SELF | IS_COMPLETE.
+        new MapDriver<NodeWritable, NullWritable, PositionWritable, MessageWritableNodeWithFlag>()
+                .withMapper(new PathNodeInitial.PathNodeInitialMapper())
+                .withConfiguration(conf)
+                .withInput(noNeighborNode, NullWritable.get())
+                .withOutput(posn1, output)
+                .runTest();
+        // Reducer: fed the mapper's message; no withOutput() is declared for this driver.
+        // TODO: register the "complete" named output (MultipleOutputs.addNamedOutput) and assert on it.
+        new ReduceDriver<PositionWritable, MessageWritableNodeWithFlag, PositionWritable, MessageWritableNodeWithFlag>()
+                .withReducer(new PathNodeInitial.PathNodeInitialReducer())
+                .withConfiguration(conf)
+                .withInput(posn1, Arrays.asList(output))
+                .runTest();
+    }
+}