add filter path class
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/NaiveAlgorithmForPathMergeOutputFormat.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/NaiveAlgorithmForPathMergeOutputFormat.java
index 1812b57..7ef45de 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/NaiveAlgorithmForPathMergeOutputFormat.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/NaiveAlgorithmForPathMergeOutputFormat.java
@@ -8,6 +8,7 @@
import edu.uci.ics.genomix.pregelix.api.io.binary.BinaryVertexOutputFormat;
import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
+import edu.uci.ics.genomix.pregelix.type.State;
import edu.uci.ics.genomix.type.KmerBytesWritable;
import edu.uci.ics.pregelix.api.graph.Vertex;
import edu.uci.ics.pregelix.api.io.VertexWriter;
@@ -36,8 +37,8 @@
@Override
public void writeVertex(Vertex<KmerBytesWritable, ValueStateWritable, NullWritable, ?> vertex) throws IOException,
InterruptedException {
- //if(vertex.getVertexValue().isOp() == true)
- getRecordWriter().write(vertex.getVertexId(),vertex.getVertexValue());
+ if(vertex.getVertexValue().getState() == State.FILTER)
+ getRecordWriter().write(vertex.getVertexId(),vertex.getVertexValue());
}
}
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/LogFilterVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/LogFilterVertex.java
index 2dbc180..62de480 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/LogFilterVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/LogFilterVertex.java
@@ -164,7 +164,7 @@
if(getVertexValue().getState() == State.START_VERTEX){
sendStartMsgToNextNode();
}
- else if(getVertexValue().getState() != State.END_VERTEX && getVertexValue().getState() != State.FINAL_DELETE){
+ else if(getVertexValue().getState() != State.END_VERTEX){
sendEndMsgToNextNode();
}
}
@@ -187,9 +187,9 @@
*/
public void setMessageType(int message){
//kill Message because it has been merged by the head
- if(getVertexValue().getState() == State.END_VERTEX || getVertexValue().getState() == State.FINAL_DELETE){
+ if(getVertexValue().getState() == State.END_VERTEX){
msg.setMessage(Message.END);
- getVertexValue().setState(State.FINAL_DELETE);
+ getVertexValue().setState(State.END_VERTEX);
setVertexValue(getVertexValue());
}
else
@@ -290,7 +290,7 @@
else
voteToHalt();
}
- if(getVertexValue().getState() == State.END_VERTEX || getVertexValue().getState() == State.FINAL_DELETE){
+ if(getVertexValue().getState() == State.END_VERTEX){
voteToHalt();
}
if(getVertexValue().getState() == State.FINAL_VERTEX){
@@ -309,7 +309,7 @@
}
else{
if(getVertexValue().getState() != State.START_VERTEX
- && getVertexValue().getState() != State.END_VERTEX && getVertexValue().getState() != State.FINAL_DELETE){
+ && getVertexValue().getState() != State.END_VERTEX){
deleteVertex(getVertexId());//killSelf because it doesn't receive any message
}
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/NaiveFilterVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/NaiveFilterVertex.java
index 6efd961..a02f3d5 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/NaiveFilterVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/NaiveFilterVertex.java
@@ -10,6 +10,10 @@
import edu.uci.ics.genomix.type.VKmerBytesWritableFactory;
import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.genomix.pregelix.client.Client;
+import edu.uci.ics.genomix.pregelix.format.NaiveAlgorithmForPathMergeInputFormat;
+import edu.uci.ics.genomix.pregelix.format.NaiveAlgorithmForPathMergeOutputFormat;
import edu.uci.ics.genomix.pregelix.io.NaiveAlgorithmMessageWritable;
import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
import edu.uci.ics.genomix.pregelix.type.State;
@@ -154,9 +158,9 @@
initVertex();
if (getSuperstep() == 1) {
if(GraphVertexOperation.isHeadVertex(getVertexValue().getAdjMap())){
- if(getVertexId().toString().equals("AAGAC")){
- //getVertexValue().setOp(true);
- //setVertexValue(getVertexValue());
+ if(getVertexId().toString().equals("ACGTTATATGGGACGACGTTATAACGTATTACGTTATATGGGATGTCGTTATAAC")){
+ getVertexValue().setState(State.FILTER);
+ setVertexValue(getVertexValue());
msg.set(getVertexId(), chainVertexId, getVertexId(), (byte)0, false);
sendMsgToAllNextNodes(getVertexId(), getVertexValue().getAdjMap());
}
@@ -166,8 +170,8 @@
}
else if(getSuperstep() == 2){
if(msgIterator.hasNext()){
- //getVertexValue().setOp(true);
- //setVertexValue(getVertexValue());
+ getVertexValue().setState(State.FILTER);
+ setVertexValue(getVertexValue());
msg = msgIterator.next();
initChainVertex();
@@ -176,8 +180,8 @@
//head node sends message to path node
else if(getSuperstep()%2 == 1 && getSuperstep() <= maxIteration){
while (msgIterator.hasNext()){
- //getVertexValue().setOp(true);
- //setVertexValue(getVertexValue());
+ getVertexValue().setState(State.FILTER);
+ setVertexValue(getVertexValue());
msg = msgIterator.next();
sendMsgToPathVertex();
}
@@ -185,12 +189,29 @@
//path node sends message back to head node
else if(getSuperstep()%2 == 0 && getSuperstep() > 2 && getSuperstep() <= maxIteration){
while(msgIterator.hasNext()){
- //getVertexValue().setOp(true);
- //setVertexValue(getVertexValue());
+ getVertexValue().setState(State.FILTER);
+ setVertexValue(getVertexValue());
msg = msgIterator.next();
responseMsgToHeadVertex();
}
}
voteToHalt();
}
+
+ /**
+ * @param args
+ */
+ public static void main(String[] args) throws Exception {
+ PregelixJob job = new PregelixJob(NaiveFilterVertex.class.getSimpleName());
+ job.setVertexClass(NaiveFilterVertex.class);
+ /**
+ * BinaryInput and BinaryOutput
+ */
+ job.setVertexInputFormatClass(NaiveAlgorithmForPathMergeInputFormat.class);
+ job.setVertexOutputFormatClass(NaiveAlgorithmForPathMergeOutputFormat.class);
+ job.setDynamicVertexValueSize(true);
+ job.setOutputKeyClass(KmerBytesWritable.class);
+ job.setOutputValueClass(ValueStateWritable.class);
+ Client.run(args, job);
+ }
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/ThreeStepLogAlgorithmForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/ThreeStepLogAlgorithmForPathMergeVertex.java
index 70ba487..7946460 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/ThreeStepLogAlgorithmForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/ThreeStepLogAlgorithmForPathMergeVertex.java
@@ -165,7 +165,7 @@
if(getVertexValue().getState() == State.START_VERTEX){
sendStartMsgToNextNode();
}
- else if(getVertexValue().getState() != State.END_VERTEX && getVertexValue().getState() != State.FINAL_DELETE){
+ else if(getVertexValue().getState() != State.END_VERTEX){
sendEndMsgToNextNode();
}
}
@@ -189,9 +189,9 @@
*/
public void setMessageType(int message){
//kill Message because it has been merged by the head
- if(getVertexValue().getState() == State.END_VERTEX || getVertexValue().getState() == State.FINAL_DELETE){
+ if(getVertexValue().getState() == State.END_VERTEX){
msg.setMessage(Message.END);
- getVertexValue().setState(State.FINAL_DELETE);
+ getVertexValue().setState(State.END_VERTEX);
setVertexValue(getVertexValue());
//deleteVertex(getVertexId());
}
@@ -297,7 +297,7 @@
}
else{
if(getVertexValue().getState() != State.START_VERTEX
- && getVertexValue().getState() != State.END_VERTEX && getVertexValue().getState() != State.FINAL_DELETE){
+ && getVertexValue().getState() != State.END_VERTEX){
deleteVertex(getVertexId());//killSelf because it doesn't receive any message
}
}
@@ -311,7 +311,7 @@
setVertexValueAttributes();
sendMsgToSelf();
}
- if(getVertexValue().getState() == State.END_VERTEX || getVertexValue().getState() == State.FINAL_DELETE){
+ if(getVertexValue().getState() == State.END_VERTEX){
voteToHalt();
}
if(getVertexValue().getState() == State.FINAL_VERTEX){
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/State.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/State.java
index 4257f89..4a2af9e 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/State.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/State.java
@@ -8,7 +8,7 @@
public static final byte MID_VERTEX = 3;
public static final byte TODELETE = 4;
public static final byte FINAL_VERTEX = 5;
- public static final byte FINAL_DELETE = 6;
+ public static final byte FILTER = 6;
public static final byte CYCLE = 7;
public final static class STATE_CONTENT{
@@ -34,7 +34,7 @@
case FINAL_VERTEX:
r = "FINAL_VERTEX";
break;
- case FINAL_DELETE:
+ case FILTER:
r = "FINAL_DELETE";
break;
case CYCLE:
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/FilterJobGenerator.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/FilterJobGenerator.java
index a4c17b2..bead712 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/FilterJobGenerator.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/FilterJobGenerator.java
@@ -29,7 +29,7 @@
job.setDynamicVertexValueSize(true);
job.setOutputKeyClass(KmerBytesWritable.class);
job.setOutputValueClass(ValueStateWritable.class);
- job.getConfiguration().setInt(NaiveAlgorithmForPathMergeVertex.KMER_SIZE, 5);
+ job.getConfiguration().setInt(NaiveAlgorithmForPathMergeVertex.KMER_SIZE, 55);
job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
}
@@ -59,7 +59,7 @@
*/
public static void main(String[] args) throws IOException {
genNaiveFilter();
- genLogFilter();
+ //genLogFilter();
}
}
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
index 68e29ee..3e494ba 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
@@ -27,7 +27,7 @@
job.setDynamicVertexValueSize(true);
job.setOutputKeyClass(KmerBytesWritable.class);
job.setOutputValueClass(ValueStateWritable.class);
- job.getConfiguration().setInt(NaiveAlgorithmForPathMergeVertex.KMER_SIZE, 5);
+ job.getConfiguration().setInt(NaiveAlgorithmForPathMergeVertex.KMER_SIZE, 55);
job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
}
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/pathmerge/GraphBuildTest.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/pathmerge/GraphBuildTest.java
index af181d9..a381c3c 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/pathmerge/GraphBuildTest.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/pathmerge/GraphBuildTest.java
@@ -34,7 +34,7 @@
private static final String ACTUAL_RESULT_DIR = "graphbuildresult";
private static final String PATH_TO_HADOOP_CONF = "src/test/resources/hadoop/conf";
- private static final String DATA_PATH = "graph/7/TreePath";
+ private static final String DATA_PATH = "data/shortjump_1.head8M.fastq";
private static final String HDFS_INPUT_PATH = "/test";
private static final String HDFS_OUTPUT_PATH = "/result";
@@ -65,7 +65,7 @@
FileInputFormat.setInputPaths(conf, HDFS_INPUT_PATH);
FileOutputFormat.setOutputPath(conf, new Path(HDFS_OUTPUT_PATH));
- conf.setInt(GenomixJob.KMER_LENGTH, 7);
+ conf.setInt(GenomixJob.KMER_LENGTH, 55);
driver = new Driver(
edu.uci.ics.hyracks.hdfs.utils.HyracksUtils.CC_HOST,
edu.uci.ics.hyracks.hdfs.utils.HyracksUtils.TEST_HYRACKS_CC_CLIENT_PORT,
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/pathmerge/GraphBuildTestCase.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/pathmerge/GraphBuildTestCase.java
index a625a79..ef89b9e 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/pathmerge/GraphBuildTestCase.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/pathmerge/GraphBuildTestCase.java
@@ -60,7 +60,6 @@
public void TestPreClusterGroupby() throws Exception {
conf.set(GenomixJob.GROUPBY_TYPE, "precluster");
- //conf.set(GenomixJob.OUTPUT_FORMAT, "text");
System.err.println("Testing PreClusterGroupBy");
driver.runJob(new GenomixJob(conf), Plan.BUILD_DEBRUJIN_GRAPH, true);
Assert.assertEquals(true, checkResults());
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/pathmerge/MergePathTest.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/pathmerge/MergePathTest.java
index d813709..54cecbc 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/pathmerge/MergePathTest.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/pathmerge/MergePathTest.java
@@ -59,8 +59,8 @@
break;
}
if(value.getLengthOfMergeChain() != 0
- && value.getLengthOfMergeChain() != -1){
- //&& value.getState() == State.FINAL_VERTEX){
+ && value.getLengthOfMergeChain() != -1
+ && value.getState() == State.FINAL_VERTEX){
//bw.write(key.toString() + "\t" +
// value.toString());
bw.write(value.getLengthOfMergeChain() + "\t" +
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/pathmerge/PathMergeSmallTestCase.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/pathmerge/PathMergeSmallTestCase.java
index 67b6f21..4b6d367 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/pathmerge/PathMergeSmallTestCase.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/pathmerge/PathMergeSmallTestCase.java
@@ -77,7 +77,7 @@
private void compareResults() throws Exception {
dfs.copyToLocalFile(FileOutputFormat.getOutputPath(job), new Path(
resultFileDir));
- GenerateTextFile.generateFromPathmergeResult(5, resultFileDir, textFileDir);
+ GenerateTextFile.generateFromPathmergeResult(55, resultFileDir, textFileDir);
// TestUtils.compareWithResultDir(new File(expectedFileDir), new
// File(resultFileDir));
}
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/pathmerge/PathMergeSmallTestSuite.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/pathmerge/PathMergeSmallTestSuite.java
index c1ddfef..d05fc7a 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/pathmerge/PathMergeSmallTestSuite.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/pathmerge/PathMergeSmallTestSuite.java
@@ -44,14 +44,15 @@
private static final Logger LOGGER = Logger
.getLogger(PathMergeSmallTestSuite.class.getName());
- public static final String PreFix = "data/PathTestSet";
+ public static final String PreFix = "data";
public static final String[] TestDir = { PreFix + File.separator
+ + "test8m"};
/*+ "BridgePath", PreFix + File.separator
+ "CyclePath", PreFix + File.separator
+ "SimplePath", PreFix + File.separator
+ "SinglePath", PreFix + File.separator
+ "TreePath"};*/
- + "2", PreFix + File.separator
+ /*+ "2", PreFix + File.separator
+ "3", PreFix + File.separator
+ "4", PreFix + File.separator
+ "5", PreFix + File.separator
@@ -69,7 +70,7 @@
+ "CyclePath", PreFix + File.separator
+ "RingPath", PreFix + File.separator
+ "LongPath", PreFix + File.separator
- + "TreePath"};
+ + "TreePath"};*/
private static final String ACTUAL_RESULT_DIR = "actual";
private static final String PATH_TO_HADOOP_CONF = "src/test/resources/hadoop/conf";
private static final String PATH_TO_CLUSTER_STORE = "src/test/resources/cluster/stores.properties";