P3 path merge passes test
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/DataCleanOutputFormat.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/DataCleanOutputFormat.java
index 2f2bd5c..b06dbbe 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/DataCleanOutputFormat.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/DataCleanOutputFormat.java
@@ -8,6 +8,7 @@
import edu.uci.ics.genomix.pregelix.api.io.binary.BinaryDataCleanVertexOutputFormat;
import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
+import edu.uci.ics.genomix.pregelix.type.State;
import edu.uci.ics.genomix.type.PositionWritable;
import edu.uci.ics.pregelix.api.graph.Vertex;
import edu.uci.ics.pregelix.api.io.VertexWriter;
@@ -35,7 +36,8 @@
@Override
public void writeVertex(Vertex<PositionWritable, ValueStateWritable, NullWritable, ?> vertex)
throws IOException, InterruptedException {
- getRecordWriter().write(vertex.getVertexId(), vertex.getVertexValue());
+ if(vertex.getVertexValue().getState() != State.END_VERTEX)
+ getRecordWriter().write(vertex.getVertexId(), vertex.getVertexValue());
}
}
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/LogAlgorithmForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/LogAlgorithmForPathMergeVertex.java
index a4730c0..bf83333 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/LogAlgorithmForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/LogAlgorithmForPathMergeVertex.java
@@ -169,7 +169,7 @@
public void setState() {
if (incomingMsg.getMessage() == Message.START) {
getVertexValue().setState(State.START_VERTEX);
- getVertexValue().setMergeChain(null);
+ //getVertexValue().setMergeChain(null);
} else if (incomingMsg.getMessage() == Message.END && getVertexValue().getState() != State.START_VERTEX) {
getVertexValue().setState(State.END_VERTEX);
getVertexValue().setMergeChain(getVertexValue().getMergeChain());
@@ -198,7 +198,6 @@
*/
public void sendMsgToPathVertex(Iterator<MessageWritable> msgIterator) {
if (getSuperstep() == 3) {
- getVertexValue().setMergeChain(getVertexValue().getMergeChain());
sendOutMsg();
} else {
if (msgIterator.hasNext()) {
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P3ForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P3ForPathMergeVertex.java
index 5ba5f31..35e679a 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P3ForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P3ForPathMergeVertex.java
@@ -146,6 +146,14 @@
outgoingMsg.setMessage(Message.END);
sendMsgToAllPreviousNodes(getVertexValue());
}
+ if (VertexUtil.isHeadWithoutIndegree(getVertexValue())){
+ outgoingMsg.setMessage(Message.START);
+ sendMsg(getVertexId(), outgoingMsg); //send to itself
+ }
+ if (VertexUtil.isRearWithoutOutdegree(getVertexValue())){
+ outgoingMsg.setMessage(Message.END);
+ sendMsg(getVertexId(), outgoingMsg); //send to itself
+ }
}
/**
@@ -154,7 +162,9 @@
public void initState(Iterator<MessageWritable> msgIterator) {
if (msgIterator.hasNext()) {
do {
- if (!VertexUtil.isPathVertex(getVertexValue())) {
+ if (!VertexUtil.isPathVertex(getVertexValue())
+ && !VertexUtil.isHeadWithoutIndegree(getVertexValue())
+ && !VertexUtil.isRearWithoutOutdegree(getVertexValue())) {
msgIterator.next();
voteToHalt();
} else {
@@ -314,7 +324,7 @@
outgoingMsg.setMessage(Message.FROMPSEUDOHEAD);
else {
deleteVertex(getVertexId());
- outgoingMsg.setNeighberNode(incomingMsg.getNeighberNode());
+ outgoingMsg.setNeighberNode(getVertexValue().getOutgoingList()); //incomingMsg.getNeighberNode()
outgoingMsg.setChainVertexId(getVertexValue().getMergeChain());
if (getVertexValue().getState() == State.PSEUDOREAR)
outgoingMsg.setMessage(Message.FROMPSEUDOREAR);
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
index 785c7f5..33955c9 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
@@ -30,7 +30,7 @@
private static void generateNaiveAlgorithmForMergeGraphJob(String jobName, String outputPath) throws IOException {
PregelixJob job = new PregelixJob(jobName);
job.setVertexClass(NaiveAlgorithmForPathMergeVertex.class);
- job.setVertexInputFormatClass(DataCleanInputFormat.class); //NaiveAlgorithmForPathMergeInputFormat
+ job.setVertexInputFormatClass(NaiveAlgorithmForPathMergeInputFormat.class); //DataCleanInputFormat
job.setVertexOutputFormatClass(DataCleanOutputFormat.class);
job.setDynamicVertexValueSize(true);
job.setOutputKeyClass(PositionWritable.class);
@@ -48,7 +48,7 @@
PregelixJob job = new PregelixJob(jobName);
job.setVertexClass(LogAlgorithmForPathMergeVertex.class);
job.setVertexInputFormatClass(LogAlgorithmForPathMergeInputFormat.class);
- job.setVertexOutputFormatClass(LogAlgorithmForPathMergeOutputFormat.class);
+ job.setVertexOutputFormatClass(DataCleanOutputFormat.class); //LogAlgorithmForPathMergeOutputFormat
job.setDynamicVertexValueSize(true);
job.setOutputKeyClass(PositionWritable.class);
job.setOutputValueClass(ValueStateWritable.class);
@@ -64,13 +64,13 @@
PregelixJob job = new PregelixJob(jobName);
job.setVertexClass(P3ForPathMergeVertex.class);
job.setVertexInputFormatClass(NaiveAlgorithmForPathMergeInputFormat.class);
- job.setVertexOutputFormatClass(NaiveAlgorithmForPathMergeOutputFormat.class);
+ job.setVertexOutputFormatClass(DataCleanOutputFormat.class);
job.setDynamicVertexValueSize(true);
job.setOutputKeyClass(PositionWritable.class);
job.setOutputValueClass(ValueStateWritable.class);
- job.getConfiguration().setInt(P3ForPathMergeVertex.KMER_SIZE, 5);
- job.getConfiguration().setFloat(P3ForPathMergeVertex.PSEUDORATE, 0.4f);
- job.getConfiguration().setInt(P3ForPathMergeVertex.MAXROUND, 1);
+ job.getConfiguration().setInt(P3ForPathMergeVertex.KMER_SIZE, 3);
+ job.getConfiguration().setFloat(P3ForPathMergeVertex.PSEUDORATE, 0.3f);
+ job.getConfiguration().setInt(P3ForPathMergeVertex.MAXROUND, 2);
job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
}
@@ -183,8 +183,8 @@
public static void main(String[] args) throws IOException {
//genNaiveAlgorithmForMergeGraph();
- genLogAlgorithmForMergeGraph();
- //genP3ForMergeGraph();
+ //genLogAlgorithmForMergeGraph();
+ genP3ForMergeGraph();
//genTipAddGraph();
//genTipRemoveGraph();
//genBridgeAddGraph();
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/graphbuilding/JobRunStepByStepTest.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/graphbuilding/JobRunStepByStepTest.java
index 1984a2e..c5aa0eb 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/graphbuilding/JobRunStepByStepTest.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/graphbuilding/JobRunStepByStepTest.java
@@ -50,7 +50,7 @@
private static final String ACTUAL_RESULT_DIR = "actual";
private static final String PATH_TO_HADOOP_CONF = "src/test/resources/hadoop/conf";
- private static final String DATA_INPUT_PATH = "data/graphbuild.test/read.txt";
+ private static final String DATA_INPUT_PATH = "data/graphbuild.test/tworeads.txt";
private static final String HDFS_INPUT_PATH = "/webmap";
private static final String HDFS_OUTPUT_PATH = "/webmap_result";
@@ -71,8 +71,8 @@
@Test
public void TestAll() throws Exception {
- //TestEndToEnd();
- TestUnMergedNode();
+ TestEndToEnd();
+ //TestUnMergedNode();
}
public void TestEndToEnd() throws Exception {
diff --git a/genomix/genomix-pregelix/src/test/resources/jobs/LogAlgorithmForMergeGraph.xml b/genomix/genomix-pregelix/src/test/resources/jobs/P3ForMergeGraph.xml
similarity index 95%
rename from genomix/genomix-pregelix/src/test/resources/jobs/LogAlgorithmForMergeGraph.xml
rename to genomix/genomix-pregelix/src/test/resources/jobs/P3ForMergeGraph.xml
index 2d6b9c4..6cfeda1 100644
--- a/genomix/genomix-pregelix/src/test/resources/jobs/LogAlgorithmForMergeGraph.xml
+++ b/genomix/genomix-pregelix/src/test/resources/jobs/P3ForMergeGraph.xml
@@ -3,11 +3,13 @@
<property><name>mapred.queue.default.acl-administer-jobs</name><value>*</value></property>
<property><name>mapred.skip.map.auto.incr.proc.count</name><value>true</value></property>
<property><name>mapred.jobtracker.instrumentation</name><value>org.apache.hadoop.mapred.JobTrackerMetricsInst</value></property>
+<property><name>P3ForPathMergeVertex.kmerSize</name><value>3</value></property>
<property><name>mapred.skip.reduce.auto.incr.proc.count</name><value>true</value></property>
<property><name>fs.hsftp.impl</name><value>org.apache.hadoop.hdfs.HsftpFileSystem</value></property>
<property><name>mapred.submit.replication</name><value>10</value></property>
<property><name>ipc.server.tcpnodelay</name><value>false</value></property>
<property><name>fs.checkpoint.dir</name><value>${hadoop.tmp.dir}/dfs/namesecondary</value></property>
+<property><name>P3ForPathMergeVertex.pseudoRate</name><value>0.3</value></property>
<property><name>mapred.output.compression.type</name><value>RECORD</value></property>
<property><name>mapred.job.shuffle.merge.percent</name><value>0.66</value></property>
<property><name>mapred.child.java.opts</name><value>-Xmx200m</value></property>
@@ -35,7 +37,7 @@
<property><name>mapred.reduce.parallel.copies</name><value>5</value></property>
<property><name>fs.checkpoint.size</name><value>67108864</value></property>
<property><name>mapred.job.reduce.input.buffer.percent</name><value>0.0</value></property>
-<property><name>mapred.job.name</name><value>LogAlgorithmForMergeGraph</value></property>
+<property><name>mapred.job.name</name><value>P3ForMergeGraph</value></property>
<property><name>local.cache.size</name><value>10737418240</value></property>
<property><name>fs.s3n.impl</name><value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value></property>
<property><name>mapred.userlog.limit.kb</name><value>0</value></property>
@@ -59,12 +61,13 @@
<property><name>fs.s3.sleepTimeSeconds</name><value>10</value></property>
<property><name>fs.default.name</name><value>file:///</value></property>
<property><name>mapred.output.key.class</name><value>edu.uci.ics.genomix.type.PositionWritable</value></property>
+<property><name>P3ForPathMergeVertex.maxRound</name><value>2</value></property>
<property><name>tasktracker.http.threads</name><value>40</value></property>
<property><name>mapred.tasktracker.taskmemorymanager.monitoring-interval</name><value>5000</value></property>
<property><name>hadoop.rpc.socket.factory.class.default</name><value>org.apache.hadoop.net.StandardSocketFactory</value></property>
<property><name>mapred.reduce.tasks</name><value>1</value></property>
<property><name>topology.node.switch.mapping.impl</name><value>org.apache.hadoop.net.ScriptBasedMapping</value></property>
-<property><name>pregelix.vertexClass</name><value>edu.uci.ics.genomix.pregelix.operator.pathmerge.LogAlgorithmForPathMergeVertex</value></property>
+<property><name>pregelix.vertexClass</name><value>edu.uci.ics.genomix.pregelix.operator.pathmerge.P3ForPathMergeVertex</value></property>
<property><name>mapred.skip.reduce.max.skip.groups</name><value>0</value></property>
<property><name>io.file.buffer.size</name><value>4096</value></property>
<property><name>mapred.jobtracker.maxtasks.per.job</name><value>-1</value></property>
@@ -103,7 +106,6 @@
<property><name>mapred.task.profile.reduces</name><value>0-2</value></property>
<property><name>webinterface.private.actions</name><value>false</value></property>
<property><name>hadoop.tmp.dir</name><value>/tmp/hadoop-${user.name}</value></property>
-<property><name>LogAlgorithmForPathMergeVertex.kmerSize</name><value>3</value></property>
<property><name>mapred.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
<property><name>mapred.skip.attempts.to.start.skipping</name><value>2</value></property>
<property><name>mapred.temp.dir</name><value>${hadoop.tmp.dir}/mapred/temp</value></property>
@@ -117,12 +119,12 @@
<property><name>mapred.line.input.format.linespermap</name><value>1</value></property>
<property><name>job.end.retry.attempts</name><value>0</value></property>
<property><name>ipc.client.idlethreshold</name><value>4000</value></property>
-<property><name>pregelix.vertexOutputFormatClass</name><value>edu.uci.ics.genomix.pregelix.format.LogAlgorithmForPathMergeOutputFormat</value></property>
+<property><name>pregelix.vertexOutputFormatClass</name><value>edu.uci.ics.genomix.pregelix.format.DataCleanOutputFormat</value></property>
<property><name>mapred.reduce.copy.backoff</name><value>300</value></property>
<property><name>mapred.map.tasks.speculative.execution</name><value>true</value></property>
<property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
<property><name>hadoop.logfile.size</name><value>10000000</value></property>
-<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.genomix.pregelix.format.LogAlgorithmForPathMergeInputFormat</value></property>
+<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.genomix.pregelix.format.NaiveAlgorithmForPathMergeInputFormat</value></property>
<property><name>mapred.job.queue.name</name><value>default</value></property>
<property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
<property><name>pregelix.incStateLength</name><value>true</value></property>