refactor MergePaths messages to carry direction and message-type flags
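
MessageWritableNodeWithFlag is replaced by NodeWithFlagWritable, whose nested MessageFlag
packs a message type (MSG_SELF, MSG_UPDATE_MERGE, MSG_UPDATE_EDGE), a merge/update direction
(DIR_FF, DIR_FR, DIR_RF, DIR_RR), and the IS_HEAD / IS_TAIL markers into a single byte.
PathNodeInitial drops its private PathNodeFlag in favor of the shared flags and now writes
mergeable-path records to the job's main output rather than a named MultipleOutputs stream.

Illustrative flag composition (a sketch only; the mappers and reducers below combine the
bits in context):

    byte outFlag  = (byte) (MessageFlag.MSG_UPDATE_MERGE | MessageFlag.DIR_FF | MessageFlag.IS_HEAD);
    byte msgType  = (byte) (outFlag & MessageFlag.MSG_MASK);  // == MSG_UPDATE_MERGE
    byte mergeDir = (byte) (outFlag & MessageFlag.DIR_MASK);  // == DIR_FF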
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h3/MergePathsH3.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h3/MergePathsH3.java
index ad8ead6..a7ca739 100644
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h3/MergePathsH3.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h3/MergePathsH3.java
@@ -23,7 +23,7 @@
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
-import edu.uci.ics.genomix.hadoop.pmcommon.MessageWritableNodeWithFlag;
+import edu.uci.ics.genomix.hadoop.pmcommon.NodeWithFlagWritable;
import edu.uci.ics.genomix.hadoop.pmcommon.PathNodeInitial.PathNodeFlag;
import edu.uci.ics.genomix.type.NodeWritable;
import edu.uci.ics.genomix.type.PositionWritable;
@@ -44,14 +44,14 @@
* Heads send themselves to their successors, and all others map themselves.
*/
private static class MergePathsH3Mapper extends MapReduceBase implements
- Mapper<PositionWritable, MessageWritableNodeWithFlag, PositionWritable, MessageWritableNodeWithFlag> {
+ Mapper<PositionWritable, NodeWithFlagWritable, PositionWritable, NodeWithFlagWritable> {
private static long randSeed;
private Random randGenerator;
private float probBeingRandomHead;
private int KMER_SIZE;
private PositionWritable outputKey;
- private MessageWritableNodeWithFlag outputValue;
+ private NodeWithFlagWritable outputValue;
private NodeWritable curNode;
private byte headFlag;
private byte outFlag;
@@ -64,7 +64,7 @@
finalMerge = conf.getBoolean("finalMerge", false);
KMER_SIZE = conf.getInt("sizeKmer", 0);
- outputValue = new MessageWritableNodeWithFlag(KMER_SIZE);
+ outputValue = new NodeWithFlagWritable(KMER_SIZE);
outputKey = new PositionWritable();
curNode = new NodeWritable(KMER_SIZE);
}
@@ -76,8 +76,8 @@
}
@Override
- public void map(PositionWritable key, MessageWritableNodeWithFlag value,
- OutputCollector<PositionWritable, MessageWritableNodeWithFlag> output, Reporter reporter)
+ public void map(PositionWritable key, NodeWithFlagWritable value,
+ OutputCollector<PositionWritable, NodeWithFlagWritable> output, Reporter reporter)
throws IOException {
curNode = value.getNode();
// Map all path vertices; Heads and pseudoheads are sent to their successors
@@ -103,7 +103,7 @@
output.collect(outputKey, outputValue);
} else {
// tail nodes map themselves
- outFlag |= MergeMessageFlag.FROM_SELF;
+ outFlag |= MergeMessageFlag.MSG_SELF;
outputValue.set(outFlag, curNode);
output.collect(key, outputValue);
}
@@ -114,11 +114,11 @@
* Reducer class: merge nodes that co-occur; for singletons, remap the original nodes
*/
private static class MergePathsH3Reducer extends MapReduceBase implements
- Reducer<PositionWritable, MessageWritableNodeWithFlag, PositionWritable, MessageWritableNodeWithFlag> {
+ Reducer<PositionWritable, NodeWithFlagWritable, PositionWritable, NodeWithFlagWritable> {
private int KMER_SIZE;
- private MessageWritableNodeWithFlag inputValue;
- private MessageWritableNodeWithFlag outputValue;
+ private NodeWithFlagWritable inputValue;
+ private NodeWithFlagWritable outputValue;
private NodeWritable headNode;
private NodeWritable tailNode;
private int count;
@@ -126,20 +126,20 @@
public void configure(JobConf conf) {
KMER_SIZE = conf.getInt("sizeKmer", 0);
- outputValue = new MessageWritableNodeWithFlag(KMER_SIZE);
+ outputValue = new NodeWithFlagWritable(KMER_SIZE);
headNode = new NodeWritable(KMER_SIZE);
tailNode = new NodeWritable(KMER_SIZE);
}
@Override
- public void reduce(PositionWritable key, Iterator<MessageWritableNodeWithFlag> values,
- OutputCollector<PositionWritable, MessageWritableNodeWithFlag> output, Reporter reporter)
+ public void reduce(PositionWritable key, Iterator<NodeWithFlagWritable> values,
+ OutputCollector<PositionWritable, NodeWithFlagWritable> output, Reporter reporter)
throws IOException {
inputValue = values.next();
if (!values.hasNext()) {
// all single nodes must be remapped
- if ((inputValue.getFlag() & MergeMessageFlag.FROM_SELF) == MergeMessageFlag.FROM_SELF) {
+ if ((inputValue.getFlag() & MergeMessageFlag.MSG_SELF) == MergeMessageFlag.MSG_SELF) {
-                    // FROM_SELF => remap self
+                    // MSG_SELF => remap self
output.collect(key, inputValue);
} else {
@@ -202,9 +202,9 @@
conf.setOutputFormat(SequenceFileOutputFormat.class);
conf.setMapOutputKeyClass(PositionWritable.class);
- conf.setMapOutputValueClass(MessageWritableNodeWithFlag.class);
+ conf.setMapOutputValueClass(NodeWithFlagWritable.class);
conf.setOutputKeyClass(PositionWritable.class);
- conf.setOutputValueClass(MessageWritableNodeWithFlag.class);
+ conf.setOutputValueClass(NodeWithFlagWritable.class);
conf.setMapperClass(MergePathsH3Mapper.class);
conf.setReducerClass(MergePathsH3Reducer.class);
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4.java
index 891be154..7e6df5e 100644
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4.java
@@ -29,11 +29,11 @@
import edu.uci.ics.genomix.hadoop.oldtype.VKmerBytesWritable;
import edu.uci.ics.genomix.hadoop.pmcommon.MergePathMultiSeqOutputFormat;
import edu.uci.ics.genomix.hadoop.pmcommon.MergePathValueWritable;
-import edu.uci.ics.genomix.hadoop.pmcommon.MessageWritableNodeWithFlag;
+import edu.uci.ics.genomix.hadoop.pmcommon.NodeWithFlagWritable;
+import edu.uci.ics.genomix.hadoop.pmcommon.NodeWithFlagWritable.MessageFlag;
import edu.uci.ics.genomix.hadoop.pmcommon.PathNodeInitial.PathNodeInitialReducer;
import edu.uci.ics.genomix.type.NodeWritable;
import edu.uci.ics.genomix.type.PositionWritable;
-import edu.uci.ics.genomix.hadoop.graphclean.mergepaths.h3.MergePathsH3.MergeMessageFlag;
@SuppressWarnings("deprecation")
public class MergePathsH4 extends Configured implements Tool {
@@ -43,14 +43,14 @@
* Heads send themselves to their successors, and all others map themselves.
*/
public static class MergePathsH4Mapper extends MapReduceBase implements
- Mapper<PositionWritable, MessageWritableNodeWithFlag, PositionWritable, MessageWritableNodeWithFlag> {
+ Mapper<PositionWritable, NodeWithFlagWritable, PositionWritable, NodeWithFlagWritable> {
private static long randSeed;
private Random randGenerator;
private float probBeingRandomHead;
private int KMER_SIZE;
private PositionWritable outputKey;
- private MessageWritableNodeWithFlag outputValue;
+ private NodeWithFlagWritable outputValue;
private NodeWritable curNode;
private PositionWritable curID;
private PositionWritable nextID;
@@ -72,7 +72,7 @@
probBeingRandomHead = conf.getFloat("probBeingRandomHead", 0.5f);
KMER_SIZE = conf.getInt("sizeKmer", 0);
- outputValue = new MessageWritableNodeWithFlag(KMER_SIZE);
+ outputValue = new NodeWithFlagWritable(KMER_SIZE);
outputKey = new PositionWritable();
curNode = new NodeWritable(KMER_SIZE);
curID = new PositionWritable();
@@ -121,12 +121,12 @@
}
@Override
- public void map(PositionWritable key, MessageWritableNodeWithFlag value,
- OutputCollector<PositionWritable, MessageWritableNodeWithFlag> output, Reporter reporter)
+ public void map(PositionWritable key, NodeWithFlagWritable value,
+ OutputCollector<PositionWritable, NodeWithFlagWritable> output, Reporter reporter)
throws IOException {
// Node may be marked as head b/c it's a real head or a real tail
- headFlag = (byte) (MergeMessageFlag.IS_HEAD & value.getFlag());
- tailFlag = (byte) (MergeMessageFlag.IS_TAIL & value.getFlag());
+ headFlag = (byte) (MessageFlag.IS_HEAD & value.getFlag());
+ tailFlag = (byte) (MessageFlag.IS_TAIL & value.getFlag());
outFlag = (byte) (headFlag | tailFlag);
// only PATH vertices are present. Find the IDs for my neighbors
@@ -140,14 +140,11 @@
hasPrev = setPrevInfo(curNode) && headFlag == 0;
willMerge = false;
- reporter.setStatus("CHECK ME OUT");
- System.err.println("mapping node" + curNode.toString() + " next:" + String.valueOf(hasNext) + " prev:" + String.valueOf(hasPrev));
-
// TODO: need to update edges in neighboring nodes
- if ((outFlag & MergeMessageFlag.IS_HEAD) > 0 && (outFlag & MergeMessageFlag.IS_TAIL) > 0) {
+ if ((outFlag & MessageFlag.IS_HEAD) > 0 && (outFlag & MessageFlag.IS_TAIL) > 0) {
// true HEAD met true TAIL. this path is complete
- outFlag |= MergeMessageFlag.FROM_SELF;
+ outFlag |= MessageFlag.MSG_SELF;
outputValue.set(outFlag, curNode);
output.collect(curID, outputValue);
return;
@@ -156,13 +153,13 @@
if (curHead) {
if (hasNext && !nextHead) {
// compress this head to the forward tail
- outFlag |= MergeMessageFlag.FROM_PREDECESSOR;
+ outFlag |= MessageFlag.FROM_PREDECESSOR;
outputValue.set(outFlag, curNode);
output.collect(nextID, outputValue);
willMerge = true;
} else if (hasPrev && !prevHead) {
// compress this head to the reverse tail
- outFlag |= MergeMessageFlag.FROM_SUCCESSOR;
+ outFlag |= MessageFlag.FROM_SUCCESSOR;
outputValue.set(outFlag, curNode);
output.collect(prevID, outputValue);
willMerge = true;
@@ -173,7 +170,7 @@
if ((!nextHead && !prevHead) && (curID.compareTo(nextID) < 0 && curID.compareTo(prevID) < 0)) {
// tails on both sides, and I'm the "local minimum"
// compress me towards the tail in forward dir
- outFlag |= MergeMessageFlag.FROM_PREDECESSOR;
+ outFlag |= MessageFlag.FROM_PREDECESSOR;
outputValue.set(outFlag, curNode);
output.collect(nextID, outputValue);
willMerge = true;
@@ -182,7 +179,7 @@
// no previous node
if (!nextHead && curID.compareTo(nextID) < 0) {
// merge towards tail in forward dir
- outFlag |= MergeMessageFlag.FROM_PREDECESSOR;
+ outFlag |= MessageFlag.FROM_PREDECESSOR;
outputValue.set(outFlag, curNode);
output.collect(nextID, outputValue);
willMerge = true;
@@ -191,7 +188,7 @@
// no next node
if (!prevHead && curID.compareTo(prevID) < 0) {
// merge towards tail in reverse dir
- outFlag |= MergeMessageFlag.FROM_SUCCESSOR;
+ outFlag |= MessageFlag.FROM_SUCCESSOR;
outputValue.set(outFlag, curNode);
output.collect(prevID, outputValue);
willMerge = true;
@@ -202,7 +199,7 @@
// if we didn't send ourselves to some other node, remap ourselves for the next round
if (!willMerge) {
- outFlag |= MergeMessageFlag.FROM_SELF;
+ outFlag |= MessageFlag.MSG_SELF;
outputValue.set(outFlag, curNode);
output.collect(curID, outputValue);
}
@@ -217,18 +214,18 @@
* Reducer class: merge nodes that co-occur; for singletons, remap the original nodes
*/
private static class MergePathsH4Reducer extends MapReduceBase implements
- Reducer<PositionWritable, MessageWritableNodeWithFlag, PositionWritable, MessageWritableNodeWithFlag> {
+ Reducer<PositionWritable, NodeWithFlagWritable, PositionWritable, NodeWithFlagWritable> {
private MultipleOutputs mos;
private static final String TO_MERGE_OUTPUT = "toMerge";
private static final String COMPLETE_OUTPUT = "complete";
private static final String UPDATES_OUTPUT = "update";
- private OutputCollector<PositionWritable, MessageWritableNodeWithFlag> toMergeCollector;
- private OutputCollector<PositionWritable, MessageWritableNodeWithFlag> completeCollector;
- private OutputCollector<PositionWritable, MessageWritableNodeWithFlag> updatesCollector;
+ private OutputCollector<PositionWritable, NodeWithFlagWritable> toMergeCollector;
+ private OutputCollector<PositionWritable, NodeWithFlagWritable> completeCollector;
+ private OutputCollector<PositionWritable, NodeWithFlagWritable> updatesCollector;
private int KMER_SIZE;
- private MessageWritableNodeWithFlag inputValue;
- private MessageWritableNodeWithFlag outputValue;
+ private NodeWithFlagWritable inputValue;
+ private NodeWithFlagWritable outputValue;
private NodeWritable curNode;
private NodeWritable prevNode;
private NodeWritable nextNode;
@@ -241,8 +238,8 @@
public void configure(JobConf conf) {
mos = new MultipleOutputs(conf);
KMER_SIZE = conf.getInt("sizeKmer", 0);
- inputValue = new MessageWritableNodeWithFlag(KMER_SIZE);
- outputValue = new MessageWritableNodeWithFlag(KMER_SIZE);
+ inputValue = new NodeWithFlagWritable(KMER_SIZE);
+ outputValue = new NodeWithFlagWritable(KMER_SIZE);
curNode = new NodeWritable(KMER_SIZE);
prevNode = new NodeWritable(KMER_SIZE);
nextNode = new NodeWritable(KMER_SIZE);
@@ -250,8 +247,8 @@
@SuppressWarnings("unchecked")
@Override
- public void reduce(PositionWritable key, Iterator<MessageWritableNodeWithFlag> values,
- OutputCollector<PositionWritable, MessageWritableNodeWithFlag> output, Reporter reporter)
+ public void reduce(PositionWritable key, Iterator<NodeWithFlagWritable> values,
+ OutputCollector<PositionWritable, NodeWithFlagWritable> output, Reporter reporter)
throws IOException {
toMergeCollector = mos.getCollector(TO_MERGE_OUTPUT, reporter);
completeCollector = mos.getCollector(COMPLETE_OUTPUT, reporter);
@@ -259,35 +256,35 @@
inputValue.set(values.next());
if (!values.hasNext()) {
- if ((inputValue.getFlag() & MergeMessageFlag.FROM_SELF) > 0) {
- if ((inputValue.getFlag() & MergeMessageFlag.IS_HEAD) > 0 && (inputValue.getFlag() & MergeMessageFlag.IS_TAIL) > 0) {
+ if ((inputValue.getFlag() & MessageFlag.MSG_SELF) > 0) {
+ if ((inputValue.getFlag() & MessageFlag.IS_HEAD) > 0 && (inputValue.getFlag() & MessageFlag.IS_TAIL) > 0) {
// complete path (H & T meet in this node)
completeCollector.collect(key, inputValue);
} else {
-                        // FROM_SELF => no merging this round. remap self
+                        // MSG_SELF => no merging this round; remap self
toMergeCollector.collect(key, inputValue);
}
- } else if ((inputValue.getFlag() & (MergeMessageFlag.FROM_PREDECESSOR | MergeMessageFlag.FROM_SUCCESSOR)) > 0) {
+ } else if ((inputValue.getFlag() & (MessageFlag.FROM_PREDECESSOR | MessageFlag.FROM_SUCCESSOR)) > 0) {
// FROM_PREDECESSOR | FROM_SUCCESSOR, but singleton? error here!
throw new IOException("Only one value received in merge, but it wasn't from self!");
}
} else {
// multiple inputs => a merge will take place. Aggregate all, then collect the merged path
count = 0;
- outFlag = MergeMessageFlag.EMPTY_MESSAGE;
+ outFlag = MessageFlag.EMPTY_MESSAGE;
sawCurNode = false;
sawPrevNode = false;
sawNextNode = false;
while (true) { // process values; break when no more
count++;
- outFlag |= (inputValue.getFlag() & (MergeMessageFlag.IS_HEAD | MergeMessageFlag.IS_TAIL)); // merged node may become HEAD or TAIL
- if ((inputValue.getFlag() & MergeMessageFlag.FROM_PREDECESSOR) > 0) {
+ outFlag |= (inputValue.getFlag() & (MessageFlag.IS_HEAD | MessageFlag.IS_TAIL)); // merged node may become HEAD or TAIL
+ if ((inputValue.getFlag() & MessageFlag.FROM_PREDECESSOR) > 0) {
prevNode.set(inputValue.getNode());
sawPrevNode = true;
- } else if ((inputValue.getFlag() & MergeMessageFlag.FROM_SUCCESSOR) > 0) {
+ } else if ((inputValue.getFlag() & MessageFlag.FROM_SUCCESSOR) > 0) {
nextNode.set(inputValue.getNode());
sawNextNode = true;
- } else if ((inputValue.getFlag() & MergeMessageFlag.FROM_SELF) > 0) {
+ } else if ((inputValue.getFlag() & MessageFlag.MSG_SELF) > 0) {
curNode.set(inputValue.getNode());
sawCurNode = true;
} else {
@@ -319,7 +316,7 @@
}
outputValue.set(outFlag, curNode);
- if ((outFlag & MergeMessageFlag.IS_HEAD) > 0 && (outFlag & MergeMessageFlag.IS_TAIL) > 0) {
+ if ((outFlag & MessageFlag.IS_HEAD) > 0 && (outFlag & MessageFlag.IS_TAIL) > 0) {
// True heads meeting tails => merge is complete for this node
completeCollector.collect(key, outputValue);
} else {
@@ -349,19 +346,19 @@
conf.setOutputFormat(NullOutputFormat.class);
conf.setMapOutputKeyClass(PositionWritable.class);
- conf.setMapOutputValueClass(MessageWritableNodeWithFlag.class);
+ conf.setMapOutputValueClass(NodeWithFlagWritable.class);
conf.setOutputKeyClass(PositionWritable.class);
- conf.setOutputValueClass(MessageWritableNodeWithFlag.class);
+ conf.setOutputValueClass(NodeWithFlagWritable.class);
conf.setMapperClass(MergePathsH4Mapper.class);
conf.setReducerClass(MergePathsH4Reducer.class);
MultipleOutputs.addNamedOutput(conf, MergePathsH4Reducer.TO_MERGE_OUTPUT, MergePathMultiSeqOutputFormat.class,
- PositionWritable.class, MessageWritableNodeWithFlag.class);
+ PositionWritable.class, NodeWithFlagWritable.class);
MultipleOutputs.addNamedOutput(conf, MergePathsH4Reducer.COMPLETE_OUTPUT, MergePathMultiSeqOutputFormat.class,
- PositionWritable.class, MessageWritableNodeWithFlag.class);
+ PositionWritable.class, NodeWithFlagWritable.class);
MultipleOutputs.addNamedOutput(conf, MergePathsH4Reducer.UPDATES_OUTPUT, MergePathMultiSeqOutputFormat.class,
- PositionWritable.class, MessageWritableNodeWithFlag.class);
+ PositionWritable.class, NodeWithFlagWritable.class);
FileSystem dfs = FileSystem.get(conf);
// clean output dirs
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/removetips/RemoveTips.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/removetips/RemoveTips.java
index 5d4b886..4009fc6 100644
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/removetips/RemoveTips.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/removetips/RemoveTips.java
@@ -26,7 +26,7 @@
import edu.uci.ics.genomix.hadoop.graphclean.mergepaths.h3.MergePathsH3.MergeMessageFlag;
import edu.uci.ics.genomix.hadoop.graphclean.mergepaths.h4.MergePathsH4;
import edu.uci.ics.genomix.hadoop.graphclean.mergepaths.h4.MergePathsH4.MergePathsH4Mapper;
-import edu.uci.ics.genomix.hadoop.pmcommon.MessageWritableNodeWithFlag;
+import edu.uci.ics.genomix.hadoop.pmcommon.NodeWithFlagWritable;
import edu.uci.ics.genomix.type.NodeWritable;
import edu.uci.ics.genomix.type.PositionWritable;
@@ -37,22 +37,22 @@
* Mapper class: removes any tips by not mapping them at all
*/
private static class RemoveTipsMapper extends MapReduceBase implements
- Mapper<PositionWritable, MessageWritableNodeWithFlag, PositionWritable, MessageWritableNodeWithFlag> {
+ Mapper<PositionWritable, NodeWithFlagWritable, PositionWritable, NodeWithFlagWritable> {
private int KMER_SIZE;
private int removeTipsMinLength;
- private MessageWritableNodeWithFlag outputValue;
+ private NodeWithFlagWritable outputValue;
private NodeWritable curNode;
public void configure(JobConf conf) {
removeTipsMinLength = conf.getInt("removeTipsMinLength", 0);
- outputValue = new MessageWritableNodeWithFlag(KMER_SIZE);
+ outputValue = new NodeWithFlagWritable(KMER_SIZE);
curNode = new NodeWritable(KMER_SIZE);
}
@Override
- public void map(PositionWritable key, MessageWritableNodeWithFlag value,
- OutputCollector<PositionWritable, MessageWritableNodeWithFlag> output, Reporter reporter)
+ public void map(PositionWritable key, NodeWithFlagWritable value,
+ OutputCollector<PositionWritable, NodeWithFlagWritable> output, Reporter reporter)
throws IOException {
curNode.set(value.getNode());
if ((curNode.inDegree() == 0 || curNode.outDegree() == 0)
@@ -60,7 +60,7 @@
// kill this node by NOT mapping it. Update my neighbors with a suicide note
//TODO: update neighbors by removing me from its list
} else {
- outputValue.set(MergeMessageFlag.FROM_SELF, curNode);
+ outputValue.set(MergeMessageFlag.MSG_SELF, curNode);
output.collect(key, value);
}
}
@@ -70,11 +70,11 @@
* Reducer class: keeps mapped nodes
*/
private static class MergePathsH4Reducer extends MapReduceBase implements
- Reducer<PositionWritable, MessageWritableNodeWithFlag, PositionWritable, MessageWritableNodeWithFlag> {
+ Reducer<PositionWritable, NodeWithFlagWritable, PositionWritable, NodeWithFlagWritable> {
private int KMER_SIZE;
- private MessageWritableNodeWithFlag inputValue;
- private MessageWritableNodeWithFlag outputValue;
+ private NodeWithFlagWritable inputValue;
+ private NodeWithFlagWritable outputValue;
private NodeWritable curNode;
private NodeWritable prevNode;
private NodeWritable nextNode;
@@ -86,20 +86,20 @@
public void configure(JobConf conf) {
KMER_SIZE = conf.getInt("sizeKmer", 0);
- outputValue = new MessageWritableNodeWithFlag(KMER_SIZE);
+ outputValue = new NodeWithFlagWritable(KMER_SIZE);
curNode = new NodeWritable(KMER_SIZE);
prevNode = new NodeWritable(KMER_SIZE);
nextNode = new NodeWritable(KMER_SIZE);
}
@Override
- public void reduce(PositionWritable key, Iterator<MessageWritableNodeWithFlag> values,
- OutputCollector<PositionWritable, MessageWritableNodeWithFlag> output, Reporter reporter)
+ public void reduce(PositionWritable key, Iterator<NodeWithFlagWritable> values,
+ OutputCollector<PositionWritable, NodeWithFlagWritable> output, Reporter reporter)
throws IOException {
inputValue.set(values.next());
if (!values.hasNext()) {
- if ((inputValue.getFlag() & MergeMessageFlag.FROM_SELF) > 0) {
+ if ((inputValue.getFlag() & MergeMessageFlag.MSG_SELF) > 0) {
-                    // FROM_SELF => keep self
+                    // MSG_SELF => keep self
output.collect(key, inputValue);
} else {
@@ -126,9 +126,9 @@
conf.setOutputFormat(SequenceFileOutputFormat.class);
conf.setMapOutputKeyClass(PositionWritable.class);
- conf.setMapOutputValueClass(MessageWritableNodeWithFlag.class);
+ conf.setMapOutputValueClass(NodeWithFlagWritable.class);
conf.setOutputKeyClass(PositionWritable.class);
- conf.setOutputValueClass(MessageWritableNodeWithFlag.class);
+ conf.setOutputValueClass(NodeWithFlagWritable.class);
conf.setMapperClass(MergePathsH4Mapper.class);
conf.setReducerClass(MergePathsH4Reducer.class);
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/MessageWritableNodeWithFlag.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/MessageWritableNodeWithFlag.java
deleted file mode 100644
index f05797e..0000000
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/MessageWritableNodeWithFlag.java
+++ /dev/null
@@ -1,103 +0,0 @@
-package edu.uci.ics.genomix.hadoop.pmcommon;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.hadoop.io.BinaryComparable;
-import org.apache.hadoop.io.WritableComparable;
-
-import edu.uci.ics.genomix.type.KmerBytesWritable;
-import edu.uci.ics.genomix.type.NodeWritable;
-
-/*
- * Simple "Message" class, allowing a NodeWritable to be sent, along with a message flag.
- * This class is used as the value in several MapReduce algorithms.
- */
-public class MessageWritableNodeWithFlag extends BinaryComparable implements WritableComparable<BinaryComparable> {
- private byte flag;
- private NodeWritable node;
-
- public MessageWritableNodeWithFlag() {
- this(0);
- }
-
- public MessageWritableNodeWithFlag(int k) {
- this.flag = 0;
- this.node = new NodeWritable(k);
- }
-
- public MessageWritableNodeWithFlag(byte flag, int kmerSize) {
- this.flag = flag;
- this.node = new NodeWritable(kmerSize);
- }
-
- public MessageWritableNodeWithFlag(byte flag, NodeWritable node) {
- this(node.getKmer().getKmerLength());
- set(flag, node);
- }
-
- public void set(MessageWritableNodeWithFlag right) {
- set(right.getFlag(), right.getNode());
- }
-
- public void set(byte flag, NodeWritable node) {
- this.node.set(node);
- this.flag = flag;
- }
-
- @Override
- public void readFields(DataInput arg0) throws IOException {
- node.readFields(arg0);
- flag = arg0.readByte();
- }
-
- @Override
- public void write(DataOutput arg0) throws IOException {
- node.write(arg0);
- arg0.writeByte(flag);
- }
-
- public NodeWritable getNode() {
- if (node.getCount() != 0) {
- return node;
- }
- return null;
- }
-
- public byte getFlag() {
- return this.flag;
- }
-
- public String toString() {
- return node.toString() + '\t' + String.valueOf(flag);
- }
-
- @Override
- public byte[] getBytes() {
- if (node.getCount() != 0) {
- return node.getKmer().getBytes();
- } else
- return null;
- }
-
- @Override
- public int getLength() {
- return node.getCount();
- }
-
- @Override
- public int hashCode() {
-// return super.hashCode() + flag + node.hashCode();
- return flag + node.hashCode();
- }
-
- @Override
- public boolean equals(Object rightObj) {
- if (rightObj instanceof MessageWritableNodeWithFlag) {
- MessageWritableNodeWithFlag rightMessage = (MessageWritableNodeWithFlag) rightObj;
- return (this.flag == rightMessage.flag && this.node.equals(rightMessage.node));
- }
- return false;
- }
-}
\ No newline at end of file
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/NodeWithFlagWritable.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/NodeWithFlagWritable.java
new file mode 100644
index 0000000..f929151
--- /dev/null
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/NodeWithFlagWritable.java
@@ -0,0 +1,186 @@
+package edu.uci.ics.genomix.hadoop.pmcommon;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Iterator;
+
+import javax.management.RuntimeErrorException;
+
+import org.apache.hadoop.io.BinaryComparable;
+import org.apache.hadoop.io.WritableComparable;
+
+import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.NodeWritable;
+import edu.uci.ics.genomix.type.PositionWritable;
+
+/*
+ * Simple "Message" class, allowing a NodeWritable to be sent, along with a message flag.
+ * This class is used as the value in several MapReduce algorithms.
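+ * Typical use (a sketch only; names are placeholders): value.set((byte) (MessageFlag.MSG_SELF | MessageFlag.IS_HEAD), node);
+ *                                                      collector.collect(node.getNodeID(), value);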
+ */
+public class NodeWithFlagWritable extends BinaryComparable implements WritableComparable<BinaryComparable> {
+ private byte flag;
+ private NodeWritable node;
+
+ public static class MessageFlag {
+ public static final byte EMPTY_MESSAGE = 0;
+ // message types
+ public static final byte MSG_SELF = 0b01 << 0;
+ public static final byte MSG_UPDATE_MERGE = 0b10 << 0;
+ public static final byte MSG_UPDATE_EDGE = 0b11 << 0;
+ public static final byte MSG_MASK = 0b11 << 0;
+ // merge/update directions
+ public static final byte DIR_FF = 0b00 << 2;
+ public static final byte DIR_FR = 0b01 << 2;
+ public static final byte DIR_RF = 0b10 << 2;
+ public static final byte DIR_RR = 0b11 << 2;
+ public static final byte DIR_MASK = 0b11 << 2;
+ // additional info
+ public static final byte IS_HEAD = 0b1 << 4;
+ public static final byte IS_TAIL = 0b1 << 5;
+ // extra bit used differently in each operation
+ public static final byte EXTRA_FLAG = 1 << 6;
+ }
+
+ /*
+ * Process any changes to @node contained in @updateMsg. This includes merges and edge updates
+ */
+ public static void processUpdates(NodeWritable node, NodeWithFlagWritable updateMsg, int kmerSize) throws IOException {
+ byte updateFlag = updateMsg.getFlag();
+ NodeWritable updateNode = updateMsg.getNode();
+ if ((updateFlag & MessageFlag.MSG_UPDATE_EDGE) == MessageFlag.MSG_UPDATE_EDGE) {
+ // this message wants to update the edges of node.
+            // remove updateNode's position from the specified list, then merge updateNode's position lists into node
+ if (!updateNode.equals(NodeWritable.EMPTY_NODE)) {
+ // need to remove updateNode from the specified PositionList
+ switch(updateFlag & MessageFlag.DIR_MASK) {
+ case MessageFlag.DIR_FF:
+ node.getFFList().remove(updateNode.getNodeID());
+ break;
+ case MessageFlag.DIR_FR:
+ node.getFRList().remove(updateNode.getNodeID());
+ break;
+ case MessageFlag.DIR_RF:
+ node.getRFList().remove(updateNode.getNodeID());
+ break;
+ case MessageFlag.DIR_RR:
+ node.getRRList().remove(updateNode.getNodeID());
+ break;
+ default:
+ throw new IOException("Unrecognized direction in updateFlag: " + updateFlag);
+ }
+ }
+            // now merge updateNode's position lists into node
+ node.getFFList().appendList(updateNode.getFFList());
+ node.getFRList().appendList(updateNode.getFRList());
+ node.getRFList().appendList(updateNode.getRFList());
+ node.getRRList().appendList(updateNode.getRRList());
+ } else if ((updateFlag & MessageFlag.MSG_UPDATE_MERGE) == MessageFlag.MSG_UPDATE_MERGE) {
+ // this message wants to merge node with updateNode.
+ // the direction flag indicates how the merge should take place.
+ switch(updateFlag & MessageFlag.DIR_MASK) {
+ case MessageFlag.DIR_FF:
+ node.getKmer().mergeWithFFKmer(kmerSize, updateNode.getKmer());
+ node.getFFList().set(updateNode.getFFList());
+ break;
+ case MessageFlag.DIR_FR:
+ // FIXME not sure if this should be reverse-complement or just reverse...
+ node.getKmer().mergeWithFFKmer(kmerSize, updateNode.getKmer());
+ node.getFRList().set(updateNode.getFRList());
+ break;
+ case MessageFlag.DIR_RF:
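+                    // TODO: DIR_RF merge not yet implemented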
+
+ break;
+ case MessageFlag.DIR_RR:
+ node.getKmer().mergeWithRRKmer(kmerSize, updateNode.getKmer());
+ node.getRRList().set(updateNode.getRRList());
+ break;
+ default:
+ throw new IOException("Unrecognized direction in updateFlag: " + updateFlag);
+ }
+ }
+ }
+
+ public NodeWithFlagWritable() {
+ this(0);
+ }
+
+ public NodeWithFlagWritable(int k) {
+ this.flag = 0;
+ this.node = new NodeWritable(k);
+ }
+
+ public NodeWithFlagWritable(byte flag, int kmerSize) {
+ this.flag = flag;
+ this.node = new NodeWritable(kmerSize);
+ }
+
+ public NodeWithFlagWritable(byte flag, NodeWritable node) {
+ this(node.getKmer().getKmerLength());
+ set(flag, node);
+ }
+
+ public void set(NodeWithFlagWritable right) {
+ set(right.getFlag(), right.getNode());
+ }
+
+ public void set(byte flag, NodeWritable node) {
+ this.node.set(node);
+ this.flag = flag;
+ }
+
+ @Override
+ public void readFields(DataInput arg0) throws IOException {
+ node.readFields(arg0);
+ flag = arg0.readByte();
+ }
+
+ @Override
+ public void write(DataOutput arg0) throws IOException {
+ node.write(arg0);
+ arg0.writeByte(flag);
+ }
+
+ public NodeWritable getNode() {
+ if (node.getCount() != 0) {
+ return node;
+ }
+ return null;
+ }
+
+ public byte getFlag() {
+ return this.flag;
+ }
+
+ public String toString() {
+ return node.toString() + '\t' + String.valueOf(flag);
+ }
+
+ @Override
+ public byte[] getBytes() {
+ if (node.getCount() != 0) {
+ return node.getKmer().getBytes();
+ } else
+ return null;
+ }
+
+ @Override
+ public int getLength() {
+ return node.getCount();
+ }
+
+ @Override
+ public int hashCode() {
+// return super.hashCode() + flag + node.hashCode();
+ return flag + node.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object rightObj) {
+ if (rightObj instanceof NodeWithFlagWritable) {
+ NodeWithFlagWritable rightMessage = (NodeWithFlagWritable) rightObj;
+ return (this.flag == rightMessage.flag && this.node.equals(rightMessage.node));
+ }
+ return false;
+ }
+}
\ No newline at end of file
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/PathNodeInitial.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/PathNodeInitial.java
index 27058ed..0523812 100644
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/PathNodeInitial.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/PathNodeInitial.java
@@ -39,6 +39,7 @@
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
+import edu.uci.ics.genomix.hadoop.pmcommon.NodeWithFlagWritable.MessageFlag;
import edu.uci.ics.genomix.type.NodeWritable;
import edu.uci.ics.genomix.type.PositionWritable;
@@ -58,17 +59,10 @@
public static final String TO_MERGE_OUTPUT = "toMerge";
public static final String TO_UPDATE_OUTPUT = "toUpdate";
- public static class PathNodeFlag {
- public static final byte EMPTY_MESSAGE = 0;
- public static final byte FROM_SELF = 1 << 0;
- public static final byte IS_HEAD = 1 << 1;
- public static final byte IS_TAIL = 1 << 2;
- public static final byte IS_COMPLETE = 1 << 3;
- public static final byte NEAR_PATH = 1 << 4;
- }
+ private static byte NEAR_PATH = MessageFlag.EXTRA_FLAG; // special-case extra flag for us
- private static void sendOutputToNextNeighbors(NodeWritable node, MessageWritableNodeWithFlag outputValue,
- OutputCollector<PositionWritable, MessageWritableNodeWithFlag> collector) throws IOException {
+ private static void sendOutputToNextNeighbors(NodeWritable node, NodeWithFlagWritable outputValue,
+ OutputCollector<PositionWritable, NodeWithFlagWritable> collector) throws IOException {
Iterator<PositionWritable> posIterator = node.getFFList().iterator(); // FFList
while (posIterator.hasNext()) {
collector.collect(posIterator.next(), outputValue);
@@ -79,8 +73,8 @@
}
}
- private static void sendOutputToPreviousNeighbors(NodeWritable node, MessageWritableNodeWithFlag outputValue,
- OutputCollector<PositionWritable, MessageWritableNodeWithFlag> collector) throws IOException {
+ private static void sendOutputToPreviousNeighbors(NodeWritable node, NodeWithFlagWritable outputValue,
+ OutputCollector<PositionWritable, NodeWithFlagWritable> collector) throws IOException {
Iterator<PositionWritable> posIterator = node.getRRList().iterator(); // RRList
while (posIterator.hasNext()) {
collector.collect(posIterator.next(), outputValue);
@@ -92,18 +86,18 @@
}
public static class PathNodeInitialMapper extends MapReduceBase implements
- Mapper<NodeWritable, NullWritable, PositionWritable, MessageWritableNodeWithFlag> {
+ Mapper<NodeWritable, NullWritable, PositionWritable, NodeWithFlagWritable> {
private int KMER_SIZE;
private PositionWritable outputKey;
- private MessageWritableNodeWithFlag outputValue;
+ private NodeWithFlagWritable outputValue;
private int inDegree;
private int outDegree;
private boolean pathNode;
public void configure(JobConf conf) {
KMER_SIZE = conf.getInt("sizeKmer", 0);
- outputValue = new MessageWritableNodeWithFlag(KMER_SIZE);
+ outputValue = new NodeWithFlagWritable(KMER_SIZE);
outputKey = new PositionWritable();
}
@@ -115,8 +109,7 @@
*/
@Override
public void map(NodeWritable key, NullWritable value,
- OutputCollector<PositionWritable, MessageWritableNodeWithFlag> output, Reporter reporter)
- throws IOException {
+ OutputCollector<PositionWritable, NodeWithFlagWritable> output, Reporter reporter) throws IOException {
inDegree = key.inDegree();
outDegree = key.outDegree();
if (inDegree == 1 && outDegree == 1) {
@@ -124,38 +117,38 @@
} else if (inDegree == 0 && outDegree == 1) {
pathNode = true;
// start of a tip. needs to merge & be marked as head
- outputValue.set(PathNodeFlag.IS_HEAD, NodeWritable.EMPTY_NODE);
+ outputValue.set(MessageFlag.IS_HEAD, NodeWritable.EMPTY_NODE);
output.collect(key.getNodeID(), outputValue);
} else if (inDegree == 1 && outDegree == 0) {
pathNode = true;
// end of a tip. needs to merge & be marked as tail
- outputValue.set(PathNodeFlag.IS_TAIL, NodeWritable.EMPTY_NODE);
+ outputValue.set(MessageFlag.IS_TAIL, NodeWritable.EMPTY_NODE);
output.collect(key.getNodeID(), outputValue);
} else {
pathNode = false;
if (outDegree > 0) {
// Not a path myself, but my successor might be one. Map forward successor to find heads
- outputValue.set(PathNodeFlag.IS_HEAD, NodeWritable.EMPTY_NODE);
+ outputValue.set(MessageFlag.IS_HEAD, NodeWritable.EMPTY_NODE);
sendOutputToNextNeighbors(key, outputValue, output);
}
if (inDegree > 0) {
// Not a path myself, but my predecessor might be one. map predecessor to find tails
- outputValue.set(PathNodeFlag.IS_TAIL, NodeWritable.EMPTY_NODE);
+ outputValue.set(MessageFlag.IS_TAIL, NodeWritable.EMPTY_NODE);
sendOutputToPreviousNeighbors(key, outputValue, output);
}
- // this non-path node won't participate in the merge
- outputValue.set((byte) (PathNodeFlag.FROM_SELF | PathNodeFlag.IS_COMPLETE), key);
+ // this non-path node won't participate in the merge. Mark as "complete" (H + T)
+ outputValue.set((byte) (MessageFlag.MSG_SELF | MessageFlag.IS_HEAD | MessageFlag.IS_TAIL), key);
output.collect(key.getNodeID(), outputValue);
}
if (pathNode) {
// simple path nodes map themselves
- outputValue.set(PathNodeFlag.FROM_SELF, key);
+ outputValue.set(MessageFlag.MSG_SELF, key);
output.collect(key.getNodeID(), outputValue);
reporter.incrCounter("genomix", "path_nodes", 1);
// also mark neighbors of paths (they are candidates for updates)
- outputValue.set(PathNodeFlag.NEAR_PATH, NodeWritable.EMPTY_NODE);
+ outputValue.set(NEAR_PATH, NodeWritable.EMPTY_NODE);
sendOutputToNextNeighbors(key, outputValue, output);
sendOutputToPreviousNeighbors(key, outputValue, output);
}
@@ -163,24 +156,24 @@
}
public static class PathNodeInitialReducer extends MapReduceBase implements
- Reducer<PositionWritable, MessageWritableNodeWithFlag, PositionWritable, MessageWritableNodeWithFlag> {
+ Reducer<PositionWritable, NodeWithFlagWritable, PositionWritable, NodeWithFlagWritable> {
private MultipleOutputs mos;
- private OutputCollector<PositionWritable, MessageWritableNodeWithFlag> toMergeCollector;
- private OutputCollector<PositionWritable, MessageWritableNodeWithFlag> completeCollector;
- private OutputCollector<PositionWritable, MessageWritableNodeWithFlag> toUpdateCollector;
+ private OutputCollector<PositionWritable, NodeWithFlagWritable> completeCollector;
+ private OutputCollector<PositionWritable, NodeWithFlagWritable> toUpdateCollector;
private int KMER_SIZE;
- private MessageWritableNodeWithFlag inputValue;
- private MessageWritableNodeWithFlag outputValue;
+ private NodeWithFlagWritable inputValue;
+ private NodeWithFlagWritable outputValue;
private NodeWritable nodeToKeep;
private byte outputFlag;
private byte inputFlag;
+ private boolean sawSelf;
public void configure(JobConf conf) {
mos = new MultipleOutputs(conf);
KMER_SIZE = conf.getInt("sizeKmer", 0);
- inputValue = new MessageWritableNodeWithFlag(KMER_SIZE);
- outputValue = new MessageWritableNodeWithFlag(KMER_SIZE);
+ inputValue = new NodeWithFlagWritable(KMER_SIZE);
+ outputValue = new NodeWithFlagWritable(KMER_SIZE);
nodeToKeep = new NodeWritable(KMER_SIZE);
}
@@ -195,54 +188,54 @@
*/
@SuppressWarnings("unchecked")
@Override
- public void reduce(PositionWritable key, Iterator<MessageWritableNodeWithFlag> values,
- OutputCollector<PositionWritable, MessageWritableNodeWithFlag> output, Reporter reporter)
+ public void reduce(PositionWritable key, Iterator<NodeWithFlagWritable> values,
+ OutputCollector<PositionWritable, NodeWithFlagWritable> toMergeCollector, Reporter reporter)
throws IOException {
completeCollector = mos.getCollector(COMPLETE_OUTPUT, reporter);
- toMergeCollector = mos.getCollector(TO_MERGE_OUTPUT, reporter);
toUpdateCollector = mos.getCollector(TO_UPDATE_OUTPUT, reporter);
- outputFlag = PathNodeFlag.EMPTY_MESSAGE;
+ outputFlag = MessageFlag.EMPTY_MESSAGE;
+ sawSelf = false;
while (values.hasNext()) {
inputValue.set(values.next());
inputFlag = inputValue.getFlag();
outputFlag |= inputFlag;
- if ((inputFlag & PathNodeFlag.FROM_SELF) > 0) {
+ if ((inputFlag & MessageFlag.MSG_SELF) > 0) {
// SELF -> keep this node
+ if (sawSelf) {
+ throw new IOException("Already saw SELF node in PathNodeInitialReducer! previous self: "
+ + nodeToKeep.toString() + ". current self: " + inputValue.getNode().toString());
+ }
+ sawSelf = true;
nodeToKeep.set(inputValue.getNode());
}
}
- if ((outputFlag & PathNodeFlag.FROM_SELF) > 0) {
- if ((outputFlag & PathNodeFlag.IS_COMPLETE) > 0) {
- if ((outputFlag & PathNodeFlag.NEAR_PATH) > 0) {
- // non-path, but update candidate
- outputValue.set(PathNodeFlag.NEAR_PATH, nodeToKeep);
+ if ((outputFlag & MessageFlag.MSG_SELF) > 0) {
+ if ((outputFlag & MessageFlag.IS_HEAD) > 0 && (outputFlag & MessageFlag.IS_TAIL) > 0) {
+ // non-path or single path nodes
+ if ((outputFlag & NEAR_PATH) > 0) {
+ // non-path, but an update candidate
+ outputValue.set(MessageFlag.EMPTY_MESSAGE, nodeToKeep);
toUpdateCollector.collect(key, outputValue);
} else {
- // non-path node. Store in "complete" output
- outputValue.set(PathNodeFlag.EMPTY_MESSAGE, nodeToKeep);
+ // non-path or single-node path. Store in "complete" output
+ outputValue.set(MessageFlag.EMPTY_MESSAGE, nodeToKeep);
completeCollector.collect(key, outputValue);
}
} else {
- if ((outputFlag & PathNodeFlag.IS_HEAD) > 0 && (outputFlag & PathNodeFlag.IS_TAIL) > 0) {
- // path nodes marked as H & T are single-node paths (not mergeable, not updateable)
- outputValue.set(PathNodeFlag.EMPTY_MESSAGE, nodeToKeep);
- completeCollector.collect(key, outputValue);
- } else {
- // path nodes that are mergeable
- outputFlag &= (PathNodeFlag.IS_HEAD | PathNodeFlag.IS_TAIL); // clear flags except H/T
- outputValue.set(outputFlag, nodeToKeep);
- toMergeCollector.collect(key, outputValue);
+ // path nodes that are mergeable
+ outputFlag &= (MessageFlag.IS_HEAD | MessageFlag.IS_TAIL); // clear flags except H/T
+ outputValue.set(outputFlag, nodeToKeep);
+ toMergeCollector.collect(key, outputValue);
- reporter.incrCounter("genomix", "path_nodes", 1);
- if ((outputFlag & PathNodeFlag.IS_HEAD) > 0) {
- reporter.incrCounter("genomix", "path_nodes_heads", 1);
- }
- if ((outputFlag & PathNodeFlag.IS_TAIL) > 0) {
- reporter.incrCounter("genomix", "path_nodes_tails", 1);
- }
+ reporter.incrCounter("genomix", "path_nodes", 1);
+ if ((outputFlag & MessageFlag.IS_HEAD) > 0) {
+ reporter.incrCounter("genomix", "path_nodes_heads", 1);
+ }
+ if ((outputFlag & MessageFlag.IS_TAIL) > 0) {
+ reporter.incrCounter("genomix", "path_nodes_tails", 1);
}
}
} else {
@@ -265,35 +258,34 @@
conf.setJobName("PathNodeInitial " + inputPath);
FileInputFormat.addInputPaths(conf, inputPath);
- Path outputPath = new Path(inputPath.replaceAll("/$", "") + ".initialMerge.tmp");
+ // Path outputPath = new Path(inputPath.replaceAll("/$", "") + ".initialMerge.tmp");
+ Path outputPath = new Path(toMergeOutput);
FileOutputFormat.setOutputPath(conf, outputPath);
conf.setInputFormat(SequenceFileInputFormat.class);
conf.setOutputFormat(NullOutputFormat.class);
conf.setMapOutputKeyClass(PositionWritable.class);
- conf.setMapOutputValueClass(MessageWritableNodeWithFlag.class);
+ conf.setMapOutputValueClass(NodeWithFlagWritable.class);
conf.setOutputKeyClass(PositionWritable.class);
- conf.setOutputValueClass(MessageWritableNodeWithFlag.class);
+ conf.setOutputValueClass(NodeWithFlagWritable.class);
conf.setMapperClass(PathNodeInitialMapper.class);
conf.setReducerClass(PathNodeInitialReducer.class);
- MultipleOutputs.addNamedOutput(conf, TO_MERGE_OUTPUT, MergePathMultiSeqOutputFormat.class,
- PositionWritable.class, MessageWritableNodeWithFlag.class);
MultipleOutputs.addNamedOutput(conf, COMPLETE_OUTPUT, MergePathMultiSeqOutputFormat.class,
- PositionWritable.class, MessageWritableNodeWithFlag.class);
+ PositionWritable.class, NodeWithFlagWritable.class);
MultipleOutputs.addNamedOutput(conf, TO_UPDATE_OUTPUT, MergePathMultiSeqOutputFormat.class,
- PositionWritable.class, MessageWritableNodeWithFlag.class);
+ PositionWritable.class, NodeWithFlagWritable.class);
FileSystem dfs = FileSystem.get(conf);
dfs.delete(outputPath, true); // clean output dir
RunningJob job = JobClient.runJob(conf);
// move the tmp outputs to the arg-spec'ed dirs
- dfs.rename(new Path(outputPath + File.separator + TO_MERGE_OUTPUT), new Path(toMergeOutput));
dfs.rename(new Path(outputPath + File.separator + COMPLETE_OUTPUT), new Path(completeOutput));
dfs.rename(new Path(outputPath + File.separator + TO_UPDATE_OUTPUT), new Path(toUpdateOutput));
+ // dfs.rename(outputPath, new Path(toMergeOutput));
return job;
}
@@ -305,7 +297,7 @@
}
public static void main(String[] args) throws Exception {
- int res = ToolRunner.run(new Configuration(), new PathNodeInitial(), args);
+ int res = new PathNodeInitial().run(args);
System.exit(res);
}
}
diff --git a/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/pmcommon/TestPathNodeInitial.java b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/pmcommon/TestPathNodeInitial.java
index 8f0be6c..f24fdc1 100644
--- a/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/pmcommon/TestPathNodeInitial.java
+++ b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/pmcommon/TestPathNodeInitial.java
@@ -14,7 +14,7 @@
import org.junit.Test;
import edu.uci.ics.genomix.hadoop.graphclean.mergepaths.h3.MergePathsH3.MergeMessageFlag;
-import edu.uci.ics.genomix.hadoop.pmcommon.MessageWritableNodeWithFlag;
+import edu.uci.ics.genomix.hadoop.pmcommon.NodeWithFlagWritable;
import edu.uci.ics.genomix.hadoop.pmcommon.PathNodeInitial;
import edu.uci.ics.genomix.type.KmerBytesWritable;
import edu.uci.ics.genomix.type.NodeWritable;
@@ -41,9 +41,9 @@
public void testNoNeighbors() throws IOException {
NodeWritable noNeighborNode = new NodeWritable(posn1, new PositionListWritable(), new PositionListWritable(),
new PositionListWritable(), new PositionListWritable(), kmer);
- MessageWritableNodeWithFlag output = new MessageWritableNodeWithFlag((byte) (MergeMessageFlag.FROM_SELF | MergeMessageFlag.IS_COMPLETE), noNeighborNode);
+ NodeWithFlagWritable output = new NodeWithFlagWritable((byte) (MergeMessageFlag.MSG_SELF | MergeMessageFlag.IS_COMPLETE), noNeighborNode);
// test mapper
- new MapDriver<NodeWritable, NullWritable, PositionWritable, MessageWritableNodeWithFlag>()
+ new MapDriver<NodeWritable, NullWritable, PositionWritable, NodeWithFlagWritable>()
.withMapper(new PathNodeInitial.PathNodeInitialMapper())
.withConfiguration(conf)
.withInput(noNeighborNode, NullWritable.get())
@@ -51,7 +51,7 @@
.runTest();
// test reducer
// MultipleOutputs.addNamedOutput(conf, "complete", SequenceFileOutputFormat.class, PositionWritable.class, MessageWritableNodeWithFlag.class);
- new ReduceDriver<PositionWritable, MessageWritableNodeWithFlag, PositionWritable, MessageWritableNodeWithFlag>()
+ new ReduceDriver<PositionWritable, NodeWithFlagWritable, PositionWritable, NodeWithFlagWritable>()
.withReducer(new PathNodeInitial.PathNodeInitialReducer())
.withConfiguration(conf)
.withInput(posn1, Arrays.asList(output))