Clean up the pregelix package for build: delete the P1/P3/P5 path-merge vertices and make Client use BasicGraphCleanVertex
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/client/Client.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/client/Client.java
index 0da4a77..73a8648 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/client/Client.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/client/Client.java
@@ -9,9 +9,7 @@
import org.kohsuke.args4j.CmdLineParser;
import org.kohsuke.args4j.Option;
-import edu.uci.ics.genomix.pregelix.operator.pathmerge.P2ForPathMergeVertex;
-import edu.uci.ics.genomix.pregelix.operator.pathmerge.P1ForPathMergeVertex;
-import edu.uci.ics.genomix.pregelix.operator.pathmerge.P3ForPathMergeVertex;
+import edu.uci.ics.genomix.pregelix.operator.pathmerge.BasicGraphCleanVertex;
import edu.uci.ics.pregelix.api.job.PregelixJob;
import edu.uci.ics.pregelix.core.base.IDriver.Plan;
import edu.uci.ics.pregelix.core.driver.Driver;
@@ -43,11 +41,11 @@
@Option(name = "-runtime-profiling", usage = "whether to do runtime profifling", required = false)
public String profiling = "false";
- @Option(name = "-pseudo-rate", usage = "the rate of pseduHead", required = false)
- public float pseudoRate = -1;
-
- @Option(name = "-max-patitionround", usage = "max rounds in partition phase", required = false)
- public int maxRound = -1;
+// @Option(name = "-pseudo-rate", usage = "the rate of pseduHead", required = false)
+// public float pseudoRate = -1;
+//
+// @Option(name = "-max-patitionround", usage = "max rounds in partition phase", required = false)
+// public int maxRound = -1;
}
public static void run(String[] args, PregelixJob job) throws Exception {
@@ -66,19 +64,15 @@
for (int i = 1; i < inputs.length; i++)
FileInputFormat.addInputPaths(job, inputs[0]);
FileOutputFormat.setOutputPath(job, new Path(options.outputPath));
- job.getConfiguration().setInt(P1ForPathMergeVertex.KMER_SIZE, options.sizeKmer);
- job.getConfiguration().setInt(P2ForPathMergeVertex.KMER_SIZE, options.sizeKmer);
- job.getConfiguration().setInt(P3ForPathMergeVertex.KMER_SIZE, options.sizeKmer);
+ job.getConfiguration().setInt(BasicGraphCleanVertex.KMER_SIZE, options.sizeKmer);
if (options.numIteration > 0) {
- job.getConfiguration().setInt(P1ForPathMergeVertex.ITERATIONS, options.numIteration);
- job.getConfiguration().setInt(P2ForPathMergeVertex.ITERATIONS, options.numIteration);
- job.getConfiguration().setInt(P3ForPathMergeVertex.ITERATIONS, options.numIteration);
+ job.getConfiguration().setInt(BasicGraphCleanVertex.ITERATIONS, options.numIteration);
}
- if (options.pseudoRate > 0 && options.pseudoRate <= 1)
- job.getConfiguration().setFloat(P3ForPathMergeVertex.PSEUDORATE, options.pseudoRate);
- if (options.maxRound > 0)
- job.getConfiguration().setInt(P3ForPathMergeVertex.MAXROUND, options.maxRound);
+// if (options.pseudoRate > 0 && options.pseudoRate <= 1)
+// job.getConfiguration().setFloat(P3ForPathMergeVertex.PSEUDORATE, options.pseudoRate);
+// if (options.maxRound > 0)
+// job.getConfiguration().setInt(P3ForPathMergeVertex.MAXROUND, options.maxRound);
return options;
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P1ForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P1ForPathMergeVertex.java
deleted file mode 100644
index f5b0157..0000000
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P1ForPathMergeVertex.java
+++ /dev/null
@@ -1,265 +0,0 @@
-package edu.uci.ics.genomix.pregelix.operator.pathmerge;
-
-import java.util.Iterator;
-import org.apache.hadoop.io.NullWritable;
-
-import edu.uci.ics.genomix.type.KmerBytesWritableFactory;
-
-import edu.uci.ics.pregelix.api.graph.Vertex;
-import edu.uci.ics.pregelix.api.job.PregelixJob;
-import edu.uci.ics.genomix.type.VKmerBytesWritable;
-import edu.uci.ics.genomix.pregelix.client.Client;
-import edu.uci.ics.genomix.pregelix.format.NaiveAlgorithmForPathMergeInputFormat;
-import edu.uci.ics.genomix.pregelix.format.NaiveAlgorithmForPathMergeOutputFormat;
-import edu.uci.ics.genomix.pregelix.io.MessageWritable;
-import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
-import edu.uci.ics.genomix.pregelix.io.VertexValueWritable.State;
-import edu.uci.ics.genomix.pregelix.type.Message;
-import edu.uci.ics.genomix.pregelix.util.VertexUtil;
-
-/*
- * vertexId: BytesWritable
- * vertexValue: ByteWritable
- * edgeValue: NullWritable
- * message: MessageWritable
- *
- * DNA:
- * A: 00
- * C: 01
- * G: 10
- * T: 11
- *
- * succeed node
- * A 00000001 1
- * G 00000010 2
- * C 00000100 4
- * T 00001000 8
- * precursor node
- * A 00010000 16
- * G 00100000 32
- * C 01000000 64
- * T 10000000 128
- *
- * For example, ONE LINE in input file: 00,01,10 0001,0010,
- * That means that vertexId is ACG, its succeed node is A and its precursor node is C.
- * The succeed node and precursor node will be stored in vertexValue and we don't use edgeValue.
- * The details about message are in edu.uci.ics.pregelix.example.io.MessageWritable.
- */
-/**
- * Naive Algorithm for path merge graph
- */
-public class P1ForPathMergeVertex extends
- Vertex<VKmerBytesWritable, VertexValueWritable, NullWritable, MessageWritable> {
- public static final String KMER_SIZE = "P1ForPathMergeVertex.kmerSize";
- public static final String ITERATIONS = "P1ForPathMergeVertex.iteration";
- public static int kmerSize = -1;
- private int maxIteration = -1;
-
- private MessageWritable incomingMsg = new MessageWritable();
- private MessageWritable outgoingMsg = new MessageWritable();
-
- private KmerBytesWritableFactory kmerFactory = new KmerBytesWritableFactory(1);
- private VKmerBytesWritable lastKmer = new VKmerBytesWritable();
-
- private VKmerBytesWritable destVertexId = new VKmerBytesWritable();
- private Iterator<VKmerBytesWritable> posIterator;
-
- /**
- * initiate kmerSize, maxIteration
- */
- public void initVertex() {
- if (kmerSize == -1)
- kmerSize = getContext().getConfiguration().getInt(KMER_SIZE, 5);
- if (maxIteration < 0)
- maxIteration = getContext().getConfiguration().getInt(ITERATIONS, 1000000);
- outgoingMsg.reset();
- }
-
- /**
- * get destination vertex
- */
- public VKmerBytesWritable getNextDestVertexId(VertexValueWritable value) {
- if(value.getFFList().getCountOfPosition() > 0) // #FFList() > 0
- posIterator = value.getFFList().iterator();
- else // #FRList() > 0
- posIterator = value.getFRList().iterator();
- return posIterator.next();
- }
-
- public VKmerBytesWritable getPreDestVertexId(VertexValueWritable value) {
- if(value.getRFList().getCountOfPosition() > 0) // #RFList() > 0
- posIterator = value.getRFList().iterator();
- else // #RRList() > 0
- posIterator = value.getRRList().iterator();
- return posIterator.next();
- }
-
- /**
- * head send message to all next nodes
- */
- public void sendMsgToAllNextNodes(VertexValueWritable value) {
- posIterator = value.getFFList().iterator(); // FFList
- while(posIterator.hasNext()){
- destVertexId.setAsCopy(posIterator.next());
- sendMsg(destVertexId, outgoingMsg);
- }
- posIterator = value.getFRList().iterator(); // FRList
- while(posIterator.hasNext()){
- destVertexId.setAsCopy(posIterator.next());
- sendMsg(destVertexId, outgoingMsg);
- }
- }
-
- /**
- * head send message to all previous nodes
- */
- public void sendMsgToAllPreviousNodes(VertexValueWritable value) {
- posIterator = value.getRFList().iterator(); // RFList
- while(posIterator.hasNext()){
- destVertexId.setAsCopy(posIterator.next());
- sendMsg(destVertexId, outgoingMsg);
- }
- posIterator = value.getRRList().iterator(); // RRList
- while(posIterator.hasNext()){
- destVertexId.setAsCopy(posIterator.next());
- sendMsg(destVertexId, outgoingMsg);
- }
- }
-
- /**
- * start sending message
- */
- public void startSendMsg() {
- if (VertexUtil.isHeadVertexWithIndegree(getVertexValue())) {
- outgoingMsg.setFlag(Message.START);
- sendMsgToAllNextNodes(getVertexValue());
- }
- if (VertexUtil.isRearVertexWithOutdegree(getVertexValue())) {
- outgoingMsg.setFlag(Message.END);
- sendMsgToAllPreviousNodes(getVertexValue());
- }
- if (VertexUtil.isHeadWithoutIndegree(getVertexValue())){
- outgoingMsg.setFlag(Message.START);
- sendMsg(getVertexId(), outgoingMsg); //send to itself
- }
- if (VertexUtil.isRearWithoutOutdegree(getVertexValue())){
- outgoingMsg.setFlag(Message.END);
- sendMsg(getVertexId(), outgoingMsg); //send to itself
- }
- }
-
- /**
- * initiate head, rear and path node
- */
- public void initState(Iterator<MessageWritable> msgIterator) {
- while (msgIterator.hasNext()) {
- if (!VertexUtil.isPathVertex(getVertexValue())
- && !VertexUtil.isHeadWithoutIndegree(getVertexValue())
- && !VertexUtil.isRearWithoutOutdegree(getVertexValue())) {
- msgIterator.next();
- voteToHalt();
- } else {
- incomingMsg = msgIterator.next();
- setState();
- }
- }
- }
-
- /**
- * set vertex state
- */
- public void setState() {
- if (incomingMsg.getFlag() == Message.START) {
- getVertexValue().setState(State.IS_HEAD);
- } else if (incomingMsg.getFlag() == Message.END && getVertexValue().getState() != State.IS_HEAD) {
- getVertexValue().setState(State.IS_HEAD);//is tail
- voteToHalt();
- } else
- voteToHalt();
- }
-
- /**
- * merge chainVertex and store in vertexVal.chainVertexId
- */
- public void mergeChainVertex() {
- //merge chain
- lastKmer.setAsCopy(kmerFactory.getLastKmerFromChain(incomingMsg.getLengthOfChain() - kmerSize + 1,
- incomingMsg.getActualKmer()));
- getVertexValue().setActualKmer(kmerFactory.mergeTwoKmer(getVertexValue().getActualKmer(), lastKmer));
- getVertexValue().setOutgoingList(incomingMsg.getNeighberNode());
- }
-
- /**
- * head node sends message to path node
- */
- public void sendMsgToPathVertex(Iterator<MessageWritable> msgIterator) {
- if (getSuperstep() == 3) {
- outgoingMsg.setSourceVertexId(getVertexId());
- destVertexId.setAsCopy(getNextDestVertexId(getVertexValue()));
- sendMsg(destVertexId, outgoingMsg);
- } else {
- while (msgIterator.hasNext()) {
- incomingMsg = msgIterator.next();
- if (incomingMsg.getFlag() != Message.STOP) {
- mergeChainVertex();
- outgoingMsg.setSourceVertexId(getVertexId());
- destVertexId.setAsCopy(getNextDestVertexId(getVertexValue()));
- sendMsg(destVertexId, outgoingMsg);
- } else {
- mergeChainVertex();
- getVertexValue().setState(State.IS_FINAL);
- //String source = getVertexValue().getKmer().toString();
- //System.out.println();
- }
- }
- }
- }
-
- /**
- * path node sends message back to head node
- */
- public void responseMsgToHeadVertex() {
- deleteVertex(getVertexId());
- outgoingMsg.setNeighberNode(getVertexValue().getOutgoingList());
- outgoingMsg.setActualKmer(getVertexValue().getActualKmer());
- if (getVertexValue().getState() == State.IS_HEAD)//is_tail
- outgoingMsg.setFlag(Message.STOP);
- destVertexId.setAsCopy(incomingMsg.getSourceVertexId());
- sendMsg(destVertexId, outgoingMsg);
- }
-
- @Override
- public void compute(Iterator<MessageWritable> msgIterator) {
- initVertex();
- if (getSuperstep() == 1) {
- startSendMsg();
- voteToHalt();
- } else if (getSuperstep() == 2)
- initState(msgIterator);
- else if (getSuperstep() % 2 == 1 && getSuperstep() <= maxIteration) {
- sendMsgToPathVertex(msgIterator);
- voteToHalt();
- } else if (getSuperstep() % 2 == 0 && getSuperstep() > 2 && getSuperstep() <= maxIteration) {
- while (msgIterator.hasNext()) {
- incomingMsg = msgIterator.next();
- responseMsgToHeadVertex();
- }
- voteToHalt();
- } else
- voteToHalt();
- }
-
- public static void main(String[] args) throws Exception {
- PregelixJob job = new PregelixJob(P1ForPathMergeVertex.class.getSimpleName());
- job.setVertexClass(P1ForPathMergeVertex.class);
- /**
- * BinaryInput and BinaryOutput
- */
- job.setVertexInputFormatClass(NaiveAlgorithmForPathMergeInputFormat.class);
- job.setVertexOutputFormatClass(NaiveAlgorithmForPathMergeOutputFormat.class);
- job.setDynamicVertexValueSize(true);
- job.setOutputKeyClass(VKmerBytesWritable.class);
- job.setOutputValueClass(VertexValueWritable.class);
- Client.run(args, job);
- }
-}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P3ForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P3ForPathMergeVertex.java
deleted file mode 100644
index 9a3aed5..0000000
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P3ForPathMergeVertex.java
+++ /dev/null
@@ -1,414 +0,0 @@
-package edu.uci.ics.genomix.pregelix.operator.pathmerge;
-
-import java.util.Iterator;
-import org.apache.hadoop.io.NullWritable;
-
-import edu.uci.ics.genomix.type.KmerBytesWritableFactory;
-import edu.uci.ics.genomix.type.VKmerBytesWritable;
-
-import edu.uci.ics.pregelix.api.graph.Vertex;
-import edu.uci.ics.pregelix.api.job.PregelixJob;
-import edu.uci.ics.genomix.pregelix.client.Client;
-import edu.uci.ics.genomix.pregelix.format.NaiveAlgorithmForPathMergeInputFormat;
-import edu.uci.ics.genomix.pregelix.format.NaiveAlgorithmForPathMergeOutputFormat;
-import edu.uci.ics.genomix.pregelix.io.MessageWritable;
-import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
-import edu.uci.ics.genomix.pregelix.type.Message;
-import edu.uci.ics.genomix.pregelix.type.State2;
-import edu.uci.ics.genomix.pregelix.util.VertexUtil;
-
-/*
- * vertexId: BytesWritable
- * vertexValue: ByteWritable
- * edgeValue: NullWritable
- * message: MessageWritable
- *
- * DNA:
- * A: 00
- * C: 01
- * G: 10
- * T: 11
- *
- * succeed node
- * A 00000001 1
- * G 00000010 2
- * C 00000100 4
- * T 00001000 8
- * precursor node
- * A 00010000 16
- * G 00100000 32
- * C 01000000 64
- * T 10000000 128
- *
- * For example, ONE LINE in input file: 00,01,10 0001,0010,
- * That means that vertexId is ACG, its succeed node is A and its precursor node is C.
- * The succeed node and precursor node will be stored in vertexValue and we don't use edgeValue.
- * The details about message are in edu.uci.ics.pregelix.example.io.MessageWritable.
- */
-/**
- * Naive Algorithm for path merge graph
- */
-public class P3ForPathMergeVertex extends
- Vertex<VKmerBytesWritable, VertexValueWritable, NullWritable, MessageWritable> {
- public static final String KMER_SIZE = "P3ForPathMergeVertex.kmerSize";
- public static final String ITERATIONS = "P3ForPathMergeVertex.iteration";
- public static final String PSEUDORATE = "P3ForPathMergeVertex.pseudoRate";
- public static final String MAXROUND = "P3ForPathMergeVertex.maxRound";
- public static int kmerSize = -1;
- private int maxIteration = -1;
- public static float pseudoRate = -1;
- public static int maxRound = -1;
-
- private MessageWritable incomingMsg = new MessageWritable();
- private MessageWritable outgoingMsg = new MessageWritable();
-
- private KmerBytesWritableFactory kmerFactory = new KmerBytesWritableFactory(1);
- private VKmerBytesWritable lastKmer = new VKmerBytesWritable(1);
-
- private VKmerBytesWritable destVertexId = new VKmerBytesWritable();
- private Iterator<VKmerBytesWritable> posIterator;
- /**
- * initiate kmerSize, maxIteration
- */
- public void initVertex() {
- if (kmerSize == -1)
- kmerSize = getContext().getConfiguration().getInt(KMER_SIZE, 5);
- if (maxIteration < 0)
- maxIteration = getContext().getConfiguration().getInt(ITERATIONS, 1000000);
- if(pseudoRate < 0)
- pseudoRate = getContext().getConfiguration().getFloat(PSEUDORATE, 0.2f);
- if (maxRound < 0)
- maxRound = getContext().getConfiguration().getInt(MAXROUND, 2);
- outgoingMsg.reset();
- }
-
- /**
- * get destination vertex
- */
- public VKmerBytesWritable getNextDestVertexId(VertexValueWritable value) {
- if(value.getFFList().getCountOfPosition() > 0) // #FFList() > 0
- posIterator = value.getFFList().iterator();
- else // #FRList() > 0
- posIterator = value.getFRList().iterator();
- return posIterator.next();
- }
-
- public VKmerBytesWritable getPreDestVertexId(VertexValueWritable value) {
- if(value.getRFList().getCountOfPosition() > 0) // #RFList() > 0
- posIterator = value.getRFList().iterator();
- else // #RRList() > 0
- posIterator = value.getRRList().iterator();
- return posIterator.next();
- }
-
- /**
- * head send message to all next nodes
- */
- public void sendMsgToAllNextNodes(VertexValueWritable value) {
- posIterator = value.getFFList().iterator(); // FFList
- while(posIterator.hasNext()){
- destVertexId.setAsCopy(posIterator.next());
- sendMsg(destVertexId, outgoingMsg);
- }
- posIterator = value.getFRList().iterator(); // FRList
- while(posIterator.hasNext()){
- destVertexId.setAsCopy(posIterator.next());
- sendMsg(destVertexId, outgoingMsg);
- }
- }
-
- /**
- * head send message to all previous nodes
- */
- public void sendMsgToAllPreviousNodes(VertexValueWritable value) {
- posIterator = value.getRFList().iterator(); // RFList
- while(posIterator.hasNext()){
- destVertexId.setAsCopy(posIterator.next());
- sendMsg(destVertexId, outgoingMsg);
- }
- posIterator = value.getRRList().iterator(); // RRList
- while(posIterator.hasNext()){
- destVertexId.setAsCopy(posIterator.next());
- sendMsg(destVertexId, outgoingMsg);
- }
- }
-
- /**
- * start sending message
- */
- public void startSendMsg() {
- if (VertexUtil.isHeadVertexWithIndegree(getVertexValue())) {
- outgoingMsg.setFlag(Message.START);
- sendMsgToAllNextNodes(getVertexValue());
- }
- if (VertexUtil.isRearVertexWithOutdegree(getVertexValue())) {
- outgoingMsg.setFlag(Message.END);
- sendMsgToAllPreviousNodes(getVertexValue());
- }
- if (VertexUtil.isHeadWithoutIndegree(getVertexValue())){
- outgoingMsg.setFlag(Message.START);
- sendMsg(getVertexId(), outgoingMsg); //send to itself
- }
- if (VertexUtil.isRearWithoutOutdegree(getVertexValue())){
- outgoingMsg.setFlag(Message.END);
- sendMsg(getVertexId(), outgoingMsg); //send to itself
- }
- }
-
- /**
- * initiate head, rear and path node
- */
- public void initState(Iterator<MessageWritable> msgIterator) {
- if (msgIterator.hasNext()) {
- do {
- if (!VertexUtil.isPathVertex(getVertexValue())
- && !VertexUtil.isHeadWithoutIndegree(getVertexValue())
- && !VertexUtil.isRearWithoutOutdegree(getVertexValue())) {
- msgIterator.next();
- voteToHalt();
- } else {
- incomingMsg = msgIterator.next();
- setState();
- }
- } while (msgIterator.hasNext());
- } else {
- float random = (float) Math.random();
- if (random < pseudoRate)
- markPseudoHead();
- else{
- getVertexValue().setState(State2.NON_VERTEX);
- voteToHalt();
- }
- /*if (getVertexId().toString().equals("CCTCA") || getVertexId().toString().equals("CTCAG")) //AGTAC CCTCA CTCAG CGCCC ACGCC
- markPseudoHead();
- else
- voteToHalt();*/
- }
- }
-
- /**
- * set vertex state
- */
- public void setState() {
- if (incomingMsg.getFlag() == Message.START) {
- getVertexValue().setState(State2.START_VERTEX);
- } else if (incomingMsg.getFlag() == Message.END && getVertexValue().getState() != State2.START_VERTEX) {
- getVertexValue().setState(State2.END_VERTEX);
- voteToHalt();
- } else
- voteToHalt();
- }
-
- /**
- * mark the pseudoHead
- */
- public void markPseudoHead() {
- getVertexValue().setState(State2.PSEUDOHEAD);
- outgoingMsg.setFlag(Message.FROMPSEUDOHEAD);
- destVertexId.setAsCopy(getPreDestVertexId(getVertexValue()));
- sendMsg(destVertexId, outgoingMsg);
- }
-
- /**
- * mark the pseudoRear
- */
- public void markPseudoRear() {
- if (incomingMsg.getFlag() == Message.FROMPSEUDOHEAD
- && getVertexValue().getState() != State2.START_VERTEX) {
- getVertexValue().setState(State2.PSEUDOREAR);
- voteToHalt();
- }
- else if(incomingMsg.getFlag() == Message.FROMPSEUDOHEAD
- && getVertexValue().getState() == State2.START_VERTEX){
- getVertexValue().setState(State2.START_HALT);
- }
- }
-
- /**
- * merge chain vertex
- */
- public void mergeChainVertex(){
- lastKmer.setAsCopy(kmerFactory.getLastKmerFromChain(incomingMsg.getLengthOfChain() - kmerSize + 1,
- incomingMsg.getActualKmer()));
- getVertexValue().setActualKmer(
- kmerFactory.mergeTwoKmer(getVertexValue().getActualKmer(),
- lastKmer));
- getVertexValue().setOutgoingList(incomingMsg.getNeighberNode());
- }
-
- /**
- * head node sends message to path node
- */
- public void sendMsgToPathVertexMergePhase(Iterator<MessageWritable> msgIterator) {
- if (getSuperstep() == 3 + 2 * maxRound + 2) {
- outgoingMsg.setSourceVertexId(getVertexId());
- destVertexId.setAsCopy(getNextDestVertexId(getVertexValue()));
- sendMsg(destVertexId, outgoingMsg);
- } else {
- while (msgIterator.hasNext()) {
- incomingMsg = msgIterator.next();
- if (incomingMsg.getFlag() != Message.STOP) {
- mergeChainVertex();
- outgoingMsg.setSourceVertexId(getVertexId());
- destVertexId.setAsCopy(getNextDestVertexId(getVertexValue()));
- sendMsg(destVertexId, outgoingMsg);
- } else {
- mergeChainVertex();
- getVertexValue().setState(State2.FINAL_VERTEX);
- //String source = getVertexValue().getKmer().toString();
- //System.out.println();
- }
- }
- }
- }
-
- /**
- * path node sends message back to head node
- */
- public void responseMsgToHeadVertexMergePhase() {
- deleteVertex(getVertexId());
- outgoingMsg.setNeighberNode(getVertexValue().getOutgoingList());
- outgoingMsg.setActualKmer(getVertexValue().getActualKmer());
- if (getVertexValue().getState() == State2.END_VERTEX)
- outgoingMsg.setFlag(Message.STOP);
- sendMsg(incomingMsg.getSourceVertexId(), outgoingMsg);
- }
-
- /**
- * head node sends message to path node in partition phase
- */
- public void sendMsgToPathVertexPartitionPhase(Iterator<MessageWritable> msgIterator) {
- if (getSuperstep() == 4) {
- if(getVertexValue().getState() != State2.START_HALT){
- outgoingMsg.setSourceVertexId(getVertexId());
- destVertexId.setAsCopy(getNextDestVertexId(getVertexValue()));
- sendMsg(destVertexId, outgoingMsg);
- voteToHalt();
- }
- } else {
- while (msgIterator.hasNext()) {
- incomingMsg = msgIterator.next();
- //if from pseudoHead, voteToHalt(), otherwise ...
- if (incomingMsg.getFlag() != Message.FROMPSEUDOHEAD){
- mergeChainVertex();
- if (incomingMsg.getFlag() != Message.STOP
- && incomingMsg.getFlag() != Message.FROMPSEUDOREAR) {
- outgoingMsg.setSourceVertexId(getVertexId());
- destVertexId.setAsCopy(getNextDestVertexId(getVertexValue()));
- sendMsg(destVertexId, outgoingMsg);
- voteToHalt();
- } else {
- //check head or pseudoHead
- if (getVertexValue().getState() == State2.START_VERTEX
- && incomingMsg.getFlag() == Message.STOP) {
- getVertexValue().setState(State2.FINAL_VERTEX);
- //String source = getVertexValue().getKmer().toString();
- //System.out.println();
- } else if(getVertexValue().getState() == State2.PSEUDOHEAD
- && incomingMsg.getFlag() == Message.STOP)
- getVertexValue().setState(State2.END_VERTEX);
- }
- }
- }
- }
- }
-
- /**
- * path node sends message back to head node in partition phase
- */
- public void responseMsgToHeadVertexPartitionPhase() {
- if (getVertexValue().getState() == State2.PSEUDOHEAD)
- outgoingMsg.setFlag(Message.FROMPSEUDOHEAD);
- else {
- deleteVertex(getVertexId());
- outgoingMsg.setNeighberNode(getVertexValue().getOutgoingList()); //incomingMsg.getNeighberNode()
- outgoingMsg.setActualKmer(getVertexValue().getActualKmer());
- if (getVertexValue().getState() == State2.PSEUDOREAR)
- outgoingMsg.setFlag(Message.FROMPSEUDOREAR);
- else if (getVertexValue().getState() == State2.END_VERTEX)
- outgoingMsg.setFlag(Message.STOP);
- }
- sendMsg(incomingMsg.getSourceVertexId(), outgoingMsg);
- voteToHalt();
- }
-
- /**
- * final process the result of partition phase
- */
- public void finalProcessPartitionPhase(Iterator<MessageWritable> msgIterator){
- while (msgIterator.hasNext()) {
- incomingMsg = msgIterator.next();
- mergeChainVertex();
- getVertexValue().setOutgoingList(incomingMsg.getNeighberNode());
- //check head or pseudoHead
- if (getVertexValue().getState() == State2.START_VERTEX
- && incomingMsg.getFlag() == Message.STOP) {
- getVertexValue().setState(State2.FINAL_VERTEX);
- //String source = getVertexValue().getKmer().toString();
- //System.out.println();
- } else if(getVertexValue().getState() == State2.PSEUDOHEAD
- && incomingMsg.getFlag() == Message.STOP)
- getVertexValue().setState(State2.END_VERTEX);
- }
- }
- /**
- * After partition phase, reset state: ex. psudoHead and psudoRear -> null
- */
- public void resetState() {
- if (getVertexValue().getState() == State2.PSEUDOHEAD || getVertexValue().getState() == State2.PSEUDOREAR) {
- getVertexValue().setState(State2.NON_VERTEX);
- }
- if(getVertexValue().getState() == State2.START_HALT)
- getVertexValue().setState(State2.START_VERTEX);
- }
-
- @Override
- public void compute(Iterator<MessageWritable> msgIterator) {
- initVertex();
- if (getSuperstep() == 1)
- startSendMsg();
- else if (getSuperstep() == 2)
- initState(msgIterator);
- else if (getSuperstep() == 3) {
- if (msgIterator.hasNext()) {
- incomingMsg = msgIterator.next();
- markPseudoRear();
- }
- } else if (getSuperstep() % 2 == 0 && getSuperstep() <= 3 + 2 * maxRound && getSuperstep() <= maxIteration) {
- sendMsgToPathVertexPartitionPhase(msgIterator);
- } else if (getSuperstep() % 2 == 1 && getSuperstep() <= 3 + 2 * maxRound && getSuperstep() <= maxIteration) {
- while (msgIterator.hasNext()) {
- incomingMsg = msgIterator.next();
- responseMsgToHeadVertexPartitionPhase();
- }
- } else if(getSuperstep() == 3 + 2 * maxRound + 1 && getSuperstep() <= maxIteration){
- finalProcessPartitionPhase(msgIterator);
- } else if (getSuperstep() % 2 == 1 && getSuperstep() <= maxIteration) {
- resetState();
- if(getVertexValue().getState() == State2.START_VERTEX)
- sendMsgToPathVertexMergePhase(msgIterator);
- voteToHalt();
- } else if (getSuperstep() % 2 == 0 && getSuperstep() <= maxIteration) {
- while (msgIterator.hasNext()) {
- incomingMsg = msgIterator.next();
- responseMsgToHeadVertexMergePhase();
- }
- voteToHalt();
- } else
- voteToHalt();
- }
-
- public static void main(String[] args) throws Exception {
- PregelixJob job = new PregelixJob(P3ForPathMergeVertex.class.getSimpleName());
- job.setVertexClass(P3ForPathMergeVertex.class);
- /**
- * BinaryInput and BinaryOutput
- */
- job.setVertexInputFormatClass(NaiveAlgorithmForPathMergeInputFormat.class);
- job.setVertexOutputFormatClass(NaiveAlgorithmForPathMergeOutputFormat.class);
- job.setDynamicVertexValueSize(true);
- job.setOutputKeyClass(VKmerBytesWritable.class);
- job.setOutputValueClass(VertexValueWritable.class);
- Client.run(args, job);
- }
-}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P5ForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P5ForPathMergeVertex.java
deleted file mode 100644
index 0a43a96..0000000
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P5ForPathMergeVertex.java
+++ /dev/null
@@ -1,501 +0,0 @@
-package edu.uci.ics.genomix.pregelix.operator.pathmerge;
-
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.Random;
-
-import org.apache.hadoop.io.NullWritable;
-
-
-import edu.uci.ics.pregelix.api.graph.Vertex;
-import edu.uci.ics.pregelix.api.job.PregelixJob;
-import edu.uci.ics.genomix.type.VKmerBytesWritable;
-import edu.uci.ics.genomix.pregelix.client.Client;
-import edu.uci.ics.genomix.pregelix.format.NaiveAlgorithmForPathMergeInputFormat;
-import edu.uci.ics.genomix.pregelix.format.NaiveAlgorithmForPathMergeOutputFormat;
-import edu.uci.ics.genomix.pregelix.io.MessageWritable;
-import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
-import edu.uci.ics.genomix.pregelix.io.VertexValueWritable.State;
-import edu.uci.ics.genomix.pregelix.type.Message;
-import edu.uci.ics.genomix.pregelix.type.MessageFlag;
-import edu.uci.ics.genomix.pregelix.util.VertexUtil;
-
-/*
- * vertexId: BytesWritable
- * vertexValue: ByteWritable
- * edgeValue: NullWritable
- * message: MessageWritable
- *
- * DNA:
- * A: 00
- * C: 01
- * G: 10
- * T: 11
- *
- * succeed node
- * A 00000001 1
- * G 00000010 2
- * C 00000100 4
- * T 00001000 8
- * precursor node
- * A 00010000 16
- * G 00100000 32
- * C 01000000 64
- * T 10000000 128
- *
- * For example, ONE LINE in input file: 00,01,10 0001,0010,
- * That means that vertexId is ACG, its succeed node is A and its precursor node is C.
- * The succeed node and precursor node will be stored in vertexValue and we don't use edgeValue.
- * The details about message are in edu.uci.ics.pregelix.example.io.MessageWritable.
- */
-/**
- * Naive Algorithm for path merge graph
- */
-public class P5ForPathMergeVertex extends
- Vertex<VKmerBytesWritable, VertexValueWritable, NullWritable, MessageWritable> {
- public static final String KMER_SIZE = "P5ForPathMergeVertex.kmerSize";
- public static final String ITERATIONS = "P5ForPathMergeVertex.iteration";
- public static final String RANDSEED = "P5ForPathMergeVertex.randSeed";
- public static final String PROBBEINGRANDOMHEAD = "P4ForPathMergeVertex.probBeingRandomHead";
- public static int kmerSize = -1;
- private int maxIteration = -1;
-
- private static long randSeed = -1;
- private float probBeingRandomHead = -1;
- private Random randGenerator;
-
- private VKmerBytesWritable curID = new VKmerBytesWritable();
- private VKmerBytesWritable nextID = new VKmerBytesWritable();
- private VKmerBytesWritable prevID = new VKmerBytesWritable();
- private boolean hasNext;
- private boolean hasPrev;
- private boolean curHead;
- private boolean nextHead;
- private boolean prevHead;
- private byte headFlag;
- private byte tailFlag;
- private byte outFlag;
-
- private MessageWritable incomingMsg = new MessageWritable();
- private MessageWritable outgoingMsg = new MessageWritable();
- private VKmerBytesWritable destVertexId = new VKmerBytesWritable();
- private Iterator<VKmerBytesWritable> posIterator;
-
- /**
- * initiate kmerSize, maxIteration
- */
- public void initVertex() {
- if (kmerSize == -1)
- kmerSize = getContext().getConfiguration().getInt(KMER_SIZE, 5);
- if (maxIteration < 0)
- maxIteration = getContext().getConfiguration().getInt(ITERATIONS, 1000000);
- if (randSeed < 0)
- randSeed = getContext().getConfiguration().getLong("randomSeed", 0);
- randGenerator = new Random(randSeed);
- if (probBeingRandomHead < 0)
- probBeingRandomHead = getContext().getConfiguration().getFloat("probBeingRandomHead", 0.5f);
- hasNext = false;
- hasPrev = false;
- curHead = false;
- nextHead = false;
- prevHead = false;
- outgoingMsg.reset();
- }
-
- protected boolean isNodeRandomHead(VKmerBytesWritable nodeID) {
- // "deterministically random", based on node id
- randGenerator.setSeed(randSeed ^ nodeID.hashCode());
- return randGenerator.nextFloat() < probBeingRandomHead;
- }
-
- /**
- * set nextID to the element that's next (in the node's FF or FR list), returning true when there is a next neighbor
- */
- protected boolean setNextInfo(VertexValueWritable value) {
- if (value.getFFList().getCountOfPosition() > 0) {
- nextID.setAsCopy(value.getFFList().getPosition(0));
- nextHead = isNodeRandomHead(nextID);
- return true;
- }
- if (value.getFRList().getCountOfPosition() > 0) {
- nextID.setAsCopy(value.getFRList().getPosition(0));
- nextHead = isNodeRandomHead(nextID);
- return true;
- }
- return false;
- }
-
- /**
- * set prevID to the element that's previous (in the node's RR or RF list), returning true when there is a previous neighbor
- */
- protected boolean setPrevInfo(VertexValueWritable value) {
- if (value.getRRList().getCountOfPosition() > 0) {
- prevID.setAsCopy(value.getRRList().getPosition(0));
- prevHead = isNodeRandomHead(prevID);
- return true;
- }
- if (value.getRFList().getCountOfPosition() > 0) {
- prevID.setAsCopy(value.getRFList().getPosition(0));
- prevHead = isNodeRandomHead(prevID);
- return true;
- }
- return false;
- }
-
- /**
- * get destination vertex
- */
- public VKmerBytesWritable getNextDestVertexId(VertexValueWritable value) {
- if(value.getFFList().getCountOfPosition() > 0) // #FFList() > 0
- posIterator = value.getFFList().iterator();
- else // #FRList() > 0
- posIterator = value.getFRList().iterator();
- return posIterator.next();
- }
-
- public VKmerBytesWritable getPreDestVertexId(VertexValueWritable value) {
- if(value.getRFList().getCountOfPosition() > 0) // #RFList() > 0
- posIterator = value.getRFList().iterator();
- else // #RRList() > 0
- posIterator = value.getRRList().iterator();
- return posIterator.next();
- }
-
- /**
- * head send message to all next nodes
- */
- public void sendMsgToAllNextNodes(VertexValueWritable value) {
- posIterator = value.getFFList().iterator(); // FFList
- while(posIterator.hasNext()){
- destVertexId.setAsCopy(posIterator.next());
- sendMsg(destVertexId, outgoingMsg);
- }
- posIterator = value.getFRList().iterator(); // FRList
- while(posIterator.hasNext()){
- destVertexId.setAsCopy(posIterator.next());
- sendMsg(destVertexId, outgoingMsg);
- }
- }
-
- /**
- * head send message to all previous nodes
- */
- public void sendMsgToAllPreviousNodes(VertexValueWritable value) {
- posIterator = value.getRFList().iterator(); // RFList
- while(posIterator.hasNext()){
- destVertexId.setAsCopy(posIterator.next());
- sendMsg(destVertexId, outgoingMsg);
- }
- posIterator = value.getRRList().iterator(); // RRList
- while(posIterator.hasNext()){
- destVertexId.setAsCopy(posIterator.next());
- sendMsg(destVertexId, outgoingMsg);
- }
- }
-
- /**
- * start sending message
- */
- public void startSendMsg() {
- if (VertexUtil.isHeadVertexWithIndegree(getVertexValue())) {
- outgoingMsg.setFlag(Message.START);
- sendMsgToAllNextNodes(getVertexValue());
- voteToHalt();
- }
- if (VertexUtil.isRearVertexWithOutdegree(getVertexValue())) {
- outgoingMsg.setFlag(Message.END);
- sendMsgToAllPreviousNodes(getVertexValue());
- voteToHalt();
- }
- if (VertexUtil.isHeadWithoutIndegree(getVertexValue())){
- outgoingMsg.setFlag(Message.START);
- sendMsg(getVertexId(), outgoingMsg); //send to itself
- voteToHalt();
- }
- if (VertexUtil.isRearWithoutOutdegree(getVertexValue())){
- outgoingMsg.setFlag(Message.END);
- sendMsg(getVertexId(), outgoingMsg); //send to itself
- voteToHalt();
- }
- }
-
- /**
- * initiate head, rear and path node
- */
- public void initState(Iterator<MessageWritable> msgIterator) {
- while (msgIterator.hasNext()) {
- if (!VertexUtil.isPathVertex(getVertexValue())
- && !VertexUtil.isHeadWithoutIndegree(getVertexValue())
- && !VertexUtil.isRearWithoutOutdegree(getVertexValue())) {
- msgIterator.next();
- voteToHalt();
- } else {
- incomingMsg = msgIterator.next();
- setState();
- }
- }
- }
-
- /**
- * set vertex state
- */
- public void setState() {
- if (incomingMsg.getFlag() == Message.START) {
- getVertexValue().setState(MessageFlag.IS_HEAD); //State.START_VERTEX
- } else if (incomingMsg.getFlag() == Message.END && getVertexValue().getState() != State.IS_HEAD) {
- getVertexValue().setState(MessageFlag.IS_HEAD);
- getVertexValue().setActualKmer(getVertexValue().getActualKmer());
- //voteToHalt();
- } //else
- //voteToHalt();
- }
-
- /**
- * check if A need to be flipped with successor
- */
- public boolean ifFilpWithSuccessor(){
- if(getVertexValue().getFRList().getLength() > 0)
- return true;
- else
- return false;
- }
-
- /**
- * check if A need to be filpped with predecessor
- */
- public boolean ifFlipWithPredecessor(){
- if(getVertexValue().getRFList().getLength() > 0)
- return true;
- else
- return false;
- }
-
- /**
- * set adjMessage to successor(from predecessor)
- */
- public void setSuccessorAdjMsg(){
- if(getVertexValue().getFFList().getLength() > 0)
- outFlag |= MessageFlag.DIR_FF;
- else
- outFlag |= MessageFlag.DIR_FR;
- }
-
- /**
- * set adjMessage to predecessor(from successor)
- */
- public void setPredecessorAdjMsg(){
- if(getVertexValue().getRFList().getLength() > 0)
- outFlag |= MessageFlag.DIR_RF;
- else
- outFlag |= MessageFlag.DIR_RF;
- }
-
- /**
- * send update message to neighber
- * @throws IOException
- */
- public void broadcastUpdateMsg(){
- /* switch(getVertexValue().getState() & 0b0001){
- case MessageFlag.SHOULD_MERGEWITHPREV:
- setSuccessorAdjMsg();
- if(ifFlipWithPredecessor())
- outFlag |= MessageFlag.FLIP;
- outgoingMsg.setFlag(outFlag);
- outgoingMsg.setNeighberNode(getVertexValue().getIncomingList());
- outgoingMsg.setSourceVertexId(getVertexId());
- sendMsg(getNextDestVertexId(getVertexValue()), outgoingMsg);
- break;
- case MessageFlag.SHOULD_MERGEWITHNEXT:
- setPredecessorAdjMsg();
- if(ifFilpWithSuccessor())
- outFlag |= MessageFlag.FLIP;
- outgoingMsg.setFlag(outFlag);
- outgoingMsg.setNeighberNode(getVertexValue().getOutgoingList());
- outgoingMsg.setSourceVertexId(getVertexId());
- sendMsg(getPreDestVertexId(getVertexValue()), outgoingMsg);
- break;
- }*/
- }
-
- /**
- * This vertex tries to merge with next vertex and send update msg to neighber
- * @throws IOException
- */
- public void sendUpMsgFromPredecessor(){
- byte state = getVertexValue().getState();
- state |= MessageFlag.SHOULD_MERGEWITHNEXT;
- getVertexValue().setState(state);
- if(getVertexValue().getFFList().getLength() > 0)
- getVertexValue().setMergeDest(getVertexValue().getFFList().getPosition(0));
- else
- getVertexValue().setMergeDest(getVertexValue().getFRList().getPosition(0));
- broadcastUpdateMsg();
- }
-
- /**
- * This vertex tries to merge with next vertex and send update msg to neighber
- * @throws IOException
- */
- public void sendUpMsgFromSuccessor(){
- byte state = getVertexValue().getState();
- state |= MessageFlag.SHOULD_MERGEWITHPREV;
- getVertexValue().setState(state);
- if(getVertexValue().getRFList().getLength() > 0)
- getVertexValue().setMergeDest(getVertexValue().getRFList().getPosition(0));
- else
- getVertexValue().setMergeDest(getVertexValue().getRRList().getPosition(0));
- broadcastUpdateMsg();
- }
-
- /**
- * Returns the edge dir for B->A when the A->B edge is type @dir
- */
- public byte mirrorDirection(byte dir) {
- switch (dir) {
- case MessageFlag.DIR_FF:
- return MessageFlag.DIR_RR;
- case MessageFlag.DIR_FR:
- return MessageFlag.DIR_FR;
- case MessageFlag.DIR_RF:
- return MessageFlag.DIR_RF;
- case MessageFlag.DIR_RR:
- return MessageFlag.DIR_FF;
- default:
- throw new RuntimeException("Unrecognized direction in flipDirection: " + dir);
- }
- }
-
- /**
- * check if need filp
- */
- public byte flipDirection(byte neighborDir, boolean flip){
- if(flip){
- switch (neighborDir) {
- case MessageFlag.DIR_FF:
- return MessageFlag.DIR_FR;
- case MessageFlag.DIR_FR:
- return MessageFlag.DIR_FF;
- case MessageFlag.DIR_RF:
- return MessageFlag.DIR_RR;
- case MessageFlag.DIR_RR:
- return MessageFlag.DIR_RF;
- default:
- throw new RuntimeException("Unrecognized direction for neighborDir: " + neighborDir);
- }
- } else
- return neighborDir;
- }
-
- /**
- * updateAdjList
- */
- public void processUpdate(){
- /*byte meToNeighborDir = (byte) (incomingMsg.getFlag() & MessageFlag.DIR_MASK);
- byte neighborToMeDir = mirrorDirection(meToNeighborDir);
-
- boolean flip;
- if((outFlag & MessageFlag.FLIP) > 0)
- flip = true;
- else
- flip = false;
- byte neighborToMergeDir = flipDirection(neighborToMeDir, flip);
-
- getVertexValue().processUpdates(neighborToMeDir, incomingMsg.getSourceVertexId(),
- neighborToMergeDir, VertexUtil.getNodeIdFromAdjacencyList(incomingMsg.getNeighberNode()));*/
- }
-
- /**
- * merge and updateAdjList
- */
- public void processMerge(){
- /*byte meToNeighborDir = (byte) (incomingMsg.getFlag() & MessageFlag.DIR_MASK);
- byte neighborToMeDir = mirrorDirection(meToNeighborDir);
-
- boolean flip;
- if((outFlag & MessageFlag.FLIP) > 0)
- flip = true;
- else
- flip = false;
- byte neighborToMergeDir = flipDirection(neighborToMeDir, flip);
-
- getVertexValue().processMerges(neighborToMeDir, incomingMsg.getSourceVertexId(),
- neighborToMergeDir, VertexUtil.getNodeIdFromAdjacencyList(incomingMsg.getNeighberNode()),
- kmerSize, incomingMsg.getKmer());*/
- }
-
- @Override
- public void compute(Iterator<MessageWritable> msgIterator) {
- initVertex();
- if (getSuperstep() == 1)
- startSendMsg();
- else if (getSuperstep() == 2)
- initState(msgIterator);
- else if (getSuperstep() % 4 == 3){
- // Node may be marked as head b/c it's a real head or a real tail
- headFlag = (byte) (State.IS_HEAD & getVertexValue().getState());
- tailFlag = (byte) (State.IS_HEAD & getVertexValue().getState()); //is_tail
- outFlag = (byte) (headFlag | tailFlag);
-
- // only PATH vertices are present. Find the ID's for my neighbors
- curID.setAsCopy(getVertexId());
-
- curHead = isNodeRandomHead(curID);
-
- // the headFlag and tailFlag's indicate if the node is at the beginning or end of a simple path.
- // We prevent merging towards non-path nodes
- hasNext = setNextInfo(getVertexValue()) && tailFlag == 0;
- hasPrev = setPrevInfo(getVertexValue()) && headFlag == 0;
- if ((outFlag & MessageFlag.IS_HEAD) > 0 && (outFlag & MessageFlag.IS_HEAD) > 0) {
- getVertexValue().setState(outFlag);
- voteToHalt();
- }
- if (hasNext || hasPrev) {
- if (curHead) {
- if (hasNext && !nextHead) {
- // compress this head to the forward tail
- sendUpMsgFromPredecessor();
- } else if (hasPrev && !prevHead) {
- // compress this head to the reverse tail
- sendUpMsgFromSuccessor();
- }
- } else {
- // I'm a tail
- if (hasNext && hasPrev) {
- if ((!nextHead && !prevHead) && (curID.compareTo(nextID) < 0 && curID.compareTo(prevID) < 0)) {
- // tails on both sides, and I'm the "local minimum"
- // compress me towards the tail in forward dir
- sendUpMsgFromPredecessor();
- }
- } else if (!hasPrev) {
- // no previous node
- if (!nextHead && curID.compareTo(nextID) < 0) {
- // merge towards tail in forward dir
- sendUpMsgFromPredecessor();
- }
- } else if (!hasNext) {
- // no next node
- if (!prevHead && curID.compareTo(prevID) < 0) {
- // merge towards tail in reverse dir
- sendUpMsgFromSuccessor();
- }
- }
- }
- }
- }
-
- }
-
- public static void main(String[] args) throws Exception {
- PregelixJob job = new PregelixJob(P5ForPathMergeVertex.class.getSimpleName());
- job.setVertexClass(P5ForPathMergeVertex.class);
- /**
- * BinaryInput and BinaryOutput
- */
- job.setVertexInputFormatClass(NaiveAlgorithmForPathMergeInputFormat.class);
- job.setVertexOutputFormatClass(NaiveAlgorithmForPathMergeOutputFormat.class);
- job.setDynamicVertexValueSize(true);
- job.setOutputKeyClass(VKmerBytesWritable.class);
- job.setOutputValueClass(VertexValueWritable.class);
- Client.run(args, job);
- }
-}
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
index fe82add..de2fcd3 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
@@ -13,11 +13,8 @@
import edu.uci.ics.genomix.pregelix.operator.bridgeremove.BridgeRemoveVertex;
import edu.uci.ics.genomix.pregelix.operator.bubblemerge.BubbleAddVertex;
import edu.uci.ics.genomix.pregelix.operator.bubblemerge.BubbleMergeVertex;
-import edu.uci.ics.genomix.pregelix.operator.pathmerge.BasicGraphCleanVertex;
import edu.uci.ics.genomix.pregelix.operator.pathmerge.P2ForPathMergeVertex;
import edu.uci.ics.genomix.pregelix.operator.pathmerge.MapReduceVertex;
-import edu.uci.ics.genomix.pregelix.operator.pathmerge.P3ForPathMergeVertex;
-import edu.uci.ics.genomix.pregelix.operator.pathmerge.P4ForPathMergeVertex;
import edu.uci.ics.genomix.pregelix.operator.splitrepeat.SplitRepeatVertex;
import edu.uci.ics.genomix.pregelix.operator.tipremove.TipAddVertex;
import edu.uci.ics.genomix.pregelix.operator.tipremove.TipRemoveVertex;
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/graphbuilding/JobRunStepByStepTest.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/graphbuilding/JobRunStepByStepTest.java
deleted file mode 100644
index 0d609bf..0000000
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/graphbuilding/JobRunStepByStepTest.java
+++ /dev/null
@@ -1,211 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.genomix.pregelix.graphbuilding;
-
-import java.io.BufferedWriter;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.FileWriter;
-import java.io.IOException;
-
-import junit.framework.Assert;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.FileOutputFormat;
-import org.apache.hadoop.mapred.JobConf;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import edu.uci.ics.genomix.hyracks.driver.Driver;
-import edu.uci.ics.genomix.hyracks.driver.Driver.Plan;
-import edu.uci.ics.genomix.hyracks.job.GenomixJobConf;
-import edu.uci.ics.genomix.type.NodeWritable;
-
-@SuppressWarnings("deprecation")
-public class JobRunStepByStepTest {
- private static final int KmerSize = 3;
- private static final int ReadLength = 8;
- private static final String ACTUAL_RESULT_DIR = "actual";
- private static final String PATH_TO_HADOOP_CONF = "src/test/resources/hadoop/conf";
-
- private static final String DATA_INPUT_PATH = "data/graphbuild.test/tworead3.txt";
- private static final String HDFS_INPUT_PATH = "/webmap";
- private static final String HDFS_OUTPUT_PATH = "/webmap_result";
-
- private static final String EXPECTED_DIR = "src/test/resources/expected/";
- private static final String EXPECTED_OUPUT_NODE = EXPECTED_DIR + "result_after_generateNode";
- private static final String EXPECTED_UNMERGED = EXPECTED_DIR + "result_unmerged";
-
- private static final String DUMPED_RESULT = ACTUAL_RESULT_DIR + HDFS_OUTPUT_PATH + "/merged.txt";
- private static final String CONVERT_RESULT = DUMPED_RESULT + ".txt";
- private static final String HADOOP_CONF_PATH = ACTUAL_RESULT_DIR + File.separator + "conf.xml";
- private MiniDFSCluster dfsCluster;
-
- private JobConf conf = new JobConf();
- private int numberOfNC = 2;
- private int numPartitionPerMachine = 2;
-
- private Driver driver;
-
- @Test
- public void TestAll() throws Exception {
- //TestEndToEnd();
- TestUnMergedNode();
- }
-
- public void TestEndToEnd() throws Exception {
- //conf.set(GenomixJobConf.OUTPUT_FORMAT, GenomixJobConf.OUTPUT_FORMAT_TEXT);
- conf.set(GenomixJobConf.OUTPUT_FORMAT, GenomixJobConf.OUTPUT_FORMAT_BINARY);
- cleanUpReEntry();
- conf.set(GenomixJobConf.GROUPBY_TYPE, GenomixJobConf.GROUPBY_TYPE_PRECLUSTER);
- driver.runJob(new GenomixJobConf(conf), Plan.BUILD_DEBRUJIN_GRAPH, true);
- Assert.assertEquals(true, checkResults(EXPECTED_OUPUT_NODE, new int[] { 1, 2, 3, 4 }));
- }
-
- public void TestUnMergedNode() throws Exception {
- conf.set(GenomixJobConf.OUTPUT_FORMAT, GenomixJobConf.OUTPUT_FORMAT_BINARY);
- cleanUpReEntry();
- conf.set(GenomixJobConf.GROUPBY_TYPE, GenomixJobConf.GROUPBY_TYPE_PRECLUSTER);
- driver.runJob(new GenomixJobConf(conf), Plan.BUILD_UNMERGED_GRAPH, true);
- Assert.assertEquals(true, checkResults(EXPECTED_UNMERGED, new int[] { 1, 2, 3, 4 }));
- }
-
- @Before
- public void setUp() throws Exception {
- cleanupStores();
- edu.uci.ics.hyracks.hdfs.utils.HyracksUtils.init();
- FileUtils.forceMkdir(new File(ACTUAL_RESULT_DIR));
- FileUtils.cleanDirectory(new File(ACTUAL_RESULT_DIR));
- startHDFS();
-
- FileInputFormat.setInputPaths(conf, HDFS_INPUT_PATH);
- FileOutputFormat.setOutputPath(conf, new Path(HDFS_OUTPUT_PATH));
-
- conf.setInt(GenomixJobConf.KMER_LENGTH, KmerSize);
- conf.setInt(GenomixJobConf.READ_LENGTH, ReadLength);
- driver = new Driver(edu.uci.ics.hyracks.hdfs.utils.HyracksUtils.CC_HOST,
- edu.uci.ics.hyracks.hdfs.utils.HyracksUtils.TEST_HYRACKS_CC_CLIENT_PORT, numPartitionPerMachine);
- }
-
- private void cleanupStores() throws IOException {
- FileUtils.forceMkdir(new File("teststore"));
- FileUtils.forceMkdir(new File("build"));
- FileUtils.cleanDirectory(new File("teststore"));
- FileUtils.cleanDirectory(new File("build"));
- }
-
- private void startHDFS() throws IOException {
- conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/core-site.xml"));
- conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/mapred-site.xml"));
- conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/hdfs-site.xml"));
-
- FileSystem lfs = FileSystem.getLocal(new Configuration());
- lfs.delete(new Path("build"), true);
- System.setProperty("hadoop.log.dir", "logs");
- dfsCluster = new MiniDFSCluster(conf, numberOfNC, true, null);
- FileSystem dfs = FileSystem.get(conf);
- Path src = new Path(DATA_INPUT_PATH);
- Path dest = new Path(HDFS_INPUT_PATH);
- dfs.mkdirs(dest);
- // dfs.mkdirs(result);
- dfs.copyFromLocalFile(src, dest);
-
- DataOutputStream confOutput = new DataOutputStream(new FileOutputStream(new File(HADOOP_CONF_PATH)));
- conf.writeXml(confOutput);
- confOutput.flush();
- confOutput.close();
- }
-
- private void cleanUpReEntry() throws IOException {
- FileSystem lfs = FileSystem.getLocal(new Configuration());
- if (lfs.exists(new Path(DUMPED_RESULT))) {
- lfs.delete(new Path(DUMPED_RESULT), true);
- }
- FileSystem dfs = FileSystem.get(conf);
- if (dfs.exists(new Path(HDFS_OUTPUT_PATH))) {
- dfs.delete(new Path(HDFS_OUTPUT_PATH), true);
- }
- }
-
- private boolean checkResults(String expectedPath, int[] poslistField) throws Exception {
- File dumped = null;
- String format = conf.get(GenomixJobConf.OUTPUT_FORMAT);
- if (GenomixJobConf.OUTPUT_FORMAT_TEXT.equalsIgnoreCase(format)) {
- FileUtil.copyMerge(FileSystem.get(conf), new Path(HDFS_OUTPUT_PATH),
- FileSystem.getLocal(new Configuration()), new Path(DUMPED_RESULT), false, conf, null);
- dumped = new File(DUMPED_RESULT);
- } else {
-
- FileSystem.getLocal(new Configuration()).mkdirs(new Path(ACTUAL_RESULT_DIR + HDFS_OUTPUT_PATH));
- File filePathTo = new File(CONVERT_RESULT);
- BufferedWriter bw = new BufferedWriter(new FileWriter(filePathTo));
- for (int i = 0; i < numPartitionPerMachine * numberOfNC; i++) {
- String partname = "/part-" + i;
- FileUtil.copy(FileSystem.get(conf), new Path(HDFS_OUTPUT_PATH
- + partname), FileSystem.getLocal(new Configuration()),
- new Path(ACTUAL_RESULT_DIR + HDFS_OUTPUT_PATH + partname),
- false, conf);
-
- Path path = new Path(HDFS_OUTPUT_PATH + partname);
- FileSystem dfs = FileSystem.get(conf);
- if (dfs.getFileStatus(path).getLen() == 0) {
- continue;
- }
- SequenceFile.Reader reader = new SequenceFile.Reader(dfs, path, conf);
-
- NodeWritable node = new NodeWritable();
- NullWritable value = NullWritable.get();
- while (reader.next(node, value)) {
- if (node == null) {
- break;
- }
- bw.write(node.toString());
- System.out.println(node.toString());
- bw.newLine();
- }
- reader.close();
- }
- bw.close();
- dumped = new File(CONVERT_RESULT);
- }
-
- if (poslistField != null) {
- TestUtils.compareWithUnSortedPosition(new File(expectedPath), dumped, poslistField);
- } else {
- TestUtils.compareWithSortedResult(new File(expectedPath), dumped);
- }
- return true;
- }
-
- @After
- public void tearDown() throws Exception {
- edu.uci.ics.hyracks.hdfs.utils.HyracksUtils.deinit();
- cleanupHDFS();
- }
-
- private void cleanupHDFS() throws Exception {
- dfsCluster.shutdown();
- }
-}
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/graphbuilding/TestUtils.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/graphbuilding/TestUtils.java
deleted file mode 100644
index ca7755e..0000000
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/graphbuilding/TestUtils.java
+++ /dev/null
@@ -1,235 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package edu.uci.ics.genomix.pregelix.graphbuilding;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileReader;
-import java.util.ArrayList;
-import java.util.Collections;
-
-public class TestUtils {
- /**
- * Compare with the sorted expected file.
- * The actual file may not be sorted;
- *
- * @param expectedFile
- * @param actualFile
- */
- public static void compareWithSortedResult(File expectedFile, File actualFile) throws Exception {
- BufferedReader readerActual = new BufferedReader(new FileReader(actualFile));
- BufferedReader readerExpected = new BufferedReader(new FileReader(expectedFile));
- ArrayList<String> actualLines = new ArrayList<String>();
- String lineExpected, lineActual;
- try {
- while ((lineActual = readerActual.readLine()) != null) {
- actualLines.add(lineActual);
- }
- Collections.sort(actualLines);
- int num = 1;
- for (String actualLine : actualLines) {
- lineExpected = readerExpected.readLine();
- if (lineExpected == null) {
- throw new Exception("Actual result changed at line " + num + ":\n< " + actualLine + "\n> ");
- }
- if (!equalStrings(lineExpected, actualLine)) {
- throw new Exception("Result for changed at line " + num + ":\n< " + lineExpected + "\n> "
- + actualLine);
- }
- ++num;
- }
- lineExpected = readerExpected.readLine();
- if (lineExpected != null) {
- throw new Exception("Actual result changed at line " + num + ":\n< \n> " + lineExpected);
- }
- } finally {
- readerActual.close();
- readerExpected.close();
- }
- }
-
- public static void compareWithUnSortedPosition(File expectedFile, File actualFile, int[] poslistField)
- throws Exception {
- BufferedReader readerActual = new BufferedReader(new FileReader(actualFile));
- BufferedReader readerExpected = new BufferedReader(new FileReader(expectedFile));
- ArrayList<String> actualLines = new ArrayList<String>();
- String lineExpected, lineActual;
- try {
- while ((lineActual = readerActual.readLine()) != null) {
- actualLines.add(lineActual);
- }
- Collections.sort(actualLines);
- int num = 1;
- for (String actualLine : actualLines) {
- lineExpected = readerExpected.readLine();
- if (lineExpected == null) {
- throw new Exception("Actual result changed at line " + num + ":\n< " + actualLine + "\n> ");
- }
- if (!containStrings(lineExpected, actualLine, poslistField)) {
- throw new Exception("Result for changed at line " + num + ":\n< " + lineExpected + "\n> "
- + actualLine);
- }
- ++num;
- }
- lineExpected = readerExpected.readLine();
- if (lineExpected != null) {
- throw new Exception("Actual result changed at line " + num + ":\n< \n> " + lineExpected);
- }
- } finally {
- readerActual.close();
- readerExpected.close();
- }
- }
-
- public static void compareWithResult(File expectedFile, File actualFile) throws Exception {
- BufferedReader readerExpected = new BufferedReader(new FileReader(expectedFile));
- BufferedReader readerActual = new BufferedReader(new FileReader(actualFile));
- String lineExpected, lineActual;
- int num = 1;
- try {
- while ((lineExpected = readerExpected.readLine()) != null) {
- lineActual = readerActual.readLine();
- // Assert.assertEquals(lineExpected, lineActual);
- if (lineActual == null) {
- throw new Exception("Actual result changed at line " + num + ":\n< " + lineExpected + "\n> ");
- }
- if (!equalStrings(lineExpected, lineActual)) {
- throw new Exception("Result for changed at line " + num + ":\n< " + lineExpected + "\n> "
- + lineActual);
- }
- ++num;
- }
- lineActual = readerActual.readLine();
- if (lineActual != null) {
- throw new Exception("Actual result changed at line " + num + ":\n< \n> " + lineActual);
- }
- } finally {
- readerExpected.close();
- readerActual.close();
- }
- }
-
- private static boolean equalStrings(String s1, String s2) {
- String[] rowsOne = s1.split("\n");
- String[] rowsTwo = s2.split("\n");
-
- if (rowsOne.length != rowsTwo.length)
- return false;
-
- for (int i = 0; i < rowsOne.length; i++) {
- String row1 = rowsOne[i];
- String row2 = rowsTwo[i];
-
- if (row1.equals(row2))
- continue;
-
- String[] fields1 = row1.split(",");
- String[] fields2 = row2.split(",");
-
- for (int j = 0; j < fields1.length; j++) {
- if (fields1[j].equals(fields2[j])) {
- continue;
- } else if (fields1[j].indexOf('.') < 0) {
- return false;
- } else {
- fields1[j] = fields1[j].split("=")[1];
- fields2[j] = fields2[j].split("=")[1];
- Double double1 = Double.parseDouble(fields1[j]);
- Double double2 = Double.parseDouble(fields2[j]);
- float float1 = (float) double1.doubleValue();
- float float2 = (float) double2.doubleValue();
-
- if (Math.abs(float1 - float2) == 0)
- continue;
- else {
- return false;
- }
- }
- }
- }
- return true;
- }
-
- private static boolean containStrings(String lineExpected, String actualLine, int[] poslistField) {
- if (lineExpected.equals(actualLine)) {
- return true;
- }
- String[] fieldsExp = lineExpected.split("\\\t");
- String[] fieldsAct = actualLine.split("\\\t");
- if (fieldsAct.length != fieldsExp.length) {
- return false;
- }
- for (int i = 0; i < fieldsAct.length; i++) {
- boolean cont = false;
- for (int x : poslistField) {
- if (i == x) {
- cont = true;
- break;
- }
- }
- if (cont) {
- continue;
- }
- if (!fieldsAct[i].equals(fieldsExp[i])) {
- return false;
- }
- }
-
- ArrayList<String> posExp = new ArrayList<String>();
- ArrayList<String> posAct = new ArrayList<String>();
-
- for (int x : poslistField) {
- String valueExp = lineExpected.split("\\\t")[x];
- for (int i = 1; i < valueExp.length() - 1;) {
- if (valueExp.charAt(i) == '(') {
- String str = "";
- i++;
- while (i < valueExp.length() - 1 && valueExp.charAt(i) != ')') {
- str += valueExp.charAt(i);
- i++;
- }
- posExp.add(str);
- }
- i++;
- }
- String valueAct = actualLine.split("\\\t")[x];
- for (int i = 1; i < valueAct.length() - 1;) {
- if (valueAct.charAt(i) == '(') {
- String str = "";
- i++;
- while (i < valueAct.length() - 1 && valueAct.charAt(i) != ')') {
- str += valueAct.charAt(i);
- i++;
- }
- posAct.add(str);
- }
- i++;
- }
-
- if (posExp.size() != posAct.size()) {
- return false;
- }
- Collections.sort(posExp);
- Collections.sort(posAct);
- for (int i = 0; i < posExp.size(); i++) {
- if (!posExp.get(i).equals(posAct.get(i))) {
- return false;
- }
- }
- }
- return true;
- }
-}