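PathNodeInitial: use PathNodeFlag instead of MergeMessageFlag; send single-node paths (head and tail) to the "complete" output instead of "toMerge"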
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/PathNodeInitial.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/PathNodeInitial.java
index cb3466b..27058ed 100644
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/PathNodeInitial.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/PathNodeInitial.java
@@ -39,7 +39,6 @@
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
-import edu.uci.ics.genomix.hadoop.graphclean.mergepaths.h3.MergePathsH3.MergeMessageFlag;
import edu.uci.ics.genomix.type.NodeWritable;
import edu.uci.ics.genomix.type.PositionWritable;
@@ -58,7 +57,7 @@
public static final String COMPLETE_OUTPUT = "complete";
public static final String TO_MERGE_OUTPUT = "toMerge";
public static final String TO_UPDATE_OUTPUT = "toUpdate";
-
+
public static class PathNodeFlag {
public static final byte EMPTY_MESSAGE = 0;
public static final byte FROM_SELF = 1 << 0;
@@ -125,38 +124,38 @@
} else if (inDegree == 0 && outDegree == 1) {
pathNode = true;
// start of a tip. needs to merge & be marked as head
- outputValue.set(MergeMessageFlag.IS_HEAD, NodeWritable.EMPTY_NODE);
+ outputValue.set(PathNodeFlag.IS_HEAD, NodeWritable.EMPTY_NODE);
output.collect(key.getNodeID(), outputValue);
} else if (inDegree == 1 && outDegree == 0) {
pathNode = true;
// end of a tip. needs to merge & be marked as tail
- outputValue.set(MergeMessageFlag.IS_TAIL, NodeWritable.EMPTY_NODE);
+ outputValue.set(PathNodeFlag.IS_TAIL, NodeWritable.EMPTY_NODE);
output.collect(key.getNodeID(), outputValue);
} else {
pathNode = false;
if (outDegree > 0) {
// Not a path myself, but my successor might be one. Map forward successor to find heads
- outputValue.set(MergeMessageFlag.IS_HEAD, NodeWritable.EMPTY_NODE);
+ outputValue.set(PathNodeFlag.IS_HEAD, NodeWritable.EMPTY_NODE);
sendOutputToNextNeighbors(key, outputValue, output);
}
if (inDegree > 0) {
// Not a path myself, but my predecessor might be one. map predecessor to find tails
- outputValue.set(MergeMessageFlag.IS_TAIL, NodeWritable.EMPTY_NODE);
+ outputValue.set(PathNodeFlag.IS_TAIL, NodeWritable.EMPTY_NODE);
sendOutputToPreviousNeighbors(key, outputValue, output);
}
// this non-path node won't participate in the merge
- outputValue.set((byte) (MergeMessageFlag.FROM_SELF | MergeMessageFlag.IS_COMPLETE), key);
+ outputValue.set((byte) (PathNodeFlag.FROM_SELF | PathNodeFlag.IS_COMPLETE), key);
output.collect(key.getNodeID(), outputValue);
}
if (pathNode) {
// simple path nodes map themselves
- outputValue.set(MergeMessageFlag.FROM_SELF, key);
+ outputValue.set(PathNodeFlag.FROM_SELF, key);
output.collect(key.getNodeID(), outputValue);
reporter.incrCounter("genomix", "path_nodes", 1);
// also mark neighbors of paths (they are candidates for updates)
- outputValue.set(MergeMessageFlag.NEAR_PATH, NodeWritable.EMPTY_NODE);
+ outputValue.set(PathNodeFlag.NEAR_PATH, NodeWritable.EMPTY_NODE);
sendOutputToNextNeighbors(key, outputValue, output);
sendOutputToPreviousNeighbors(key, outputValue, output);
}
@@ -203,41 +202,47 @@
toMergeCollector = mos.getCollector(TO_MERGE_OUTPUT, reporter);
toUpdateCollector = mos.getCollector(TO_UPDATE_OUTPUT, reporter);
- outputFlag = MergeMessageFlag.EMPTY_MESSAGE;
+ outputFlag = PathNodeFlag.EMPTY_MESSAGE;
while (values.hasNext()) {
inputValue.set(values.next());
inputFlag = inputValue.getFlag();
outputFlag |= inputFlag;
- if ((inputFlag & MergeMessageFlag.FROM_SELF) > 0) {
+ if ((inputFlag & PathNodeFlag.FROM_SELF) > 0) {
// SELF -> keep this node
nodeToKeep.set(inputValue.getNode());
}
}
- if ((outputFlag & MergeMessageFlag.FROM_SELF) > 0) {
- if ((outputFlag & MergeMessageFlag.IS_COMPLETE) > 0) {
- if ((outputFlag & MergeMessageFlag.NEAR_PATH) > 0) {
+ if ((outputFlag & PathNodeFlag.FROM_SELF) > 0) {
+ if ((outputFlag & PathNodeFlag.IS_COMPLETE) > 0) {
+ if ((outputFlag & PathNodeFlag.NEAR_PATH) > 0) {
// non-path, but update candidate
- outputValue.set(MergeMessageFlag.NEAR_PATH, nodeToKeep);
+ outputValue.set(PathNodeFlag.NEAR_PATH, nodeToKeep);
toUpdateCollector.collect(key, outputValue);
} else {
// non-path node. Store in "complete" output
- outputValue.set(MergeMessageFlag.EMPTY_MESSAGE, nodeToKeep);
+ outputValue.set(PathNodeFlag.EMPTY_MESSAGE, nodeToKeep);
completeCollector.collect(key, outputValue);
}
} else {
- // path nodes are mergeable
- outputFlag &= (MergeMessageFlag.IS_HEAD | MergeMessageFlag.IS_TAIL); // clear flags besides H/T
- outputValue.set(outputFlag, nodeToKeep);
- toMergeCollector.collect(key, outputValue);
+ if ((outputFlag & PathNodeFlag.IS_HEAD) > 0 && (outputFlag & PathNodeFlag.IS_TAIL) > 0) {
+ // path nodes marked as H & T are single-node paths (not mergeable, not updateable)
+ outputValue.set(PathNodeFlag.EMPTY_MESSAGE, nodeToKeep);
+ completeCollector.collect(key, outputValue);
+ } else {
+ // path nodes that are mergeable
+ outputFlag &= (PathNodeFlag.IS_HEAD | PathNodeFlag.IS_TAIL); // clear flags except H/T
+ outputValue.set(outputFlag, nodeToKeep);
+ toMergeCollector.collect(key, outputValue);
- reporter.incrCounter("genomix", "path_nodes", 1);
- if ((outputFlag & MergeMessageFlag.IS_HEAD) > 0) {
- reporter.incrCounter("genomix", "path_nodes_heads", 1);
- }
- if ((outputFlag & MergeMessageFlag.IS_TAIL) > 0) {
- reporter.incrCounter("genomix", "path_nodes_tails", 1);
+ reporter.incrCounter("genomix", "path_nodes", 1);
+ if ((outputFlag & PathNodeFlag.IS_HEAD) > 0) {
+ reporter.incrCounter("genomix", "path_nodes_heads", 1);
+ }
+ if ((outputFlag & PathNodeFlag.IS_TAIL) > 0) {
+ reporter.incrCounter("genomix", "path_nodes_tails", 1);
+ }
}
}
} else {