Merge branch 'genomix/fullstack_genomix' of https://code.google.com/p/hyracks into genomix/fullstack_genomix
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
index ef2dc9f..6870ff1 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
@@ -31,17 +31,31 @@
// merge/update directions
public static class DirectionFlag {
- public static final byte DIR_FF = 0b00 << 0;
- public static final byte DIR_FR = 0b01 << 0;
- public static final byte DIR_RF = 0b10 << 0;
- public static final byte DIR_RR = 0b11 << 0;
- public static final byte DIR_MASK = 0b11 << 0;
+ public static final byte DIR_FF = 0b000 << 0;
+ public static final byte DIR_FR = 0b001 << 0;
+ public static final byte DIR_RF = 0b010 << 0;
+ public static final byte DIR_RR = 0b011 << 0;
+ public static final byte DIR_NO = 0b111 << 0; // NOTE(review): 0b111 also sets bit 2, the low bit of the merge field (SHOULD_MERGEWITHNEXT = 0b01 << 2)
+ public static final byte DIR_MASK = 0b111 << 0; // NOTE(review): direction now spans bits 0-2, overlapping the merge field at bits 2-3 - confirm layout
}
- public static class MergeDirFlag extends DirectionFlag{
- public static final byte SHOULD_MERGEWITHNEXT = 0b0 << 2;
- public static final byte SHOULD_MERGEWITHPREV = 0b1 << 2;
- public static final byte SHOULD_MERGE_MASK = 0b1 << 2;
+ public static class SpecialVertexFlag extends DirectionFlag { // vertex-state field: bits 4-5
+ public static final byte IS_RANDOMTAIL = 0b00 << 4; // same bit pattern as IS_STOP
+ public static final byte IS_STOP = 0b00 << 4; // 0: OR-ing it into a state changes nothing
+ public static final byte IS_HEAD = 0b01 << 4;
+ public static final byte IS_FINAL = 0b10 << 4;
+ public static final byte IS_RANDOMHEAD = 0b11 << 4; // same bit pattern as IS_OLDHEAD
+ public static final byte IS_OLDHEAD = 0b11 << 4;
+
+ public static final byte VERTEX_MASK = 0b11 << 4;
+ public static final byte VERTEX_CLEAR = (byte) ~VERTEX_MASK; // == (byte) 0b11001111: keeps every bit except VERTEX_MASK (a bare 11001111 is a decimal literal)
+ }
+
+ public static class MergeDirFlag extends SpecialVertexFlag{
+ public static final byte NO_MERGE = 0b00 << 2; // 0: OR-ing it into a flag changes nothing
+ public static final byte SHOULD_MERGEWITHNEXT = 0b01 << 2;
+ public static final byte SHOULD_MERGEWITHPREV = 0b10 << 2;
+ public static final byte SHOULD_MERGE_MASK = 0b11 << 2; // NOTE(review): bits 2-3; bit 2 is also inside DirectionFlag.DIR_MASK (0b111) - confirm layout
}
private PositionWritable nodeID;
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionListWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionListWritable.java
index 8620445..ebcf7f0 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionListWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionListWritable.java
@@ -143,7 +143,7 @@
return;
}
}
- throw new ArrayIndexOutOfBoundsException("the PositionWritable `" + toRemove.toString() + "` was not found in this list.");
+ //throw new ArrayIndexOutOfBoundsException("the PositionWritable `" + toRemove.toString() + "` was not found in this list.");
}
public void set(PositionListWritable list2) {
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4.java
index 2f12606..3d93baa 100644
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4.java
@@ -485,6 +485,16 @@
FileSystem dfs = FileSystem.get(conf);
conf.setJarByClass(MergePathsH4.class);
conf.setJobName("MergePathsH4 " + inputPath);
+<<<<<<< HEAD
+
+ //another comment
+
+ FileInputFormat.addInputPaths(conf, inputPath);
+ Path outputPath = new Path(inputPath + ".h4merge.tmp");
+ FileOutputFormat.setOutputPath(conf, outputPath);
+
+=======
+>>>>>>> 0fd527e9535a755b9d0956adb1cdc845f1fc46c2
conf.setInputFormat(SequenceFileInputFormat.class);
conf.setOutputFormat(SequenceFileOutputFormat.class);
conf.setMapOutputKeyClass(PositionWritable.class);
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4Driver.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4Driver.java
index 98fe5f6..2765920 100644
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4Driver.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4Driver.java
@@ -112,6 +112,28 @@
break;
}
}
+<<<<<<< HEAD
+
+ // finally, combine all the completed paths and update messages to
+ // create a single merged graph output
+ dfs.delete(new Path(outputGraphPath), true); // clear any previous
+ // output
+ // use all the "complete" and "update" outputs in addition to the final
+ // (possibly empty) toMerge directories
+ // as input to the final update step. This builds a comma-delim'ed
+ // String of said files.
+ final String lastMergeOutput = mergeOutput;
+ PathFilter updateFilter = new PathFilter() {
+ @Override
+ public boolean accept(Path arg0) {
+ String path = arg0.toString();
+ System.out.println("equals last: " + path + " vs " + lastMergeOutput + " = " + path.endsWith(lastMergeOutput));
+ return (path.matches(".*" + COMPLETE + "_i\\d+$") || path.matches(".*" + UPDATES + "_i\\d+$") || path.endsWith(lastMergeOutput));
+ }
+ };
+ // test comment
+
+=======
if (!mergeComplete) {
// if the merge didn't finish, we have to do one final iteration to convert back into (NodeWritable, NullWritable) pairs
ConvertGraphFromNodeWithFlagToNodeWritable converter = new ConvertGraphFromNodeWithFlagToNodeWritable();
@@ -120,6 +142,7 @@
}
// final output string is a comma-separated list of completeOutputs
+>>>>>>> 0fd527e9535a755b9d0956adb1cdc845f1fc46c2
StringBuilder sb = new StringBuilder();
String delim = "";
for (String output : completeOutputs) {
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/NodeWithFlagWritable.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/NodeWithFlagWritable.java
index 4dcff2d..3af7069 100644
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/NodeWithFlagWritable.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/NodeWithFlagWritable.java
@@ -113,6 +113,7 @@
public void processUpdates(NodeWithFlagWritable updateMsg, int kmerSize) throws IOException {
byte updateFlag = updateMsg.getFlag();
NodeWritable updateNode = updateMsg.getNode();
+
if ((updateFlag & MessageFlag.MSG_UPDATE_EDGE) == MessageFlag.MSG_UPDATE_EDGE) {
// this message wants to update the edges of node.
// remove position and merge its position lists with node
diff --git a/genomix/genomix-pregelix/data/input/pathmerge/part-0 b/genomix/genomix-pregelix/data/input/pathmerge/part-0
new file mode 100755
index 0000000..56056ab
--- /dev/null
+++ b/genomix/genomix-pregelix/data/input/pathmerge/part-0
Binary files differ
diff --git a/genomix/genomix-pregelix/data/input/pathmerge/part-1 b/genomix/genomix-pregelix/data/input/pathmerge/part-1
new file mode 100755
index 0000000..95701ce
--- /dev/null
+++ b/genomix/genomix-pregelix/data/input/pathmerge/part-1
Binary files differ
diff --git a/genomix/genomix-pregelix/data/input/pathmerge/part-2 b/genomix/genomix-pregelix/data/input/pathmerge/part-2
new file mode 100755
index 0000000..6bb6633
--- /dev/null
+++ b/genomix/genomix-pregelix/data/input/pathmerge/part-2
Binary files differ
diff --git a/genomix/genomix-pregelix/data/input/pathmerge/part-3 b/genomix/genomix-pregelix/data/input/pathmerge/part-3
new file mode 100755
index 0000000..caa63de
--- /dev/null
+++ b/genomix/genomix-pregelix/data/input/pathmerge/part-3
Binary files differ
diff --git a/genomix/genomix-pregelix/data/input/read2/part-0 b/genomix/genomix-pregelix/data/input/read2/part-0
new file mode 100755
index 0000000..1620187
--- /dev/null
+++ b/genomix/genomix-pregelix/data/input/read2/part-0
Binary files differ
diff --git a/genomix/genomix-pregelix/data/input/read2/part-1 b/genomix/genomix-pregelix/data/input/read2/part-1
new file mode 100755
index 0000000..d2e2476
--- /dev/null
+++ b/genomix/genomix-pregelix/data/input/read2/part-1
Binary files differ
diff --git a/genomix/genomix-pregelix/data/input/read2/part-2 b/genomix/genomix-pregelix/data/input/read2/part-2
new file mode 100755
index 0000000..7f3575e
--- /dev/null
+++ b/genomix/genomix-pregelix/data/input/read2/part-2
Binary files differ
diff --git a/genomix/genomix-pregelix/data/input/read2/part-3 b/genomix/genomix-pregelix/data/input/read2/part-3
new file mode 100755
index 0000000..03e23e1
--- /dev/null
+++ b/genomix/genomix-pregelix/data/input/read2/part-3
Binary files differ
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/DataCleanOutputFormat.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/DataCleanOutputFormat.java
index c4365ff..9e2e5f0 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/DataCleanOutputFormat.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/DataCleanOutputFormat.java
@@ -8,6 +8,7 @@
import edu.uci.ics.genomix.pregelix.api.io.binary.BinaryDataCleanVertexOutputFormat;
import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
+import edu.uci.ics.genomix.pregelix.type.MessageFlag;
import edu.uci.ics.genomix.type.PositionWritable;
import edu.uci.ics.pregelix.api.graph.Vertex;
import edu.uci.ics.pregelix.api.io.VertexWriter;
@@ -35,7 +36,8 @@
@Override
public void writeVertex(Vertex<PositionWritable, VertexValueWritable, NullWritable, ?> vertex)
throws IOException, InterruptedException {
- getRecordWriter().write(vertex.getVertexId(), vertex.getVertexValue());
+ if((vertex.getVertexValue().getState() & MessageFlag.VERTEX_MASK) != MessageFlag.IS_OLDHEAD) // compare only the vertex-state bits; old heads may also carry merge/dir bits
+ getRecordWriter().write(vertex.getVertexId(), vertex.getVertexValue());
}
}
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicPathMergeVertex.java
index 9238f9d..c8d1852 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicPathMergeVertex.java
@@ -29,6 +29,8 @@
protected Iterator<PositionWritable> posIterator;
byte headFlag;
protected byte outFlag;
+ protected byte inFlag;
+ protected byte selfFlag;
/**
* initiate kmerSize, maxIteration
@@ -53,27 +55,32 @@
* get destination vertex
*/
public PositionWritable getNextDestVertexId(VertexValueWritable value) {
- if(value.getFFList().getCountOfPosition() > 0){ // #FFList() > 0
+ if (value.getFFList().getCountOfPosition() > 0){ // #FFList() > 0
posIterator = value.getFFList().iterator();
outFlag |= MessageFlag.DIR_FF;
- }
- else{ // #FRList() > 0
+ return posIterator.next();
+ } else if (value.getFRList().getCountOfPosition() > 0){ // #FRList() > 0
posIterator = value.getFRList().iterator();
outFlag |= MessageFlag.DIR_FR;
+ return posIterator.next();
+ } else {
+ return null;
}
- return posIterator.next();
+
}
public PositionWritable getPreDestVertexId(VertexValueWritable value) {
- if(value.getRFList().getCountOfPosition() > 0){ // #RFList() > 0
+ if (value.getRFList().getCountOfPosition() > 0){ // #RFList() > 0
posIterator = value.getRFList().iterator();
outFlag |= MessageFlag.DIR_RF;
- }
- else{ // #RRList() > 0
+ return posIterator.next();
+ } else if (value.getRRList().getCountOfPosition() > 0){ // #RRList() > 0
posIterator = value.getRRList().iterator();
outFlag |= MessageFlag.DIR_RR;
+ return posIterator.next();
+ } else {
+ return null;
}
- return posIterator.next();
}
/**
@@ -202,8 +209,10 @@
public void setSuccessorAdjMsg(){
if(getVertexValue().getFFList().getLength() > 0)
outFlag |= MessageFlag.DIR_FF;
- else
+ else if(getVertexValue().getFRList().getLength() > 0)
outFlag |= MessageFlag.DIR_FR;
+ else
+ outFlag |= MessageFlag.DIR_NO;
}
/**
@@ -212,8 +221,10 @@
public void setPredecessorAdjMsg(){
if(getVertexValue().getRFList().getLength() > 0)
outFlag |= MessageFlag.DIR_RF;
- else
+ else if(getVertexValue().getRRList().getLength() > 0)
outFlag |= MessageFlag.DIR_RR;
+ else
+ outFlag |= MessageFlag.DIR_NO;
}
/**
@@ -223,7 +234,7 @@
public void broadcastUpdateMsg(){
if((getVertexValue().getState() & MessageFlag.IS_HEAD) > 0)
outFlag |= MessageFlag.IS_HEAD;
- switch(getVertexValue().getState() & 0b0001){
+ switch(getVertexValue().getState() & MessageFlag.SHOULD_MERGE_MASK){
case MessageFlag.SHOULD_MERGEWITHPREV:
setSuccessorAdjMsg();
if(ifFlipWithPredecessor())
@@ -231,7 +242,8 @@
outgoingMsg.setFlag(outFlag);
outgoingMsg.setNeighberNode(getVertexValue().getIncomingList());
outgoingMsg.setSourceVertexId(getVertexId());
- sendMsg(getNextDestVertexId(getVertexValue()), outgoingMsg);
+ if(getNextDestVertexId(getVertexValue()) != null)
+ sendMsg(getNextDestVertexId(getVertexValue()), outgoingMsg);
break;
case MessageFlag.SHOULD_MERGEWITHNEXT:
setPredecessorAdjMsg();
@@ -240,7 +252,8 @@
outgoingMsg.setFlag(outFlag);
outgoingMsg.setNeighberNode(getVertexValue().getOutgoingList());
outgoingMsg.setSourceVertexId(getVertexId());
- sendMsg(getPreDestVertexId(getVertexValue()), outgoingMsg);
+ if(getPreDestVertexId(getVertexValue()) != null)
+ sendMsg(getPreDestVertexId(getVertexValue()), outgoingMsg);
break;
}
}
@@ -250,14 +263,14 @@
* @throws IOException
*/
public void sendMergeMsg(){
- if((getVertexValue().getState() & MessageFlag.IS_HEAD) > 0){
+ if(selfFlag == MessageFlag.IS_HEAD){
byte newState = getVertexValue().getState();
newState &= ~MessageFlag.IS_HEAD;
newState |= MessageFlag.IS_OLDHEAD;
getVertexValue().setState(newState);
+ resetSelfFlag();
outFlag |= MessageFlag.IS_HEAD;
- voteToHalt();
- } else if((getVertexValue().getState() & MessageFlag.IS_OLDHEAD) > 0){
+ } else if(selfFlag == MessageFlag.IS_OLDHEAD){
outFlag |= MessageFlag.IS_OLDHEAD;
voteToHalt();
}
@@ -294,7 +307,7 @@
* @throws IOException
*/
public void broadcastMergeMsg(){
- if((getVertexValue().getState() & MessageFlag.IS_HEAD) > 0)
+ if(headFlag > 0)
outFlag |= MessageFlag.IS_HEAD;
switch(getVertexValue().getState() & MessageFlag.SHOULD_MERGE_MASK) {
case MessageFlag.SHOULD_MERGEWITHNEXT:
@@ -306,6 +319,7 @@
outgoingMsg.setSourceVertexId(getVertexId());
outgoingMsg.setChainVertexId(getVertexValue().getKmer());
sendMsg(getNextDestVertexId(getVertexValue()), outgoingMsg);
+ deleteVertex(getVertexId());
break;
case MessageFlag.SHOULD_MERGEWITHPREV:
setPredecessorAdjMsg();
@@ -316,6 +330,7 @@
outgoingMsg.setSourceVertexId(getVertexId());
outgoingMsg.setChainVertexId(getVertexValue().getKmer());
sendMsg(getPreDestVertexId(getVertexValue()), outgoingMsg);
+ deleteVertex(getVertexId());
break;
}
}
@@ -324,7 +339,7 @@
* This vertex tries to merge with next vertex and send update msg to neighber
* @throws IOException
*/
- public void sendUpMsgToPredecessor(){
+ public void sendUpdateMsgToPredecessor(){
byte state = getVertexValue().getState();
state |= MessageFlag.SHOULD_MERGEWITHNEXT;
getVertexValue().setState(state);
@@ -339,7 +354,7 @@
* This vertex tries to merge with next vertex and send update msg to neighber
* @throws IOException
*/
- public void sendUpMsgToSuccessor(){
+ public void sendUpdateMsgToSuccessor(){
byte state = getVertexValue().getState();
state |= MessageFlag.SHOULD_MERGEWITHPREV;
getVertexValue().setState(state);
@@ -411,11 +426,18 @@
* merge and updateAdjList merge with one neighbor
*/
public void processMerge(){
- byte meToNeighborDir = (byte) (incomingMsg.getFlag() & MessageFlag.DIR_MASK);
+ inFlag = incomingMsg.getFlag();
+ byte meToNeighborDir = (byte) (inFlag & MessageFlag.DIR_MASK);
byte neighborToMeDir = mirrorDirection(meToNeighborDir);
+ if((inFlag & MessageFlag.IS_HEAD) > 0){
+ byte state = getVertexValue().getState();
+ state |= MessageFlag.IS_HEAD;
+ getVertexValue().setState(state);
+ }
+
boolean flip;
- if((outFlag & MessageFlag.FLIP) > 0)
+ if((inFlag & MessageFlag.FLIP) > 0)
flip = true;
else
flip = false;
@@ -445,6 +467,50 @@
kmerSize, msg.getKmer());
}
+ /**
+ * Mark this vertex as IS_HEAD: mask its state with VERTEX_CLEAR, then OR in IS_HEAD.
+ */
+ public void setHeadState(){
+ byte state = getVertexValue().getState();
+ state &= MessageFlag.VERTEX_CLEAR;
+ state |= MessageFlag.IS_HEAD;
+ getVertexValue().setState(state);
+ }
+
+ /**
+ * Mark this vertex as IS_FINAL: mask its state with VERTEX_CLEAR, then OR in IS_FINAL.
+ */
+ public void setFinalState(){
+ byte state = getVertexValue().getState();
+ state &= MessageFlag.VERTEX_CLEAR;
+ state |= MessageFlag.IS_FINAL;
+ getVertexValue().setState(state);
+ }
+
+ /**
+ * Set stop state. NOTE(review): the base value is the incoming msg flag, not this vertex's state (unlike the setters above) - confirm.
+ */
+ public void setStopFlag(){
+ byte state = incomingMsg.getFlag();
+ state &= MessageFlag.VERTEX_CLEAR;
+ state |= MessageFlag.IS_STOP; // IS_STOP is 0b00 << 4, so this OR only documents intent
+ getVertexValue().setState(state);
+ }
+
+ /**
+ * Return the vertex-state bits (VERTEX_MASK) carried by the incoming message's flag.
+ */
+ public byte getMsgFlag(){
+ return (byte)(incomingMsg.getFlag() & MessageFlag.VERTEX_MASK);
+ }
+
+ /**
+ * Refresh selfFlag from the vertex-state bits of this vertex's current state.
+ */
+ public void resetSelfFlag(){
+ selfFlag =(byte)(getVertexValue().getState() & MessageFlag.VERTEX_MASK);
+ }
+
@Override
public void compute(Iterator<MessageWritable> msgIterator) {
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/LogAlgorithmForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/LogAlgorithmForPathMergeVertex.java
index 429282b..500a903 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/LogAlgorithmForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/LogAlgorithmForPathMergeVertex.java
@@ -44,10 +44,8 @@
BasicPathMergeVertex {
private ArrayList<MessageWritable> receivedMsgList = new ArrayList<MessageWritable>();
- protected MessageWritable outgoingMsg2 = new MessageWritable();
- protected PositionWritable destVertexId2 = new PositionWritable();
-
- byte finalFlag;
+ PositionWritable tempPostition = new PositionWritable();
+
/**
* initiate kmerSize, maxIteration
*/
@@ -57,7 +55,9 @@
if (maxIteration < 0)
maxIteration = getContext().getConfiguration().getInt(ITERATIONS, 1000000);
headFlag = (byte)(getVertexValue().getState() & MessageFlag.IS_HEAD);
+ selfFlag =(byte)(getVertexValue().getState() & MessageFlag.VERTEX_MASK);
outgoingMsg.reset();
+ receivedMsgList.clear();
}
/**
@@ -65,16 +65,22 @@
*/
public void sendOutMsg() {
//send wantToMerge to next
- destVertexId.set(getNextDestVertexId(getVertexValue()));
- outgoingMsg.setFlag(outFlag);
- outgoingMsg.setSourceVertexId(getVertexId());
- sendMsg(destVertexId, outgoingMsg);
+ tempPostition = getNextDestVertexId(getVertexValue());
+ if(tempPostition != null){
+ destVertexId.set(tempPostition);
+ outgoingMsg.setFlag(outFlag);
+ outgoingMsg.setSourceVertexId(getVertexId());
+ sendMsg(destVertexId, outgoingMsg);
+ }
////send wantToMerge to prev
- destVertexId2.set(getPreDestVertexId(getVertexValue()));
- outgoingMsg2.setFlag(outFlag);
- outgoingMsg2.setSourceVertexId(getVertexId());
- sendMsg(destVertexId2, outgoingMsg2);
+ tempPostition = getPreDestVertexId(getVertexValue());
+ if(tempPostition != null){
+ destVertexId.set(tempPostition);
+ outgoingMsg.setFlag(outFlag);
+ outgoingMsg.setSourceVertexId(getVertexId());
+ sendMsg(destVertexId, outgoingMsg);
+ }
}
/**
@@ -84,88 +90,96 @@
int countHead = 0;
int countOldHead = 0;
for(int i = 0; i < receivedMsgList.size(); i++){
- if((byte)(receivedMsgList.get(i).getFlag() & MessageFlag.IS_HEAD) > 0)
- countHead++;
- if((byte)(receivedMsgList.get(i).getFlag() & MessageFlag.IS_OLDHEAD) > 0)
- countOldHead++;
+ inFlag = receivedMsgList.get(i).getFlag();
+ switch(inFlag & MessageFlag.VERTEX_MASK){
+ case MessageFlag.IS_HEAD:
+ countHead++;
+ break;
+ case MessageFlag.IS_OLDHEAD:
+ countOldHead++;
+ break;
+ }
}
- if(countHead == 0 && countOldHead == 0)
- return MessageFromHead.BothMsgsFromNonHead;
- else if(countHead == 2)
+ if(countHead == 2)
return MessageFromHead.BothMsgsFromHead;
- else if(countOldHead == 2)
- return MessageFromHead.BothMsgsFromOldHead;
- else if(countHead == 1 && headFlag == 0)
- return MessageFromHead.OneMsgFromHeadToNonHead;
- else if(countHead == 1 && headFlag > 0)
- return MessageFromHead.OneMsgFromHeadToHead;
- else if(countOldHead == 1)
- return MessageFromHead.OneMsgFromNonHead;
-
- return MessageFromHead.NO_INFO;
+ else if(countHead == 1 && countOldHead == 1)
+ return MessageFromHead.OneMsgFromOldHeadAndOneFromHead;
+ else if(countHead == 1 && countOldHead == 0)
+ return MessageFromHead.OneMsgFromHeadAndOneFromNonHead;
+ else if(countHead == 0 && countOldHead == 0)
+ return MessageFromHead.BothMsgsFromNonHead;
+ else
+ return MessageFromHead.NO_MSG;
}
/**
* head send message to path
*/
public void sendMsgToPathVertex(Iterator<MessageWritable> msgIterator) {
- //process merge when receiving msg
- while (msgIterator.hasNext()) {
- incomingMsg = msgIterator.next();
- receivedMsgList.add(incomingMsg);
- }
- if(receivedMsgList.size() != 0){
- byte numOfMsgsFromHead = checkNumOfMsgsFromHead();
- switch(numOfMsgsFromHead){
- case MessageFromHead.BothMsgsFromNonHead:
- for(int i = 0; i < 2; i++)
- processMerge(receivedMsgList.get(i));
- break;
- case MessageFromHead.BothMsgsFromHead:
- case MessageFromHead.BothMsgsFromOldHead:
- for(int i = 0; i < 2; i++)
- processMerge(receivedMsgList.get(i));
- getVertexValue().setState(MessageFlag.IS_FINAL);
- break;
- case MessageFromHead.OneMsgFromHeadToNonHead:
- for(int i = 0; i < 2; i++)
- processMerge(receivedMsgList.get(i));
- getVertexValue().setState(MessageFlag.IS_HEAD);
- break;
- case MessageFromHead.OneMsgFromHeadToHead: //stop condition
- for(int i = 0; i < 2; i++){
- if(headFlag > 0){
- processMerge(receivedMsgList.get(i));
- break;
- }
- }
- getVertexValue().setState(MessageFlag.IS_FINAL);
- //voteToHalt();
- break;
- case MessageFromHead.OneMsgFromNonHead:
- //halt
- //voteToHalt();
- break;
- }
- }
//send out wantToMerge msg
- resetHeadFlag();
- finalFlag = (byte)(getVertexValue().getState() & MessageFlag.IS_FINAL);
- outFlag = (byte)(headFlag | finalFlag);
- if(outFlag == 0)
- sendOutMsg();
+ if(selfFlag != MessageFlag.IS_HEAD){
+ sendOutMsg();
+ }
}
/**
* path response message to head
*/
public void responseMsgToHeadVertex(Iterator<MessageWritable> msgIterator) {
+ if(!msgIterator.hasNext() && selfFlag == MessageFlag.IS_HEAD){
+ getVertexValue().setState(MessageFlag.IS_STOP);
+ sendOutMsg();
+ }
while (msgIterator.hasNext()) {
incomingMsg = msgIterator.next();
- sendMergeMsg();
+ if(getMsgFlag() == MessageFlag.IS_FINAL){
+ processMerge(incomingMsg);
+ getVertexValue().setState(MessageFlag.IS_FINAL);
+ }else
+ sendMergeMsg();
}
}
+ /**
+ * Head-vertex step: buffer incoming msgs, classify them with checkNumOfMsgsFromHead(), and merge with up to two of them.
+ */
+ public void processMergeInHeadVertex(Iterator<MessageWritable> msgIterator){
+ //collect msgs; an IS_FINAL msg triggers setStopFlag() + sendMergeMsg() and stops collecting
+ while (msgIterator.hasNext()) {
+ incomingMsg = msgIterator.next();
+ if(getMsgFlag() == MessageFlag.IS_FINAL){
+ setStopFlag();
+ sendMergeMsg();
+ break;
+ }
+ receivedMsgList.add(incomingMsg);
+ }
+ if(receivedMsgList.size() != 0){
+ byte numOfMsgsFromHead = checkNumOfMsgsFromHead();
+ switch(numOfMsgsFromHead){
+ case MessageFromHead.BothMsgsFromHead:
+ case MessageFromHead.OneMsgFromOldHeadAndOneFromHead:
+ for(int i = 0; i < Math.min(2, receivedMsgList.size()); i++)
+ processMerge(receivedMsgList.get(i));
+ getVertexValue().setState(MessageFlag.IS_FINAL);
+ voteToHalt();
+ break;
+ case MessageFromHead.OneMsgFromHeadAndOneFromNonHead:
+ for(int i = 0; i < Math.min(2, receivedMsgList.size()); i++) // this case is also chosen when only one msg was buffered
+ processMerge(receivedMsgList.get(i));
+ getVertexValue().setState(MessageFlag.IS_HEAD);
+ break;
+ case MessageFromHead.BothMsgsFromNonHead:
+ for(int i = 0; i < Math.min(2, receivedMsgList.size()); i++) // likewise: a single non-head msg lands here
+ processMerge(receivedMsgList.get(i));
+ break;
+ case MessageFromHead.NO_MSG:
+ //halt; NOTE(review): NO_MSG is also returned for e.g. two old-head msgs - confirm deletion is intended
+ deleteVertex(getVertexId());
+ break;
+ }
+ }
+ }
@Override
public void compute(Iterator<MessageWritable> msgIterator) {
initVertex();
@@ -173,13 +187,17 @@
startSendMsg();
else if (getSuperstep() == 2)
initState(msgIterator);
- else if (getSuperstep() % 2 == 1 && getSuperstep() <= maxIteration) {
+ else if (getSuperstep() % 3 == 0 && getSuperstep() <= maxIteration) {
sendMsgToPathVertex(msgIterator);
- if(headFlag == 0)
+ if(selfFlag != MessageFlag.IS_HEAD)
voteToHalt();
- } else if (getSuperstep() % 2 == 0 && getSuperstep() <= maxIteration) {
+ } else if (getSuperstep() % 3 == 1 && getSuperstep() <= maxIteration) {
responseMsgToHeadVertex(msgIterator);
- } else
+ if(selfFlag != MessageFlag.IS_HEAD)
+ voteToHalt();
+ } else if (getSuperstep() % 3 == 2 && getSuperstep() <= maxIteration){
+ processMergeInHeadVertex(msgIterator);
+ }else
voteToHalt();
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java
index f250afc..f70a76e 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java
@@ -1,14 +1,10 @@
package edu.uci.ics.genomix.pregelix.operator.pathmerge;
-import java.io.IOException;
import java.util.Iterator;
import java.util.Random;
-import org.apache.hadoop.io.NullWritable;
-
import edu.uci.ics.genomix.type.PositionWritable;
-import edu.uci.ics.pregelix.api.graph.Vertex;
import edu.uci.ics.pregelix.api.job.PregelixJob;
import edu.uci.ics.genomix.pregelix.client.Client;
import edu.uci.ics.genomix.pregelix.format.NaiveAlgorithmForPathMergeInputFormat;
@@ -16,8 +12,6 @@
import edu.uci.ics.genomix.pregelix.io.MessageWritable;
import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
import edu.uci.ics.genomix.pregelix.type.MessageFlag;
-import edu.uci.ics.genomix.pregelix.type.State;
-import edu.uci.ics.genomix.pregelix.util.VertexUtil;
/*
* vertexId: BytesWritable
@@ -55,7 +49,7 @@
public static final String RANDSEED = "P4ForPathMergeVertex.randSeed";
public static final String PROBBEINGRANDOMHEAD = "P4ForPathMergeVertex.probBeingRandomHead";
- private static long randSeed = -1;
+ private static long randSeed = 1;
private float probBeingRandomHead = -1;
private Random randGenerator;
@@ -67,8 +61,6 @@
private boolean curHead;
private boolean nextHead;
private boolean prevHead;
- private byte headFlag;
- private byte tailFlag;
private byte selfFlag;
/**
@@ -79,8 +71,9 @@
kmerSize = getContext().getConfiguration().getInt(KMER_SIZE, 5);
if (maxIteration < 0)
maxIteration = getContext().getConfiguration().getInt(ITERATIONS, 1000000);
- if (randSeed < 0)
- randSeed = getContext().getConfiguration().getLong("randomSeed", 0);
+ //if (randSeed < 0)
+ // randSeed = getContext().getConfiguration().getLong("randomSeed", 0);
+ randSeed = getSuperstep();
randGenerator = new Random(randSeed);
if (probBeingRandomHead < 0)
probBeingRandomHead = getContext().getConfiguration().getFloat("probBeingRandomHead", 0.5f);
@@ -90,12 +83,17 @@
nextHead = false;
prevHead = false;
outFlag = (byte)0;
+ inFlag = (byte)0;
+ // Node may be marked as head b/c it's a real head or a real tail
+ headFlag = (byte) (MessageFlag.IS_HEAD & getVertexValue().getState());
outgoingMsg.reset();
}
protected boolean isNodeRandomHead(PositionWritable nodeID) {
// "deterministically random", based on node id
- randGenerator.setSeed(randSeed ^ nodeID.hashCode());
+ //randGenerator.setSeed(randSeed);
+ //randSeed = randGenerator.nextInt();
+ randGenerator.setSeed((randSeed ^ nodeID.hashCode()) * 100000);//randSeed + nodeID.hashCode()
return randGenerator.nextFloat() < probBeingRandomHead;
}
@@ -141,48 +139,50 @@
else if (getSuperstep() == 2)
initState(msgIterator);
else if (getSuperstep() % 4 == 3){
- // Node may be marked as head b/c it's a real head or a real tail
- headFlag = (byte) (State.START_VERTEX & getVertexValue().getState());
- tailFlag = (byte) (State.END_VERTEX & getVertexValue().getState());
- outFlag = (byte) (headFlag | tailFlag);
+
+ //tailFlag = (byte) (MessageFlag.IS_TAIL & getVertexValue().getState());
+ //outFlag = (byte) (headFlag | tailFlag);
+ outFlag |= headFlag;
+ outFlag |= MessageFlag.NO_MERGE;
// only PATH vertices are present. Find the ID's for my neighbors
curID.set(getVertexId());
curHead = isNodeRandomHead(curID);
+
// the headFlag and tailFlag's indicate if the node is at the beginning or end of a simple path.
// We prevent merging towards non-path nodes
- hasNext = setNextInfo(getVertexValue()) && tailFlag == 0;
- hasPrev = setPrevInfo(getVertexValue()) && headFlag == 0;
+ hasNext = setNextInfo(getVertexValue());//&& headFlag == 0;
+ hasPrev = setPrevInfo(getVertexValue());//&& headFlag == 0;
if (hasNext || hasPrev) {
if (curHead) {
- if (hasNext && !nextHead) {
+ if (hasNext && !nextHead && (getNextDestVertexId(getVertexValue()) != null)) {
// compress this head to the forward tail
- sendUpMsgToPredecessor(); //TODO up -> update From -> to
- } else if (hasPrev && !prevHead) {
+ sendUpdateMsgToPredecessor(); //TODO up -> update From -> to
+ } else if (hasPrev && !prevHead && (getPreDestVertexId(getVertexValue()) != null)) {
// compress this head to the reverse tail
- sendUpMsgToSuccessor();
+ sendUpdateMsgToSuccessor();
}
} else {
// I'm a tail
if (hasNext && hasPrev) {
- if ((!nextHead && !prevHead) && (curID.compareTo(nextID) < 0 && curID.compareTo(prevID) < 0)) {
+ if ((!nextHead && !prevHead) && (curID.compareTo(nextID) < 0 && curID.compareTo(prevID) < 0)) {
// tails on both sides, and I'm the "local minimum"
// compress me towards the tail in forward dir
- sendUpMsgToPredecessor();
+ sendUpdateMsgToPredecessor();
}
} else if (!hasPrev) {
// no previous node
if (!nextHead && curID.compareTo(nextID) < 0) {
// merge towards tail in forward dir
- sendUpMsgToPredecessor();
+ sendUpdateMsgToPredecessor();
}
} else if (!hasNext) {
// no next node
if (!prevHead && curID.compareTo(prevID) < 0) {
// merge towards tail in reverse dir
- sendUpMsgToSuccessor();
+ sendUpdateMsgToSuccessor();
}
}
}
@@ -197,17 +197,16 @@
} else if (getSuperstep() % 4 == 1){
//send message to the merge object and kill self
broadcastMergeMsg();
- deleteVertex(getVertexId());
} else if (getSuperstep() % 4 == 2){
//merge kmer
while (msgIterator.hasNext()) {
incomingMsg = msgIterator.next();
+ selfFlag = (byte) (MessageFlag.VERTEX_MASK & getVertexValue().getState());
processMerge();
//head meets head, stop
- headFlag = (byte) (MessageFlag.IS_HEAD & incomingMsg.getFlag());
- selfFlag = (byte) (MessageFlag.IS_HEAD & getVertexValue().getState());
- if((headFlag & selfFlag) > 0)
+ headFlag = (byte) (MessageFlag.VERTEX_MASK & incomingMsg.getFlag());
+ if(headFlag == MessageFlag.IS_HEAD && selfFlag == MessageFlag.IS_HEAD)
voteToHalt();
}
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P5ForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P5ForPathMergeVertex.java
index 27eb40a..d68c98b 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P5ForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P5ForPathMergeVertex.java
@@ -243,7 +243,7 @@
if (incomingMsg.getFlag() == Message.START) {
getVertexValue().setState(MessageFlag.IS_HEAD); //State.START_VERTEX
} else if (incomingMsg.getFlag() == Message.END && getVertexValue().getState() != State.START_VERTEX) {
- getVertexValue().setState(MessageFlag.IS_TAIL);
+ getVertexValue().setState(MessageFlag.IS_HEAD);
getVertexValue().setKmer(getVertexValue().getKmer());
//voteToHalt();
} //else
@@ -445,7 +445,7 @@
// We prevent merging towards non-path nodes
hasNext = setNextInfo(getVertexValue()) && tailFlag == 0;
hasPrev = setPrevInfo(getVertexValue()) && headFlag == 0;
- if ((outFlag & MessageFlag.IS_HEAD) > 0 && (outFlag & MessageFlag.IS_TAIL) > 0) {
+ if ((outFlag & MessageFlag.IS_HEAD) > 0 && (outFlag & MessageFlag.IS_HEAD) > 0) {
getVertexValue().setState(outFlag);
voteToHalt();
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/MessageFlag.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/MessageFlag.java
index c130ec6..855deaa 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/MessageFlag.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/MessageFlag.java
@@ -2,21 +2,14 @@
import edu.uci.ics.genomix.type.NodeWritable.MergeDirFlag;
-public class MessageFlag extends MergeDirFlag{
+public class MessageFlag extends MergeDirFlag {
- public static final byte FLIP = 1 << 3;
- public static final byte IS_HEAD = 1 << 4;
- public static final byte IS_TAIL = 1 << 5;
- public static final byte IS_OLDHEAD = 1 << 6;
- public static final byte IS_FINAL = 0b000011;
+ public static final byte FLIP = 1 << 6;
+
public static String getFlagAsString(byte code) {
// TODO: allow multiple flags to be set
switch (code) {
- case IS_HEAD:
- return "IS_HEAD";
- case IS_OLDHEAD:
- return "IS_OLDHEAD";
case FLIP:
return "FLIP";
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/MessageFromHead.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/MessageFromHead.java
index f343c2e..05a2f95 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/MessageFromHead.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/MessageFromHead.java
@@ -1,13 +1,17 @@
package edu.uci.ics.genomix.pregelix.type;
public class MessageFromHead {
- public static final byte BothMsgsFromHead = 1 << 0;
- public static final byte BothMsgsFromNonHead = 1 << 1;
- public static final byte BothMsgsFromOldHead = 1 << 2;
- public static final byte OneMsgFromHead = 1 << 3;
- public static final byte OneMsgFromNonHead = 1 << 4;
- public static final byte OneMsgFromHeadToNonHead = 1 << 5;
- public static final byte OneMsgFromHeadToHead = 1 << 6;
+ public static final byte BothMsgsFromHead = 0b0000 << 1;
+ public static final byte BothMsgsFromNonHead = 0b0001 << 1;
+ public static final byte BothMsgsFromOldHead = 0b0010 << 1;
+ public static final byte OneMsgFromHead = 0b0011 << 1;
+ public static final byte OneMsgFromNonHead = 0b0100 << 1;
+ public static final byte OneMsgFromHeadAndOneFromNonHead = 0b0101 << 1;
+ public static final byte OneMsgFromHeadToHead = 0b0110 << 1;
+ public static final byte OneMsgFromOldHeadToNonHead = 0b0111 << 1;
+ public static final byte OneMsgFromOldHeadToHead = 0b1000 << 1;
+ public static final byte OneMsgFromOldHeadAndOneFromHead = 0b1001 << 1;
+ public static final byte NO_MSG = 0b1010 << 1;
public static final byte NO_INFO = 0 << 0;
}
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
index a63da20..0a366db 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
@@ -18,6 +18,7 @@
import edu.uci.ics.genomix.pregelix.operator.pathmerge.LogAlgorithmForPathMergeVertex;
import edu.uci.ics.genomix.pregelix.operator.pathmerge.NaiveAlgorithmForPathMergeVertex;
import edu.uci.ics.genomix.pregelix.operator.pathmerge.P3ForPathMergeVertex;
+import edu.uci.ics.genomix.pregelix.operator.pathmerge.P4ForPathMergeVertex;
import edu.uci.ics.genomix.pregelix.operator.tipremove.TipAddVertex;
import edu.uci.ics.genomix.pregelix.operator.tipremove.TipRemoveVertex;
import edu.uci.ics.genomix.type.PositionWritable;
@@ -79,6 +80,23 @@
+ "P3ForMergeGraph.xml");
}
+ private static void generateP4ForMergeGraphJob(String jobName, String outputPath) throws IOException {
+ PregelixJob job = new PregelixJob(jobName);
+ job.setVertexClass(P4ForPathMergeVertex.class);
+ job.setVertexInputFormatClass(NaiveAlgorithmForPathMergeInputFormat.class);
+ job.setVertexOutputFormatClass(DataCleanOutputFormat.class);
+ job.setDynamicVertexValueSize(true);
+ job.setOutputKeyClass(PositionWritable.class);
+ job.setOutputValueClass(VertexValueWritable.class);
+ job.getConfiguration().setInt(P4ForPathMergeVertex.KMER_SIZE, 3);
+ job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
+ }
+
+ private static void genP4ForMergeGraph() throws IOException {
+ generateP4ForMergeGraphJob("P4ForMergeGraph", outputBase
+ + "P4ForMergeGraph.xml");
+ }
+
private static void generateTipAddGraphJob(String jobName, String outputPath) throws IOException {
PregelixJob job = new PregelixJob(jobName);
job.setVertexClass(TipAddVertex.class);
@@ -191,6 +209,7 @@
//genBridgeRemoveGraph();
//genBubbleAddGraph();
//genBubbleMergeGraph();
+ //genP4ForMergeGraph();
}
}
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/graphbuilding/JobRunStepByStepTest.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/graphbuilding/JobRunStepByStepTest.java
index 355bc7a..7bf5aa3 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/graphbuilding/JobRunStepByStepTest.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/graphbuilding/JobRunStepByStepTest.java
@@ -46,11 +46,11 @@
@SuppressWarnings("deprecation")
public class JobRunStepByStepTest {
private static final int KmerSize = 3;
- private static final int ReadLength = 7;
+ private static final int ReadLength = 8;
private static final String ACTUAL_RESULT_DIR = "actual";
private static final String PATH_TO_HADOOP_CONF = "src/test/resources/hadoop/conf";
- private static final String DATA_INPUT_PATH = "data/graphbuild.test/read.txt";
+ private static final String DATA_INPUT_PATH = "data/graphbuild.test/read2.txt";
private static final String HDFS_INPUT_PATH = "/webmap";
private static final String HDFS_OUTPUT_PATH = "/webmap_result";