fix RemoveBridgeVertex and update for merge
diff --git a/genomix/genomix-pregelix/data/AddBridge/SimpleTest/part-00000 b/genomix/genomix-pregelix/data/AddBridge/SimpleTest/part-00000
new file mode 100755
index 0000000..bc255a4
--- /dev/null
+++ b/genomix/genomix-pregelix/data/AddBridge/SimpleTest/part-00000
Binary files differ
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/VertexValueWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/VertexValueWritable.java
index c6ff206..5847aae 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/VertexValueWritable.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/VertexValueWritable.java
@@ -11,18 +11,17 @@
public class VertexValueWritable implements WritableComparable<VertexValueWritable> {
- public static class State extends VertexStateFlag{
- public static final byte HEAD_SHOULD_MERGEWITHPREV = 0b101 << 0;
- public static final byte HEAD_SHOULD_MERGEWITHNEXT = 0b111 << 0;
-
+ public static class State extends VertexStateFlag{
public static final byte NO_MERGE = 0b00 << 3;
public static final byte SHOULD_MERGEWITHNEXT = 0b01 << 3;
public static final byte SHOULD_MERGEWITHPREV = 0b10 << 3;
public static final byte SHOULD_MERGE_MASK = 0b11 << 3;
public static final byte SHOULD_MERGE_CLEAR = 0b1100111;
- public static final byte KILL = 0b11 << 3;
- public static final byte KILL_MASK = 0b11 << 3;
+ public static final byte KILL = 0b1 << 3;
+ public static final byte KILL_MASK = 0b1 << 3;
+
+ public static final byte DIR_FROM_DEADVERTEX = 0b10 << 3;
}
public static class VertexStateFlag extends FakeFlag {
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bridgeremove/BridgeRemoveVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bridgeremove/BridgeRemoveVertex.java
index 7f85fc7..caf7a36 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bridgeremove/BridgeRemoveVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bridgeremove/BridgeRemoveVertex.java
@@ -11,8 +11,8 @@
import edu.uci.ics.genomix.pregelix.io.MessageWritable;
import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
import edu.uci.ics.genomix.pregelix.operator.pathmerge.BasicGraphCleanVertex;
-import edu.uci.ics.genomix.pregelix.type.AdjMessage;
import edu.uci.ics.genomix.pregelix.util.VertexUtil;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
/*
* vertexId: BytesWritable
@@ -59,76 +59,17 @@
if (kmerSize == -1)
kmerSize = getContext().getConfiguration().getInt(KMER_SIZE, 5);
if(length == -1)
- length = getContext().getConfiguration().getInt(LENGTH, kmerSize + 5);
- outgoingMsg.reset();
+ length = getContext().getConfiguration().getInt(LENGTH, kmerSize);
+ if(incomingMsg == null)
+ incomingMsg = new MessageWritable(kmerSize);
+ if(outgoingMsg == null)
+ outgoingMsg = new MessageWritable(kmerSize);
+ else
+ outgoingMsg.reset(kmerSize);
+ if(destVertexId == null)
+ destVertexId = new KmerBytesWritable(kmerSize);
receivedMsgList.clear();
}
-
- /**
- * broadcast kill self to all neighbers
- */
- public void broadcaseKillself(){
- outgoingMsg.setSourceVertexId(getVertexId());
- if(receivedMsgList.get(0).getFlag() == AdjMessage.FROMFF
- && receivedMsgList.get(1).getFlag() == AdjMessage.FROMRR){
- outgoingMsg.setFlag(AdjMessage.FROMRR);
- sendMsg(receivedMsgList.get(0).getSourceVertexId(), outgoingMsg);
- outgoingMsg.setFlag(AdjMessage.FROMFF);
- sendMsg(receivedMsgList.get(1).getSourceVertexId(), outgoingMsg);
- deleteVertex(getVertexId());
- } else if (receivedMsgList.get(0).getFlag() == AdjMessage.FROMFF
- && receivedMsgList.get(1).getFlag() == AdjMessage.FROMRF) {
- outgoingMsg.setFlag(AdjMessage.FROMRR);
- sendMsg(receivedMsgList.get(0).getSourceVertexId(), outgoingMsg);
- outgoingMsg.setFlag(AdjMessage.FROMRF);
- sendMsg(receivedMsgList.get(1).getSourceVertexId(), outgoingMsg);
- deleteVertex(getVertexId());
- } else if (receivedMsgList.get(0).getFlag() == AdjMessage.FROMFR
- && receivedMsgList.get(1).getFlag() == AdjMessage.FROMRR) {
- outgoingMsg.setFlag(AdjMessage.FROMFR);
- sendMsg(receivedMsgList.get(0).getSourceVertexId(), outgoingMsg);
- outgoingMsg.setFlag(AdjMessage.FROMFF);
- sendMsg(receivedMsgList.get(1).getSourceVertexId(), outgoingMsg);
- deleteVertex(getVertexId());
- } else if (receivedMsgList.get(0).getFlag() == AdjMessage.FROMFR
- && receivedMsgList.get(1).getFlag() == AdjMessage.FROMRF) {
- outgoingMsg.setFlag(AdjMessage.FROMFR);
- sendMsg(receivedMsgList.get(0).getSourceVertexId(), outgoingMsg);
- outgoingMsg.setFlag(AdjMessage.FROMRF);
- sendMsg(receivedMsgList.get(1).getSourceVertexId(), outgoingMsg);
- deleteVertex(getVertexId());
- } // RR
- else if(receivedMsgList.get(1).getFlag() == AdjMessage.FROMFF
- && receivedMsgList.get(0).getFlag() == AdjMessage.FROMRR){
- outgoingMsg.setFlag(AdjMessage.FROMRR);
- sendMsg(receivedMsgList.get(1).getSourceVertexId(), outgoingMsg);
- outgoingMsg.setFlag(AdjMessage.FROMFF);
- sendMsg(receivedMsgList.get(0).getSourceVertexId(), outgoingMsg);
- deleteVertex(getVertexId());
- } else if (receivedMsgList.get(1).getFlag() == AdjMessage.FROMFF
- && receivedMsgList.get(0).getFlag() == AdjMessage.FROMRF) {
- outgoingMsg.setFlag(AdjMessage.FROMRR);
- sendMsg(receivedMsgList.get(1).getSourceVertexId(), outgoingMsg);
- outgoingMsg.setFlag(AdjMessage.FROMRF);
- sendMsg(receivedMsgList.get(0).getSourceVertexId(), outgoingMsg);
- deleteVertex(getVertexId());
- } else if (receivedMsgList.get(1).getFlag() == AdjMessage.FROMFR
- && receivedMsgList.get(0).getFlag() == AdjMessage.FROMRR) {
- outgoingMsg.setFlag(AdjMessage.FROMFR);
- sendMsg(receivedMsgList.get(1).getSourceVertexId(), outgoingMsg);
- outgoingMsg.setFlag(AdjMessage.FROMFF);
- sendMsg(receivedMsgList.get(0).getSourceVertexId(), outgoingMsg);
- deleteVertex(getVertexId());
- } else if (receivedMsgList.get(1).getFlag() == AdjMessage.FROMFR
- && receivedMsgList.get(0).getFlag() == AdjMessage.FROMRF) {
- outgoingMsg.setFlag(AdjMessage.FROMFR);
- sendMsg(receivedMsgList.get(1).getSourceVertexId(), outgoingMsg);
- outgoingMsg.setFlag(AdjMessage.FROMRF);
- sendMsg(receivedMsgList.get(0).getSourceVertexId(), outgoingMsg);
- deleteVertex(getVertexId());
- } else
- voteToHalt();
- }
@Override
public void compute(Iterator<MessageWritable> msgIterator) {
@@ -150,13 +91,17 @@
i++;
}
if(receivedMsgList.size() == 2){
- if(getVertexValue().getLengthOfKmer() <= length){
+ if(getVertexValue().getLengthOfKmer() <= length
+ && getVertexValue().getDegree() == 2){
broadcaseKillself();
}
}
}
else if(getSuperstep() == 3){
- responseToDeadVertex(msgIterator);
+ while(msgIterator.hasNext()){
+ incomingMsg = msgIterator.next();
+ responseToDeadVertex();
+ }
}
voteToHalt();
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P2ForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P2ForPathMergeVertex.java
index 835b42d..6e3517b 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P2ForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P2ForPathMergeVertex.java
@@ -266,7 +266,7 @@
mapKeyByActualKmer(msgIterator);
/** Reducer **/
reduceKeyByActualKmer();
- voteToHalt();
+ voteToHalt();
}
} else if (getSuperstep() % 3 == 1 && getSuperstep() <= maxIteration) {
if(!isFakeVertex){
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/DirectionFlag.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/DirectionFlag.java
index 2ed36e9..51a73f1 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/DirectionFlag.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/DirectionFlag.java
@@ -11,5 +11,5 @@
public static final byte DIR_MASK = 0b111 << 0;
public static final byte DIR_CLEAR = 0b1111000 << 0;
- public static final byte DIR_FROM_DEADVERTEX = 0b101 << 0;
+// public static final byte DIR_FROM_DEADVERTEX = 0b101 << 0;
}
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
index 8c39af3..fbe47ce 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
@@ -178,23 +178,23 @@
generateBridgeAddGraphJob("BridgeAddGraph", outputBase
+ "BridgeAddGraph.xml");
}
-//
-// private static void generateBridgeRemoveGraphJob(String jobName, String outputPath) throws IOException {
-// PregelixJob job = new PregelixJob(jobName);
-// job.setVertexClass(BridgeRemoveVertex.class);
-// job.setVertexInputFormatClass(GraphCleanInputFormat.class);
-// job.setVertexOutputFormatClass(GraphCleanOutputFormat.class);
-// job.setDynamicVertexValueSize(true);
-// job.setOutputKeyClass(PositionWritable.class);
-// job.setOutputValueClass(VertexValueWritable.class);
-// job.getConfiguration().setInt(TipRemoveVertex.KMER_SIZE, 5);
-// job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
-// }
-//
-// private static void genBridgeRemoveGraph() throws IOException {
-// generateBridgeRemoveGraphJob("BridgeRemoveGraph", outputBase
-// + "BridgeRemoveGraph.xml");
-// }
+
+ private static void generateBridgeRemoveGraphJob(String jobName, String outputPath) throws IOException {
+ PregelixJob job = new PregelixJob(jobName);
+ job.setVertexClass(BridgeRemoveVertex.class);
+ job.setVertexInputFormatClass(GraphCleanInputFormat.class);
+ job.setVertexOutputFormatClass(GraphCleanOutputFormat.class);
+ job.setDynamicVertexValueSize(true);
+ job.setOutputKeyClass(KmerBytesWritable.class);
+ job.setOutputValueClass(VertexValueWritable.class);
+ job.getConfiguration().setInt(TipRemoveVertex.KMER_SIZE, 3);
+ job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
+ }
+
+ private static void genBridgeRemoveGraph() throws IOException {
+ generateBridgeRemoveGraphJob("BridgeRemoveGraph", outputBase
+ + "BridgeRemoveGraph.xml");
+ }
//
// private static void generateBubbleAddGraphJob(String jobName, String outputPath) throws IOException {
// PregelixJob job = new PregelixJob(jobName);
@@ -246,6 +246,7 @@
genTipAddGraph();
genBridgeAddGraph();
genTipRemoveGraph();
+ genBridgeRemoveGraph();
}
}
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/BridgeRemoveSmallTestSuite.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/BridgeRemoveSmallTestSuite.java
index 397c514..171a6ca 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/BridgeRemoveSmallTestSuite.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/BridgeRemoveSmallTestSuite.java
@@ -43,9 +43,9 @@
public class BridgeRemoveSmallTestSuite extends TestSuite {
private static final Logger LOGGER = Logger.getLogger(BridgeRemoveSmallTestSuite.class.getName());
- public static final String PreFix = "data/actual"; //"graphbuildresult";
+ public static final String PreFix = "data/actual/bridgeadd/BridgeAddGraph/bin";
public static final String[] TestDir = { PreFix + File.separator
- + "pathmerge/P4ForMergeGraph/bin/tworeads"};
+ + "SimpleTest"};
private static final String ACTUAL_RESULT_DIR = "data/actual/bridgeremove";
private static final String PATH_TO_HADOOP_CONF = "src/test/resources/hadoop/conf";
private static final String PATH_TO_CLUSTER_STORE = "src/test/resources/cluster/stores.properties";