H4 complete; fix probBeingHead
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4.java
index 88cfb79..c11b6f0 100644
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4.java
@@ -37,6 +37,7 @@
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.SequenceFileInputFormat;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.mapred.lib.IdentityMapper;
import org.apache.hadoop.mapred.lib.MultipleOutputs;
import org.apache.hadoop.mapred.lib.NullOutputFormat;
import org.apache.hadoop.util.Tool;
@@ -124,6 +125,11 @@
protected boolean isNodeRandomHead(PositionWritable nodeID) {
// "deterministically random", based on node id
randGenerator.setSeed(randSeed ^ nodeID.hashCode());
+
+ // similar hashcodes will produce similar initial random values. Burn through a few to increase spread
+ for (int i = 0; i < 100; i++) {
+ randGenerator.nextFloat();
+ }
return randGenerator.nextFloat() < probBeingRandomHead;
}
@@ -198,6 +204,7 @@
} else {
// I'm a tail
if (hasNext && hasPrev) {
+ // TODO change to local maximum
if ((!nextHead && !prevHead) && (curID.compareTo(nextID) < 0 && curID.compareTo(prevID) < 0)) {
// tails on both sides, and I'm the "local minimum"
// compress me towards the tail in forward dir
@@ -287,10 +294,8 @@
private int KMER_SIZE;
private NodeWithFlagWritable inputValue;
private NodeWithFlagWritable outputValue;
- private NodeWritable curNode;
private PositionWritable outPosn;
private boolean sawCurNode;
- private byte outFlag;
private byte inFlag;
// to prevent GC on update messages, we keep them all in one list and use the Node set method rather than creating new Node's
@@ -302,19 +307,26 @@
KMER_SIZE = conf.getInt("sizeKmer", 0);
inputValue = new NodeWithFlagWritable(KMER_SIZE);
outputValue = new NodeWithFlagWritable(KMER_SIZE);
- curNode = new NodeWritable(KMER_SIZE);
outPosn = new PositionWritable();
updateMsgs = new ArrayList<NodeWithFlagWritable>();
updateMsgsSize = updateMsgs.size();
}
+ private void addUpdateMessage(NodeWithFlagWritable myInputValue) {
+ updateMsgsCount++;
+ if (updateMsgsCount >= updateMsgsSize) {
+ updateMsgs.add(new NodeWithFlagWritable(myInputValue)); // make a copy of inputValue-- not a reference!
+ } else {
+ updateMsgs.get(updateMsgsCount - 1).set(myInputValue); // update existing reference
+ }
+ }
+
/*
* Process updates from mapper
*
* (non-Javadoc)
* @see org.apache.hadoop.mapred.Reducer#reduce(java.lang.Object, java.util.Iterator, org.apache.hadoop.mapred.OutputCollector, org.apache.hadoop.mapred.Reporter)
*/
- @SuppressWarnings("unchecked")
@Override
public void reduce(PositionWritable key, Iterator<NodeWithFlagWritable> values,
OutputCollector<PositionWritable, NodeWithFlagWritable> output, Reporter reporter) throws IOException {
@@ -326,22 +338,21 @@
inputValue.set(values.next());
inFlag = inputValue.getFlag();
inMsg = (byte) (inFlag & MessageFlag.MSG_MASK);
-
+
switch (inMsg) {
case MessageFlag.MSG_UPDATE_MERGE:
case MessageFlag.MSG_SELF:
if (sawCurNode)
- throw new IOException("Saw more than one MSG_SELF! previously seen self: " + curNode
- + " current self: " + inputValue.getNode());
- curNode.set(inputValue.getNode());
- outFlag = inFlag;
- sawCurNode = true;
+ throw new IOException("Saw more than one MSG_SELF! previously seen self: "
+ + outputValue.getNode() + " current self: " + inputValue.getNode());
if (inMsg == MessageFlag.MSG_SELF) {
- outPosn.set(curNode.getNodeID());
+ outPosn.set(outputValue.getNode().getNodeID());
} else { // MSG_UPDATE_MERGE
// merge messages are sent to their merge recipient
- outPosn.set(curNode.getListFromDir(inMsg).getPosition(0));
+ outPosn.set(outputValue.getNode().getListFromDir(inMsg).getPosition(0));
}
+ outputValue.set(inFlag, inputValue.getNode());
+ sawCurNode = true;
break;
case MessageFlag.MSG_UPDATE_EDGE:
addUpdateMessage(inputValue);
@@ -355,196 +366,103 @@
}
// process all the update messages for this node
- // I have no idea how to make this more efficient...
- for (int i=0; i < updateMsgsCount; i++) {
- NodeWithFlagWritable.processUpdates(curNode, updateMsgs.get(i), KMER_SIZE);
+ for (int i = 0; i < updateMsgsCount; i++) {
+ outputValue.processUpdates(updateMsgs.get(i), KMER_SIZE);
}
- outputValue.set(outFlag, curNode);
output.collect(outPosn, outputValue);
}
-
- private void addUpdateMessage(NodeWithFlagWritable myInputValue) {
- updateMsgsCount++;
- if (updateMsgsCount >= updateMsgsSize) {
- updateMsgs.add(new NodeWithFlagWritable(inputValue)); // make a copy of inputValue-- not a reference!
- } else {
- updateMsgs.get(updateMsgsCount - 1).set(myInputValue); // update existing reference
- }
- }
}
/*
- * Mapper class: sends the update messages to their (already decided) destination
+ * Reducer class: processes merge messages
*/
- public static class H4MergeMapper extends MapReduceBase implements
- Mapper<PositionWritable, NodeWithFlagWritable, PositionWritable, NodeWithFlagWritable> {
- private static long randSeed;
- private Random randGenerator;
- private float probBeingRandomHead;
-
- private int KMER_SIZE;
- private NodeWithFlagWritable outputValue;
- private NodeWithFlagWritable mergeMsgValue;
- private NodeWithFlagWritable updateMsgValue;
-
- private NodeWritable curNode;
- private PositionWritable curID;
- private PositionWritable nextID;
- private PositionWritable prevID;
- private boolean hasNext;
- private boolean hasPrev;
- private boolean curHead;
- private boolean nextHead;
- private boolean prevHead;
- private MergeDir mergeDir;
- private byte inFlag;
- private byte headFlag;
- private byte tailFlag;
- private byte mergeMsgFlag;
- private byte nextDir;
- private byte prevDir;
-
- public void configure(JobConf conf) {
-
- randSeed = conf.getLong("randomSeed", 0);
- randGenerator = new Random(randSeed);
- probBeingRandomHead = conf.getFloat("probBeingRandomHead", 0.5f);
-
- KMER_SIZE = conf.getInt("sizeKmer", 0);
- outputValue = new NodeWithFlagWritable(KMER_SIZE);
-
- mergeMsgValue = new NodeWithFlagWritable(KMER_SIZE);
- updateMsgValue = new NodeWithFlagWritable(KMER_SIZE);
-
- curNode = new NodeWritable(KMER_SIZE);
- curID = new PositionWritable();
- nextID = new PositionWritable();
- prevID = new PositionWritable();
- }
-
- @Override
- public void map(PositionWritable key, NodeWithFlagWritable value,
- OutputCollector<PositionWritable, NodeWithFlagWritable> output, Reporter reporter) throws IOException {
- inFlag = value.getFlag();
- curNode.set(value.getNode());
- curID.set(curNode.getNodeID());
-
- }
-
- }
-
- /*
- * Reducer class: processes the update messages from updateMapper
- */
- private static class H4MergeReducer2 extends MapReduceBase implements
+ private static class H4MergeReducer extends MapReduceBase implements
Reducer<PositionWritable, NodeWithFlagWritable, PositionWritable, NodeWithFlagWritable> {
private MultipleOutputs mos;
- private static final String TO_MERGE_OUTPUT = "toMerge";
- private static final String COMPLETE_OUTPUT = "complete";
- private static final String UPDATES_OUTPUT = "update";
- private OutputCollector<PositionWritable, NodeWithFlagWritable> toMergeCollector;
+ public static final String COMPLETE_OUTPUT = "complete";
private OutputCollector<PositionWritable, NodeWithFlagWritable> completeCollector;
- private OutputCollector<PositionWritable, NodeWithFlagWritable> updatesCollector;
private int KMER_SIZE;
private NodeWithFlagWritable inputValue;
private NodeWithFlagWritable outputValue;
- private NodeWritable curNode;
- private NodeWritable prevNode;
- private NodeWritable nextNode;
+ private PositionWritable outputKey;
private boolean sawCurNode;
- private boolean sawPrevNode;
- private boolean sawNextNode;
- private int count;
- private byte outFlag;
+ private byte inFlag;
+
+ // to prevent GC on update messages, we keep them all in one list and use the Node set method rather than creating new Node's
+ private ArrayList<NodeWithFlagWritable> mergeMsgs;
+ private int updateMsgsSize;
+ private int mergeMsgsCount;
public void configure(JobConf conf) {
- mos = new MultipleOutputs(conf);
KMER_SIZE = conf.getInt("sizeKmer", 0);
inputValue = new NodeWithFlagWritable(KMER_SIZE);
outputValue = new NodeWithFlagWritable(KMER_SIZE);
- curNode = new NodeWritable(KMER_SIZE);
- prevNode = new NodeWritable(KMER_SIZE);
- nextNode = new NodeWritable(KMER_SIZE);
+ outputKey = new PositionWritable();
+ mergeMsgs = new ArrayList<NodeWithFlagWritable>();
+ updateMsgsSize = mergeMsgs.size();
}
+ private void addMergeMessage(NodeWithFlagWritable myInputValue) {
+ mergeMsgsCount++;
+ if (mergeMsgsCount >= updateMsgsSize) {
+ mergeMsgs.add(new NodeWithFlagWritable(myInputValue)); // make a copy of inputValue-- not a reference!
+ } else {
+ mergeMsgs.get(mergeMsgsCount - 1).set(myInputValue); // update existing reference
+ }
+ }
+
+ /*
+ * Process merges
+ *
+ * (non-Javadoc)
+ * @see org.apache.hadoop.mapred.Reducer#reduce(java.lang.Object, java.util.Iterator, org.apache.hadoop.mapred.OutputCollector, org.apache.hadoop.mapred.Reporter)
+ */
@SuppressWarnings("unchecked")
@Override
public void reduce(PositionWritable key, Iterator<NodeWithFlagWritable> values,
- OutputCollector<PositionWritable, NodeWithFlagWritable> output, Reporter reporter) throws IOException {
- toMergeCollector = mos.getCollector(TO_MERGE_OUTPUT, reporter);
+ OutputCollector<PositionWritable, NodeWithFlagWritable> toMergeCollector, Reporter reporter)
+ throws IOException {
completeCollector = mos.getCollector(COMPLETE_OUTPUT, reporter);
- updatesCollector = mos.getCollector(UPDATES_OUTPUT, reporter);
+ sawCurNode = false;
+ mergeMsgsCount = 0;
- inputValue.set(values.next());
- if (!values.hasNext()) {
- if ((inputValue.getFlag() & MessageFlag.MSG_SELF) > 0) {
- if ((inputValue.getFlag() & MessageFlag.IS_HEAD) > 0
- && (inputValue.getFlag() & MessageFlag.IS_TAIL) > 0) {
- // complete path (H & T meet in this node)
- completeCollector.collect(key, inputValue);
- } else {
- // FROM_SELF => no merging this round. remap self
- toMergeCollector.collect(key, inputValue);
- }
- } else if ((inputValue.getFlag() & (MessageFlag.FROM_PREDECESSOR | MessageFlag.FROM_SUCCESSOR)) > 0) {
- // FROM_PREDECESSOR | FROM_SUCCESSOR, but singleton? error here!
- throw new IOException("Only one value recieved in merge, but it wasn't from self!");
- }
- } else {
- // multiple inputs => a merge will take place. Aggregate all, then collect the merged path
- count = 0;
- outFlag = MessageFlag.EMPTY_MESSAGE;
- sawCurNode = false;
- sawPrevNode = false;
- sawNextNode = false;
- while (true) { // process values; break when no more
- count++;
- outFlag |= (inputValue.getFlag() & (MessageFlag.IS_HEAD | MessageFlag.IS_TAIL)); // merged node may become HEAD or TAIL
- if ((inputValue.getFlag() & MessageFlag.FROM_PREDECESSOR) > 0) {
- prevNode.set(inputValue.getNode());
- sawPrevNode = true;
- } else if ((inputValue.getFlag() & MessageFlag.FROM_SUCCESSOR) > 0) {
- nextNode.set(inputValue.getNode());
- sawNextNode = true;
- } else if ((inputValue.getFlag() & MessageFlag.MSG_SELF) > 0) {
- curNode.set(inputValue.getNode());
+ while (values.hasNext()) {
+ inputValue.set(values.next());
+ inFlag = inputValue.getFlag();
+ switch (inFlag & MessageFlag.MSG_MASK) {
+ case MessageFlag.MSG_SELF:
+ if (sawCurNode)
+ throw new IOException("Saw more than one MSG_SELF! previously seen self: "
+ + outputValue.getNode() + " current self: " + inputValue.getNode());
+ outputKey.set(outputValue.getNode().getNodeID());
+ outputValue.set(inFlag, inputValue.getNode());
sawCurNode = true;
- } else {
- throw new IOException("Unknown origin for merging node");
- }
- if (!values.hasNext()) {
break;
- } else {
- inputValue.set(values.next());
- }
+ case MessageFlag.MSG_UPDATE_MERGE:
+ addMergeMessage(inputValue);
+ break;
+ case MessageFlag.MSG_UPDATE_EDGE:
+ throw new IOException("Error: update message recieved during merge phase!" + inputValue);
+ default:
+ throw new IOException("Unrecognized message type: " + (inFlag & MessageFlag.MSG_MASK));
}
- if (count != 2 && count != 3) {
- throw new IOException("Expected two or three nodes in MergePathsH4 reduce; saw "
- + String.valueOf(count));
- }
- if (!sawCurNode) {
- throw new IOException("Didn't see node from self in MergePathsH4 reduce!");
- }
+ }
+ if (!sawCurNode) {
+ throw new IOException("Never saw self in recieve update messages!");
+ }
- // merge any received nodes
- if (sawNextNode) {
- curNode.mergeForwardNext(nextNode, KMER_SIZE);
- reporter.incrCounter("genomix", "num_merged", 1);
- }
- if (sawPrevNode) {
- // TODO: fix this merge command! which one is the right one?
- curNode.mergeForwardPre(prevNode, KMER_SIZE);
- reporter.incrCounter("genomix", "num_merged", 1);
- }
+ // process all the merge messages for this node
+ for (int i = 0; i < mergeMsgsCount; i++) {
+ outputValue.processUpdates(mergeMsgs.get(i), KMER_SIZE);
+ }
- outputValue.set(outFlag, curNode);
- if ((outFlag & MessageFlag.IS_HEAD) > 0 && (outFlag & MessageFlag.IS_TAIL) > 0) {
- // True heads meeting tails => merge is complete for this node
- completeCollector.collect(key, outputValue);
- } else {
- toMergeCollector.collect(key, outputValue);
- }
+ // H + T indicates a complete path
+ if ((outputValue.getFlag() & MessageFlag.IS_HEAD) > 0
+ && ((outputValue.getFlag() & MessageFlag.IS_TAIL) > 0)) {
+ completeCollector.collect(outputKey, outputValue);
+ } else {
+ toMergeCollector.collect(outputKey, outputValue);
}
}
@@ -556,55 +474,45 @@
/*
* Run one iteration of the mergePaths algorithm
*/
- public RunningJob run(String inputPath, String toMergeOutput, String completeOutput, String updatesOutput,
- JobConf baseConf) throws IOException {
+ public RunningJob run(String inputPath, String toMergeOutput, String completeOutput, JobConf baseConf) throws IOException {
JobConf conf = new JobConf(baseConf);
+ FileSystem dfs = FileSystem.get(conf);
conf.setJarByClass(MergePathsH4.class);
conf.setJobName("MergePathsH4 " + inputPath);
-
- FileInputFormat.addInputPaths(conf, inputPath);
- Path outputPath = new Path(inputPath + ".h4merge.tmp");
- FileOutputFormat.setOutputPath(conf, outputPath);
-
conf.setInputFormat(SequenceFileInputFormat.class);
- conf.setOutputFormat(NullOutputFormat.class);
-
+ conf.setOutputFormat(SequenceFileOutputFormat.class);
conf.setMapOutputKeyClass(PositionWritable.class);
conf.setMapOutputValueClass(NodeWithFlagWritable.class);
conf.setOutputKeyClass(PositionWritable.class);
conf.setOutputValueClass(NodeWithFlagWritable.class);
-
+
+ // step 1: decide merge dir and send updates
+ FileInputFormat.addInputPaths(conf, inputPath);
+ String outputUpdatesTmp = "h4.updatesProcessed." + new Random().nextDouble() + ".tmp"; // random filename
+ FileOutputFormat.setOutputPath(conf, new Path(outputUpdatesTmp));
+ dfs.delete(new Path(outputUpdatesTmp), true);
conf.setMapperClass(H4UpdatesMapper.class);
conf.setReducerClass(H4UpdatesReducer.class);
-
- MultipleOutputs.addNamedOutput(conf, H4UpdatesReducer.TO_MERGE_OUTPUT, MergePathMultiSeqOutputFormat.class,
- PositionWritable.class, NodeWithFlagWritable.class);
- MultipleOutputs.addNamedOutput(conf, H4UpdatesReducer.COMPLETE_OUTPUT, MergePathMultiSeqOutputFormat.class,
- PositionWritable.class, NodeWithFlagWritable.class);
- MultipleOutputs.addNamedOutput(conf, H4UpdatesReducer.UPDATES_OUTPUT, MergePathMultiSeqOutputFormat.class,
- PositionWritable.class, NodeWithFlagWritable.class);
-
- FileSystem dfs = FileSystem.get(conf);
- // clean output dirs
- dfs.delete(outputPath, true);
- dfs.delete(new Path(toMergeOutput), true);
- dfs.delete(new Path(completeOutput), true);
- dfs.delete(new Path(updatesOutput), true);
-
RunningJob job = JobClient.runJob(conf);
+ // step 2: process merges
+ FileInputFormat.addInputPaths(conf, outputUpdatesTmp);
+ Path outputMergeTmp = new Path("h4.mergeProcessed." + new Random().nextDouble() + ".tmp"); // random filename
+ FileOutputFormat.setOutputPath(conf, outputMergeTmp);
+ MultipleOutputs.addNamedOutput(conf, H4MergeReducer.COMPLETE_OUTPUT, MergePathMultiSeqOutputFormat.class,
+ PositionWritable.class, NodeWithFlagWritable.class);
+ dfs.delete(outputMergeTmp, true);
+ dfs.delete(new Path(outputMergeTmp + "/" + H4MergeReducer.COMPLETE_OUTPUT), true);
+ conf.setMapperClass(IdentityMapper.class);
+ conf.setReducerClass(H4MergeReducer.class);
+ job = JobClient.runJob(conf);
+
// move the tmp outputs to the arg-spec'ed dirs. If there is no such dir, create an empty one to simplify downstream processing
- if (!dfs.rename(new Path(outputPath + File.separator + H4UpdatesReducer.TO_MERGE_OUTPUT), new Path(
- toMergeOutput))) {
- dfs.mkdirs(new Path(toMergeOutput));
- }
- if (!dfs.rename(new Path(outputPath + File.separator + H4UpdatesReducer.COMPLETE_OUTPUT), new Path(
- completeOutput))) {
+ if (!dfs.rename(new Path(outputMergeTmp + File.separator + H4MergeReducer.COMPLETE_OUTPUT), new Path(completeOutput))) {
dfs.mkdirs(new Path(completeOutput));
}
- if (!dfs.rename(new Path(outputPath + File.separator + H4UpdatesReducer.UPDATES_OUTPUT),
- new Path(updatesOutput))) {
- dfs.mkdirs(new Path(updatesOutput));
+ if (!dfs.rename(outputMergeTmp, new Path(toMergeOutput))) {
+ dfs.mkdirs(new Path(completeOutput));
}
return job;
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/NodeWithFlagWritable.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/NodeWithFlagWritable.java
index 6729c78..75d4889 100644
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/NodeWithFlagWritable.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/NodeWithFlagWritable.java
@@ -43,7 +43,7 @@
byte neighborToMeDir = mirrorDirection(neighborDir);
byte neighborToMergeDir = flipDirection(neighborToMeDir, mergeDir);
- // clear previous kmer and edge data
+ // clear previous kmer and edge data
node.reset(0);
// indicate the node to delete
@@ -73,8 +73,9 @@
}
/*
- * When A->B edge type is @neighborDir and B will merge towards C along a @mergeDir edge,
- * returns the new edge type for A->C
+ * When A->B edge type is @neighborDir and B will merge towards C along a
+ *
+ * @mergeDir edge, returns the new edge type for A->C
*/
public static byte flipDirection(byte neighborDir, byte mergeDir) {
switch (mergeDir) {
@@ -106,10 +107,10 @@
}
/*
- * Process any changes to @node contained in @updateMsg. This includes merges and edge updates
+ * Process any changes to @self contained in @updateMsg. This includes
+ * merges and edge updates.
*/
- public static void processUpdates(NodeWritable node, NodeWithFlagWritable updateMsg, int kmerSize)
- throws IOException {
+ public void processUpdates(NodeWithFlagWritable updateMsg, int kmerSize) throws IOException {
byte updateFlag = updateMsg.getFlag();
NodeWritable updateNode = updateMsg.getNode();
if ((updateFlag & MessageFlag.MSG_UPDATE_EDGE) == MessageFlag.MSG_UPDATE_EDGE) {
@@ -130,7 +131,25 @@
node.getListFromDir(updateFlag).remove(updateNode.getNodeID()); // remove the node from my edges
node.getKmer().mergeWithKmerInDir(updateFlag, kmerSize, updateNode.getKmer()); // merge with its kmer
- // merge my edges with the incoming node's edges, accounting for if the node flipped in
+ // pass along H/T information from the merging node. flipping H ->T, T -> H
+ switch (updateFlag & MessageFlag.DIR_MASK) {
+ case MessageFlag.DIR_FF:
+ case MessageFlag.DIR_RR:
+ flag |= (byte) (updateFlag & MessageFlag.IS_HEAD);
+ flag |= (byte) (updateFlag & MessageFlag.IS_TAIL);
+ break;
+ case MessageFlag.DIR_FR:
+ case MessageFlag.DIR_RF:
+ if ((updateFlag & MessageFlag.IS_HEAD) > 0)
+ flag |= (byte) (updateFlag & MessageFlag.IS_TAIL);
+ if ((updateFlag & MessageFlag.IS_TAIL) > 0)
+ flag |= (byte) (updateFlag & MessageFlag.IS_HEAD);
+ break;
+ default:
+ throw new IOException("Unrecognized direction in updateFlag: " + updateFlag);
+ }
+
+ // merge my edges with the incoming node's edges, accounting for if the node flipped in
// the merge and if it's my predecessor or successor
switch (updateFlag & MessageFlag.DIR_MASK) {
case MessageFlag.DIR_FF:
@@ -139,27 +158,31 @@
node.getFRList().set(updateNode.getFRList());
// update isn't allowed to have any other successors (mirror & flip)
if (updateNode.getRFList().getCountOfPosition() > 0)
- throw new IOException("Invalid merge detected! Node: " + node + " merging towards " + updateNode + "along" + (updateFlag & MessageFlag.DIR_MASK));
+ throw new IOException("Invalid merge detected! Node: " + node + " merging towards "
+ + updateNode + "along" + (updateFlag & MessageFlag.DIR_MASK));
break;
case MessageFlag.DIR_FR:
// flip edges
node.getFFList().set(updateNode.getRFList());
node.getFRList().set(updateNode.getRRList());
if (updateNode.getFFList().getCountOfPosition() > 0)
- throw new IOException("Invalid merge detected! Node: " + node + " merging towards " + updateNode + "along" + (updateFlag & MessageFlag.DIR_MASK));
+ throw new IOException("Invalid merge detected! Node: " + node + " merging towards "
+ + updateNode + "along" + (updateFlag & MessageFlag.DIR_MASK));
break;
case MessageFlag.DIR_RF:
// flip edges
node.getRFList().set(updateNode.getFFList());
node.getRRList().set(updateNode.getFRList());
if (updateNode.getRRList().getCountOfPosition() > 0)
- throw new IOException("Invalid merge detected! Node: " + node + " merging towards " + updateNode + "along" + (updateFlag & MessageFlag.DIR_MASK));
+ throw new IOException("Invalid merge detected! Node: " + node + " merging towards "
+ + updateNode + "along" + (updateFlag & MessageFlag.DIR_MASK));
break;
case MessageFlag.DIR_RR:
node.getRFList().set(updateNode.getRFList());
node.getRRList().set(updateNode.getRRList());
if (updateNode.getFRList().getCountOfPosition() > 0)
- throw new IOException("Invalid merge detected! Node: " + node + " merging towards " + updateNode + "along" + (updateFlag & MessageFlag.DIR_MASK));
+ throw new IOException("Invalid merge detected! Node: " + node + " merging towards "
+ + updateNode + "along" + (updateFlag & MessageFlag.DIR_MASK));
break;
default:
throw new IOException("Unrecognized direction in updateFlag: " + updateFlag);
@@ -245,7 +268,7 @@
@Override
public int hashCode() {
- // return super.hashCode() + flag + node.hashCode();
+ // return super.hashCode() + flag + node.hashCode();
return flag + node.hashCode();
}
@@ -257,4 +280,8 @@
}
return false;
}
+
+ public void setNode(NodeWritable otherNode) {
+ node.set(otherNode);
+ }
}
\ No newline at end of file