Format the code; replace TwoStep/ThreeStep log path-merge vertex references with LogAlgorithmForPathMergeVertex
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/client/Client.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/client/Client.java
index 322af31..e35f10c 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/client/Client.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/client/Client.java
@@ -10,9 +10,8 @@
import org.kohsuke.args4j.CmdLineParser;
import org.kohsuke.args4j.Option;
-import edu.uci.ics.genomix.pregelix.operator.ThreeStepLogAlgorithmForPathMergeVertex;
+import edu.uci.ics.genomix.pregelix.operator.LogAlgorithmForPathMergeVertex;
import edu.uci.ics.genomix.pregelix.operator.NaiveAlgorithmForPathMergeVertex;
-import edu.uci.ics.genomix.pregelix.operator.TwoStepLogAlgorithmForPathMergeVertex;
import edu.uci.ics.pregelix.api.job.PregelixJob;
import edu.uci.ics.pregelix.core.base.IDriver.Plan;
import edu.uci.ics.pregelix.core.driver.Driver;
@@ -43,9 +42,6 @@
@Option(name = "-runtime-profiling", usage = "whether to do runtime profifling", required = false)
public String profiling = "false";
-
- //@Option(name = "-filter-kmer", usage = "whether to do runtime profifling", required = false)
- //public String filterKmer = "";
}
public static void run(String[] args, PregelixJob job) throws Exception {
@@ -65,15 +61,11 @@
FileInputFormat.addInputPaths(job, inputs[0]);
FileOutputFormat.setOutputPath(job, new Path(options.outputPath));
job.getConfiguration().setInt(NaiveAlgorithmForPathMergeVertex.KMER_SIZE, options.sizeKmer);
- job.getConfiguration().setInt(TwoStepLogAlgorithmForPathMergeVertex.KMER_SIZE, options.sizeKmer);
- job.getConfiguration().setInt(ThreeStepLogAlgorithmForPathMergeVertex.KMER_SIZE, options.sizeKmer);
+ job.getConfiguration().setInt(LogAlgorithmForPathMergeVertex.KMER_SIZE, options.sizeKmer);
if (options.numIteration > 0){
job.getConfiguration().setInt(NaiveAlgorithmForPathMergeVertex.ITERATIONS, options.numIteration);
- job.getConfiguration().setInt(TwoStepLogAlgorithmForPathMergeVertex.ITERATIONS, options.numIteration);
- job.getConfiguration().setInt(ThreeStepLogAlgorithmForPathMergeVertex.ITERATIONS, options.numIteration);
+ job.getConfiguration().setInt(LogAlgorithmForPathMergeVertex.ITERATIONS, options.numIteration);
}
- //job.getConfiguration().set(NaiveFilterVertex.FILTERKMER, options.filterKmer);
return options;
}
-
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/LogAlgorithmMessageWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/LogAlgorithmMessageWritable.java
index a002179..43759a6 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/LogAlgorithmMessageWritable.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/LogAlgorithmMessageWritable.java
@@ -6,7 +6,7 @@
import org.apache.hadoop.io.WritableComparable;
-import edu.uci.ics.genomix.pregelix.operator.TwoStepLogAlgorithmForPathMergeVertex;
+import edu.uci.ics.genomix.pregelix.operator.LogAlgorithmForPathMergeVertex;
import edu.uci.ics.genomix.pregelix.type.CheckMessage;
import edu.uci.ics.genomix.type.KmerBytesWritable;
import edu.uci.ics.genomix.type.VKmerBytesWritable;
@@ -26,8 +26,8 @@
private byte checkMessage;
public LogAlgorithmMessageWritable(){
- sourceVertexId = new VKmerBytesWritable(TwoStepLogAlgorithmForPathMergeVertex.kmerSize);
- chainVertexId = new VKmerBytesWritable(TwoStepLogAlgorithmForPathMergeVertex.kmerSize);
+ sourceVertexId = new VKmerBytesWritable(LogAlgorithmForPathMergeVertex.kmerSize);
+ chainVertexId = new VKmerBytesWritable(LogAlgorithmForPathMergeVertex.kmerSize);
adjMap = 0;
message = 0;
checkMessage = 0;
@@ -52,7 +52,7 @@
public void reset(){
checkMessage = 0;
- chainVertexId.reset(TwoStepLogAlgorithmForPathMergeVertex.kmerSize);
+ chainVertexId.reset(LogAlgorithmForPathMergeVertex.kmerSize);
adjMap = (byte)0;
message = 0;
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/NaiveAlgorithmMessageWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/NaiveAlgorithmMessageWritable.java
index 55a626d..6228d0c 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/NaiveAlgorithmMessageWritable.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/NaiveAlgorithmMessageWritable.java
@@ -7,6 +7,8 @@
import org.apache.hadoop.io.WritableComparable;
import edu.uci.ics.genomix.pregelix.operator.NaiveAlgorithmForPathMergeVertex;
+import edu.uci.ics.genomix.pregelix.type.CheckMessage;
+import edu.uci.ics.genomix.pregelix.type.Message;
import edu.uci.ics.genomix.type.KmerBytesWritable;
import edu.uci.ics.genomix.type.VKmerBytesWritable;
@@ -14,35 +16,56 @@
/**
* sourceVertexId stores source vertexId when headVertex sends the message
* stores neighber vertexValue when pathVertex sends the message
- * chainVertexId stores the chains of connected DNA
* file stores the point to the file that stores the chains of connected DNA
*/
private KmerBytesWritable sourceVertexId;
- private VKmerBytesWritable chainVertexId;
- private KmerBytesWritable headVertexId;
private byte adjMap;
- private boolean isRear;
+ private byte lastGeneCode;
+ private byte message;
+
+ private byte checkMessage;
public NaiveAlgorithmMessageWritable(){
sourceVertexId = new VKmerBytesWritable(NaiveAlgorithmForPathMergeVertex.kmerSize);
- chainVertexId = new VKmerBytesWritable(NaiveAlgorithmForPathMergeVertex.kmerSize);
- headVertexId = new VKmerBytesWritable(NaiveAlgorithmForPathMergeVertex.kmerSize);
+ adjMap = (byte)0;
+ lastGeneCode = (byte)0;
+ message = Message.NON;
+ checkMessage = (byte)0;
}
- public void set(KmerBytesWritable sourceVertex, VKmerBytesWritable chainVertex, KmerBytesWritable headVertex , byte adjMap, boolean isRear){
- this.sourceVertexId.set(sourceVertex);
- this.chainVertexId.set(chainVertex);
- this.headVertexId.set(headVertex);
- this.adjMap = adjMap;
- this.isRear = isRear;
+    /**
+     * Overwrites this message in one call and rebuilds the presence flags that
+     * write()/readFields() consult to decide which optional fields are serialized.
+     *
+     * @param sourceVertex id of the sending vertex; null leaves the stored id untouched and unflagged
+     * @param adjMap adjacency bitmap; 0 is stored but not flagged, so it is not serialized
+     * @param lastGeneCode last gene code of the sender; 0 is stored but not flagged, so it is not serialized
+     * @param message message type; always serialized
+     */
+    public void set(KmerBytesWritable sourceVertex, byte adjMap, byte lastGeneCode, byte message){
+        checkMessage = 0;
+        // Test and copy the argument, not the field: the field is assigned in the
+        // constructor, and copying it onto itself silently dropped the caller's id.
+        if(sourceVertex != null){
+            checkMessage |= CheckMessage.SOURCE;
+            this.sourceVertexId.set(sourceVertex);
+        }
+        // Store the optional bytes unconditionally so the getters never return a value
+        // left over from a previous use of this object; only the flags depend on non-zero.
+        this.adjMap = adjMap;
+        if(adjMap != 0){
+            checkMessage |= CheckMessage.ADJMAP;
+        }
+        this.lastGeneCode = lastGeneCode;
+        if(lastGeneCode != 0){
+            checkMessage |= CheckMessage.LASTGENECODE;
+        }
+        this.message = message;
+    }
+
+ /**
+ * Clears the presence flags and the optional payload bytes and sets the
+ * message type back to Message.NON. sourceVertexId is not touched here.
+ */
+ public void reset(){
+ checkMessage = 0;
+ adjMap = (byte)0;
+ lastGeneCode = (byte)0;
+ message = Message.NON;
}
public KmerBytesWritable getSourceVertexId() {
return sourceVertexId;
}
- public void setSourceVertexId(KmerBytesWritable source) {
- this.sourceVertexId.set(source);
+ public void setSourceVertexId(KmerBytesWritable sourceVertexId) {
+ if(sourceVertexId != null){
+ checkMessage |= CheckMessage.SOURCE;
+ this.sourceVertexId.set(sourceVertexId);
+ }
}
public byte getAdjMap() {
@@ -50,76 +73,75 @@
}
public void setAdjMap(byte adjMap) {
- this.adjMap = adjMap;
+ if(adjMap != 0){
+ checkMessage |= CheckMessage.ADJMAP;
+ this.adjMap = adjMap;
+ }
}
- public void setChainVertexId(VKmerBytesWritable chainVertex) {
- this.chainVertexId.set(chainVertex);
+ public byte getLastGeneCode() {
+ return lastGeneCode;
}
- public VKmerBytesWritable getChainVertexId() {
- return chainVertexId;
+ public void setLastGeneCode(byte lastGeneCode) {
+ if(lastGeneCode != 0){
+ checkMessage |= CheckMessage.LASTGENECODE;
+ this.lastGeneCode = lastGeneCode;
+ }
}
- public boolean isRear() {
- return isRear;
+ public byte getMessage() {
+ return message;
}
- public void setRear(boolean isRear) {
- this.isRear = isRear;
- }
-
- public int getLengthOfChain() {
- return this.chainVertexId.getKmerLength();
- }
-
-
- public KmerBytesWritable getHeadVertexId() {
- return headVertexId;
- }
-
- public void setHeadVertexId(KmerBytesWritable headVertexId) {
- this.headVertexId.set(headVertexId);
+ public void setMessage(byte message) {
+ this.message = message;
}
@Override
public void write(DataOutput out) throws IOException {
- sourceVertexId.write(out);
- headVertexId.write(out);
- chainVertexId.write(out);
- out.write(adjMap);
- out.writeBoolean(isRear);
+ out.writeByte(checkMessage);
+ if((checkMessage & CheckMessage.SOURCE) != 0)
+ sourceVertexId.write(out);
+ if((checkMessage & CheckMessage.ADJMAP) != 0)
+ out.write(adjMap);
+ if((checkMessage & CheckMessage.LASTGENECODE) != 0)
+ out.write(lastGeneCode);
+ out.write(message);
}
@Override
public void readFields(DataInput in) throws IOException {
- sourceVertexId.readFields(in);
- headVertexId.readFields(in);
- chainVertexId.readFields(in);
- adjMap = in.readByte();
- isRear = in.readBoolean();
+ this.reset();
+ checkMessage = in.readByte();
+ if((checkMessage & CheckMessage.SOURCE) != 0)
+ sourceVertexId.readFields(in);
+ if((checkMessage & CheckMessage.ADJMAP) != 0)
+ adjMap = in.readByte();
+ if((checkMessage & CheckMessage.LASTGENECODE) != 0)
+ lastGeneCode = in.readByte();
+ message = in.readByte();
}
@Override
public int hashCode() {
- return chainVertexId.hashCode();
+ return sourceVertexId.hashCode();
}
@Override
public boolean equals(Object o) {
if (o instanceof NaiveAlgorithmMessageWritable) {
NaiveAlgorithmMessageWritable tp = (NaiveAlgorithmMessageWritable) o;
- return chainVertexId.equals( tp.chainVertexId);
+ return sourceVertexId.equals( tp.sourceVertexId);
}
return false;
}
@Override
public String toString() {
- return chainVertexId.toString();
+ return sourceVertexId.toString();
}
@Override
public int compareTo(NaiveAlgorithmMessageWritable tp) {
- return chainVertexId.compareTo(tp.chainVertexId);
+ return sourceVertexId.compareTo(tp.sourceVertexId);
}
-
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/log/NaiveAlgorithmLogFormatter.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/log/NaiveAlgorithmLogFormatter.java
index 332d6d0..5d40431 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/log/NaiveAlgorithmLogFormatter.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/log/NaiveAlgorithmLogFormatter.java
@@ -2,7 +2,6 @@
import java.util.logging.*;
-import edu.uci.ics.genomix.pregelix.io.NaiveAlgorithmMessageWritable;
import edu.uci.ics.genomix.type.VKmerBytesWritable;
public class NaiveAlgorithmLogFormatter extends Formatter {
@@ -13,21 +12,17 @@
private long step;
private VKmerBytesWritable sourceVertexId;
private VKmerBytesWritable destVertexId;
- private NaiveAlgorithmMessageWritable msg;
public void set(long step, VKmerBytesWritable sourceVertexId,
- VKmerBytesWritable destVertexId, NaiveAlgorithmMessageWritable msg){
+ VKmerBytesWritable destVertexId){
this.step = step;
this.sourceVertexId.set(sourceVertexId);
this.destVertexId.set(destVertexId);
- this.msg = msg;
}
public String format(LogRecord record) {
StringBuilder builder = new StringBuilder(1000);
String source = sourceVertexId.toString();
- String chain = "";
-
builder.append("Step: " + step + "\r\n");
builder.append("Source Code: " + source + "\r\n");
@@ -36,11 +31,6 @@
String dest = destVertexId.toString();
builder.append("Destination Code: " + dest + "\r\n");
}
- if(msg.getLengthOfChain() != 0){
- chain = msg.getChainVertexId().toString();
- builder.append("Chain Message: " + chain + "\r\n");
- builder.append("Chain Length: " + msg.getLengthOfChain() + "\r\n");
- }
if(!formatMessage(record).equals(""))
builder.append(formatMessage(record) + "\r\n");
builder.append("\n");
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/LogAlgorithmForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/LogAlgorithmForPathMergeVertex.java
new file mode 100644
index 0000000..a12c852
--- /dev/null
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/LogAlgorithmForPathMergeVertex.java
@@ -0,0 +1,274 @@
+package edu.uci.ics.genomix.pregelix.operator;
+
+import java.util.Iterator;
+
+import org.apache.hadoop.io.NullWritable;
+
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.genomix.pregelix.client.Client;
+import edu.uci.ics.genomix.pregelix.format.LogAlgorithmForPathMergeInputFormat;
+import edu.uci.ics.genomix.pregelix.format.LogAlgorithmForPathMergeOutputFormat;
+import edu.uci.ics.genomix.pregelix.io.LogAlgorithmMessageWritable;
+import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
+import edu.uci.ics.genomix.pregelix.type.Message;
+import edu.uci.ics.genomix.pregelix.type.State;
+import edu.uci.ics.genomix.pregelix.util.GraphVertexOperation;
+import edu.uci.ics.genomix.type.GeneCode;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.VKmerBytesWritable;
+import edu.uci.ics.genomix.type.VKmerBytesWritableFactory;
+/*
+ * vertexId: KmerBytesWritable
+ * vertexValue: ValueStateWritable
+ * edgeValue: NullWritable
+ * message: LogAlgorithmMessageWritable
+ *
+ * DNA:
+ * A: 00
+ * C: 01
+ * G: 10
+ * T: 11
+ *
+ * succeed node (low 4 bits, bit = 1 << gene code)
+ * A 00000001 1
+ * C 00000010 2
+ * G 00000100 4
+ * T 00001000 8
+ * precursor node (high 4 bits)
+ * A 00010000 16
+ * C 00100000 32
+ * G 01000000 64
+ * T 10000000 128
+ *
+ * For example, ONE LINE in input file: 00,01,10 0001,0010,
+ * That means that vertexId is ACG, its succeed node is A and its precursor node is C.
+ * The succeed node and precursor node will be stored in vertexValue and we don't use edgeValue.
+ * The details about message are in edu.uci.ics.genomix.pregelix.io.LogAlgorithmMessageWritable.
+ */
+public class LogAlgorithmForPathMergeVertex extends Vertex<KmerBytesWritable, ValueStateWritable, NullWritable, LogAlgorithmMessageWritable>{
+ // Configuration keys read by initVertex() for the kmer size and the iteration cap.
+ public static final String KMER_SIZE = "LogAlgorithmForPathMergeVertex.kmerSize";
+ public static final String ITERATIONS = "LogAlgorithmForPathMergeVertex.iteration";
+ // -1 means "not loaded yet"; initVertex() fills it from the configuration (default 5).
+ // Public and static because LogAlgorithmMessageWritable reads it to size its kmers.
+ // NOTE(review): the message fields below are constructed before initVertex() runs, so the
+ // first instances may be built while this is still -1 - confirm that is harmless.
+ public static int kmerSize = -1;
+ // -1 means "not loaded yet"; initVertex() fills it from the configuration (default 1000000).
+ private int maxIteration = -1;
+
+ // incomingMsg holds the message currently being processed; outgoingMsg is reset by
+ // initVertex() at the start of every compute() call and reused for every send.
+ private LogAlgorithmMessageWritable incomingMsg = new LogAlgorithmMessageWritable();
+ private LogAlgorithmMessageWritable outgoingMsg = new LogAlgorithmMessageWritable();
+
+ // Factory used for all kmer shifting/merging, plus two scratch kmers used by mergeChainVertex().
+ private VKmerBytesWritableFactory kmerFactory = new VKmerBytesWritableFactory(1);
+ private VKmerBytesWritable chainVertexId = new VKmerBytesWritable(1);
+ private VKmerBytesWritable lastKmer = new VKmerBytesWritable(1);
+    /**
+     * Loads kmerSize and maxIteration from the job configuration while they still
+     * hold their "unset" defaults, then clears the reusable outgoing message.
+     */
+    public void initVertex(){
+        final boolean kmerSizeUnset = (kmerSize == -1);
+        if(kmerSizeUnset){
+            kmerSize = getContext().getConfiguration().getInt(KMER_SIZE, 5);
+        }
+        final boolean iterationCapUnset = (maxIteration < 0);
+        if(iterationCapUnset){
+            maxIteration = getContext().getConfiguration().getInt(ITERATIONS, 1000000);
+        }
+        outgoingMsg.reset();
+    }
+ /**
+ * Computes the destination id used when messaging a next node
+ * (see sendMsgToAllNextNodes). Delegates to
+ * kmerFactory.shiftKmerWithNextCode(vertexId, geneCode).
+ * NOTE(review): presumably drops the first base of vertexId and appends
+ * geneCode - confirm in VKmerBytesWritableFactory.
+ *
+ * @param vertexId kmer of the vertex to start from
+ * @param geneCode gene code handed to the factory
+ * @return whatever the factory returns for this pair
+ */
+ public VKmerBytesWritable getNextDestVertexId(KmerBytesWritable vertexId, byte geneCode){
+ return kmerFactory.shiftKmerWithNextCode(vertexId, geneCode);
+ }
+
+ /**
+ * Computes the destination id used when messaging a previous node
+ * (see sendMsgToAllPreviousNodes). Delegates to
+ * kmerFactory.shiftKmerWithPreCode(vertexId, geneCode).
+ * NOTE(review): presumably drops the last base of vertexId and prepends
+ * geneCode - confirm in VKmerBytesWritableFactory.
+ */
+ public VKmerBytesWritable getPreDestVertexId(KmerBytesWritable vertexId, byte geneCode){
+ return kmerFactory.shiftKmerWithPreCode(vertexId, geneCode);
+ }
+
+ /**
+ * Alias for getDestVertexIdFromChain(chainVertexId, adjMap); it forwards both
+ * arguments unchanged and returns that method's result.
+ */
+ public VKmerBytesWritable getNextDestVertexIdFromBitmap(KmerBytesWritable chainVertexId, byte adjMap){
+ return getDestVertexIdFromChain(chainVertexId, adjMap);
+ }
+
+    /**
+     * Takes the last kmerSize-long kmer of the chain and shifts it by the gene code
+     * decoded from the low four bits of adjMap.
+     */
+    public VKmerBytesWritable getDestVertexIdFromChain(KmerBytesWritable chainVertexId, byte adjMap){
+        // Local name differs from the lastKmer field on purpose: the field is not touched here.
+        VKmerBytesWritable chainTail = kmerFactory.getLastKmerFromChain(kmerSize, chainVertexId);
+        byte successorBits = (byte)(adjMap & 0x0F);
+        return getNextDestVertexId(chainTail, GeneCode.getGeneCodeFromBitMap(successorBits));
+    }
+    /**
+     * Sends outgoingMsg to every next node whose bit is set in the low four
+     * bits of adjMap.
+     */
+    public void sendMsgToAllNextNodes(KmerBytesWritable vertexId, byte adjMap){
+        for(byte code = GeneCode.A; code <= GeneCode.T; code++){
+            boolean edgePresent = (adjMap & (1 << code)) != 0;
+            if(!edgePresent){
+                continue;
+            }
+            sendMsg(getNextDestVertexId(vertexId, code), outgoingMsg);
+        }
+    }
+    /**
+     * Sends outgoingMsg to every previous node whose bit is set in the high four
+     * bits of adjMap.
+     */
+    public void sendMsgToAllPreviousNodes(KmerBytesWritable vertexId, byte adjMap){
+        // Move the high nibble down once; only bits 0..3 of the result are inspected.
+        final int previousBits = adjMap >> 4;
+        for(byte code = GeneCode.A; code <= GeneCode.T; code++){
+            boolean edgePresent = (previousBits & (1 << code)) != 0;
+            if(!edgePresent){
+                continue;
+            }
+            sendMsg(getPreDestVertexId(vertexId, code), outgoingMsg);
+        }
+    }
+    /**
+     * First-superstep action: a head vertex sends START to all its next nodes and a
+     * rear vertex sends END to all its previous nodes; either kind then votes to halt.
+     */
+    public void startSendMsg(){
+        final byte adjMap = getVertexValue().getAdjMap();
+        final boolean isHead = GraphVertexOperation.isHeadVertex(adjMap);
+        if(isHead){
+            outgoingMsg.setMessage(Message.START);
+            sendMsgToAllNextNodes(getVertexId(), adjMap);
+            voteToHalt();
+        }
+        final boolean isRear = GraphVertexOperation.isRearVertex(adjMap);
+        if(isRear){
+            outgoingMsg.setMessage(Message.END);
+            sendMsgToAllPreviousNodes(getVertexId(), adjMap);
+            voteToHalt();
+        }
+    }
+    /**
+     * Drains the incoming messages. A path vertex feeds each message to setState();
+     * any other vertex discards the message and votes to halt.
+     */
+    public void initState(Iterator<LogAlgorithmMessageWritable> msgIterator){
+        while(msgIterator.hasNext()){
+            boolean onPath = GraphVertexOperation.isPathVertex(getVertexValue().getAdjMap());
+            LogAlgorithmMessageWritable received = msgIterator.next();
+            if(onPath){
+                incomingMsg = received;
+                setState();
+            }
+            else{
+                voteToHalt();
+            }
+        }
+    }
+    /**
+     * Applies incomingMsg to the vertex state: START marks the vertex START_VERTEX and
+     * clears its merge chain; END marks it END_VERTEX (unless it is already START_VERTEX)
+     * and seeds the merge chain with its own id. Every case except START votes to halt.
+     */
+    public void setState(){
+        if(incomingMsg.getMessage() == Message.START){
+            getVertexValue().setState(State.START_VERTEX);
+            getVertexValue().setMergeChain(null);
+            return;
+        }
+        boolean becomesEnd = incomingMsg.getMessage() == Message.END
+                && getVertexValue().getState() != State.START_VERTEX;
+        if(becomesEnd){
+            getVertexValue().setState(State.END_VERTEX);
+            getVertexValue().setMergeChain(getVertexId());
+        }
+        voteToHalt();
+    }
+    /**
+     * Forwards a message, tagged with this vertex as source, to the vertex that follows
+     * the end of the given chain. A START_VERTEX sends Message.START, an END_VERTEX
+     * sends nothing, and every other state sends Message.NON.
+     */
+    public void sendOutMsg(KmerBytesWritable chainVertexId, byte adjMap){
+        if(getVertexValue().getState() == State.END_VERTEX){
+            return;
+        }
+        boolean isStart = getVertexValue().getState() == State.START_VERTEX;
+        outgoingMsg.setMessage(isStart ? Message.START : Message.NON);
+        outgoingMsg.setSourceVertexId(getVertexId());
+        sendMsg(getNextDestVertexIdFromBitmap(chainVertexId, adjMap), outgoingMsg);
+    }
+    /**
+     * Handler for odd supersteps. In superstep 3 the vertex seeds its merge chain with
+     * its own id and forwards through sendOutMsg(). In later rounds it reads at most one
+     * incoming message, merges the chain it carries and then either records the end of
+     * the path (message END) or keeps forwarding.
+     */
+    public void sendMsgToPathVertex(Iterator<LogAlgorithmMessageWritable> msgIterator){
+        if(getSuperstep() == 3){
+            getVertexValue().setMergeChain(getVertexId());
+            sendOutMsg(getVertexId(), getVertexValue().getAdjMap());
+            return;
+        }
+        if(!msgIterator.hasNext()){
+            return;
+        }
+        incomingMsg = msgIterator.next();
+        if(!mergeChainVertex(msgIterator)){
+            // mergeChainVertex() has already updated the vertex value for this case
+            return;
+        }
+        if(incomingMsg.getMessage() != Message.END){
+            sendOutMsg(getVertexValue().getMergeChain(), getVertexValue().getAdjMap());
+            return;
+        }
+        // END received: a START_VERTEX becomes FINAL_VERTEX, anything else END_VERTEX
+        if(getVertexValue().getState() == State.START_VERTEX){
+            getVertexValue().setState(State.FINAL_VERTEX);
+        }
+        else{
+            getVertexValue().setState(State.END_VERTEX);
+        }
+    }
+    /**
+     * Handler for even supersteps. With a message waiting, the vertex answers its sender
+     * with its merge chain and adjacency map (tagged END if it is an END_VERTEX) and
+     * deletes itself when the request was a START. With no message, it deletes itself
+     * unless it is a START_VERTEX or END_VERTEX.
+     */
+    public void responseMsgToHeadVertex(Iterator<LogAlgorithmMessageWritable> msgIterator){
+        if(!msgIterator.hasNext()){
+            boolean isEndpoint = getVertexValue().getState() == State.START_VERTEX
+                    || getVertexValue().getState() == State.END_VERTEX;
+            if(!isEndpoint){
+                deleteVertex(getVertexId());//killSelf because it doesn't receive any message
+            }
+            return;
+        }
+        incomingMsg = msgIterator.next();
+        outgoingMsg.setChainVertexId(getVertexValue().getMergeChain());
+        outgoingMsg.setAdjMap(getVertexValue().getAdjMap());
+        if(getVertexValue().getState() == State.END_VERTEX){
+            outgoingMsg.setMessage(Message.END);
+        }
+        sendMsg(incomingMsg.getSourceVertexId(), outgoingMsg);
+
+        if(incomingMsg.getMessage() == Message.START){
+            deleteVertex(getVertexId());
+        }
+    }
+ /**
+ * Merges the chain carried by incomingMsg into this vertex's merge chain,
+ * using the lastKmer and chainVertexId fields as scratch buffers.
+ *
+ * If GraphVertexOperation.isCycle reports a cycle for the merged chain, the
+ * merge chain is cleared, the adjacency map is rewritten through reverseAdjMap,
+ * the state becomes State.CYCLE and false is returned. Otherwise the merged
+ * chain is stored, the adjacency map is updated through updateRightNeighber
+ * with the sender's map, and true is returned.
+ *
+ * @param msgIterator not read in this method; the message merged is the one
+ * already held in incomingMsg
+ * @return true when the chain was merged and stored, false when a cycle was detected
+ */
+ public boolean mergeChainVertex(Iterator<LogAlgorithmMessageWritable> msgIterator){
+ //merge chain
+ // NOTE(review): the length (lengthOfChain - kmerSize + 1) presumably skips the
+ // kmerSize-1 bases that overlap with this vertex's chain - confirm in getLastKmerFromChain.
+ lastKmer.set(kmerFactory.getLastKmerFromChain(incomingMsg.getLengthOfChain() - kmerSize + 1,
+ incomingMsg.getChainVertexId()));
+ chainVertexId.set(kmerFactory.mergeTwoKmer(getVertexValue().getMergeChain(),
+ lastKmer));
+ if(GraphVertexOperation.isCycle(getVertexId(), chainVertexId)){
+ getVertexValue().setMergeChain(null);
+ getVertexValue().setAdjMap(GraphVertexOperation.reverseAdjMap(getVertexValue().getAdjMap(),
+ chainVertexId.getGeneCodeAtPosition(kmerSize)));
+ getVertexValue().setState(State.CYCLE);
+ return false;
+ }
+ else
+ getVertexValue().setMergeChain(chainVertexId);
+
+ byte tmpVertexValue = GraphVertexOperation.updateRightNeighber(getVertexValue().getAdjMap(),
+ incomingMsg.getAdjMap());
+ getVertexValue().setAdjMap(tmpVertexValue);
+ return true;
+ }
+ /**
+ * Per-superstep entry point. Schedule:
+ * superstep 1 - startSendMsg();
+ * superstep 2 - initState(msgIterator);
+ * later odd supersteps up to maxIteration - sendMsgToPathVertex(), then vote to halt;
+ * later even supersteps up to maxIteration - responseMsgToHeadVertex(), then vote to halt;
+ * anything beyond maxIteration - vote to halt only.
+ * Supersteps 1 and 2 do not vote to halt here; the called methods decide.
+ */
+ @Override
+ public void compute(Iterator<LogAlgorithmMessageWritable> msgIterator) {
+ // loads the configuration on first use and resets outgoingMsg for this call
+ initVertex();
+ if (getSuperstep() == 1)
+ startSendMsg();
+ else if(getSuperstep() == 2)
+ initState(msgIterator);
+ else if(getSuperstep()%2 == 1 && getSuperstep() <= maxIteration){
+ sendMsgToPathVertex(msgIterator);
+ voteToHalt();
+ }
+ else if(getSuperstep()%2 == 0 && getSuperstep() <= maxIteration){
+ responseMsgToHeadVertex(msgIterator);
+ voteToHalt();
+ }
+ else
+ voteToHalt();
+ }
+ /**
+ * Builds a PregelixJob for this vertex class, wires the input/output formats and
+ * output key/value classes, and hands the job and the command-line arguments to Client.run.
+ */
+ public static void main(String[] args) throws Exception {
+ PregelixJob job = new PregelixJob(LogAlgorithmForPathMergeVertex.class.getSimpleName());
+ job.setVertexClass(LogAlgorithmForPathMergeVertex.class);
+ /**
+ * Input and output formats for this vertex class
+ */
+ job.setVertexInputFormatClass(LogAlgorithmForPathMergeInputFormat.class);
+ job.setVertexOutputFormatClass(LogAlgorithmForPathMergeOutputFormat.class);
+ job.setOutputKeyClass(KmerBytesWritable.class);
+ job.setOutputValueClass(ValueStateWritable.class);
+ job.setDynamicVertexValueSize(true);
+ Client.run(args, job);
+ }
+}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/NaiveAlgorithmForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/NaiveAlgorithmForPathMergeVertex.java
index 0df5e6b..78e4ba6 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/NaiveAlgorithmForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/NaiveAlgorithmForPathMergeVertex.java
@@ -15,6 +15,7 @@
import edu.uci.ics.genomix.pregelix.format.NaiveAlgorithmForPathMergeOutputFormat;
import edu.uci.ics.genomix.pregelix.io.NaiveAlgorithmMessageWritable;
import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
+import edu.uci.ics.genomix.pregelix.type.Message;
import edu.uci.ics.genomix.pregelix.type.State;
import edu.uci.ics.genomix.pregelix.util.GraphVertexOperation;
@@ -55,13 +56,11 @@
public static int kmerSize = -1;
private int maxIteration = -1;
- private NaiveAlgorithmMessageWritable msg = new NaiveAlgorithmMessageWritable();
+ private NaiveAlgorithmMessageWritable incomingMsg = new NaiveAlgorithmMessageWritable();
+ private NaiveAlgorithmMessageWritable outgoingMsg = new NaiveAlgorithmMessageWritable();
private VKmerBytesWritableFactory kmerFactory = new VKmerBytesWritableFactory(1);
private VKmerBytesWritable destVertexId = new VKmerBytesWritable(1);
- private VKmerBytesWritable chainVertexId = new VKmerBytesWritable(1);
- private VKmerBytesWritable lastKmer = new VKmerBytesWritable(1);
-
/**
* initiate kmerSize, maxIteration
*/
@@ -70,9 +69,7 @@
kmerSize = getContext().getConfiguration().getInt(KMER_SIZE, 5);
if (maxIteration < 0)
maxIteration = getContext().getConfiguration().getInt(ITERATIONS, 1000000);
- }
- public void findDestination(){
- destVertexId.set(msg.getSourceVertexId());
+ outgoingMsg.reset();
}
/**
* get destination vertex
@@ -80,9 +77,11 @@
public VKmerBytesWritable getDestVertexId(KmerBytesWritable vertexId, byte geneCode){
return kmerFactory.shiftKmerWithNextCode(vertexId, geneCode);
}
-
+ public VKmerBytesWritable getPreDestVertexId(KmerBytesWritable vertexId, byte geneCode){
+ return kmerFactory.shiftKmerWithPreCode(vertexId, geneCode);
+ }
public VKmerBytesWritable getDestVertexIdFromChain(VKmerBytesWritable chainVertexId, byte adjMap){
- lastKmer.set(kmerFactory.getLastKmerFromChain(kmerSize, chainVertexId));
+ VKmerBytesWritable lastKmer = kmerFactory.getLastKmerFromChain(kmerSize, chainVertexId);
return getDestVertexId(lastKmer, GeneCode.getGeneCodeFromBitMap((byte)(adjMap & 0x0F)));
}
/**
@@ -92,101 +91,131 @@
for(byte x = GeneCode.A; x<= GeneCode.T ; x++){
if((adjMap & (1 << x)) != 0){
destVertexId.set(getDestVertexId(vertexId, x));
- sendMsg(destVertexId, msg);
+ sendMsg(destVertexId, outgoingMsg);
}
}
}
/**
- * initiate chain vertex
+ * head send message to all previous nodes
*/
- public void initChainVertex(){
- if(!msg.isRear()){
- findDestination();
- if(GraphVertexOperation.isPathVertex(getVertexValue().getAdjMap())){
- chainVertexId.set(getVertexId());
- msg.set(getVertexId(), chainVertexId, getVertexId(), getVertexValue().getAdjMap(), false);
- sendMsg(destVertexId,msg);
- }else if(GraphVertexOperation.isRearVertex(getVertexValue().getAdjMap()))
- voteToHalt();
+ public void sendMsgToAllPreviousNodes(KmerBytesWritable vertexId, byte adjMap){
+ for(byte x = GeneCode.A; x<= GeneCode.T ; x++){
+ if(((adjMap >> 4) & (1 << x)) != 0){
+ destVertexId.set(getPreDestVertexId(vertexId, x));
+ sendMsg(destVertexId, outgoingMsg);
+ }
}
}
/**
+ * start sending message
+ */
+    public void startSendMsg(){
+        // Read the adjacency map once; a head vertex sends START to its next nodes and
+        // a rear vertex sends END to its previous nodes.
+        final byte adjMap = getVertexValue().getAdjMap();
+        final boolean isHead = GraphVertexOperation.isHeadVertex(adjMap);
+        if(isHead){
+            outgoingMsg.setMessage(Message.START);
+            sendMsgToAllNextNodes(getVertexId(), adjMap);
+        }
+        final boolean isRear = GraphVertexOperation.isRearVertex(adjMap);
+        if(isRear){
+            outgoingMsg.setMessage(Message.END);
+            sendMsgToAllPreviousNodes(getVertexId(), adjMap);
+        }
+    }
+    /**
+     * Drains the incoming messages. A path vertex feeds each message to setState();
+     * any other vertex discards the message and votes to halt.
+     */
+    public void initState(Iterator<NaiveAlgorithmMessageWritable> msgIterator){
+        while(msgIterator.hasNext()){
+            boolean onPath = GraphVertexOperation.isPathVertex(getVertexValue().getAdjMap());
+            NaiveAlgorithmMessageWritable received = msgIterator.next();
+            if(onPath){
+                incomingMsg = received;
+                setState();
+            }
+            else{
+                voteToHalt();
+            }
+        }
+    }
+    /**
+     * Applies incomingMsg to the vertex state: START marks the vertex START_VERTEX;
+     * END marks it END_VERTEX unless it is already START_VERTEX. Every case except
+     * START votes to halt.
+     */
+    public void setState(){
+        if(incomingMsg.getMessage() == Message.START){
+            getVertexValue().setState(State.START_VERTEX);
+            return;
+        }
+        boolean becomesEnd = incomingMsg.getMessage() == Message.END
+                && getVertexValue().getState() != State.START_VERTEX;
+        if(becomesEnd){
+            getVertexValue().setState(State.END_VERTEX);
+        }
+        voteToHalt();
+    }
+ /**
* head node sends message to path node
*/
- public void sendMsgToPathVertex(){
- if(!msg.isRear()){
- destVertexId.set(getDestVertexIdFromChain(msg.getChainVertexId(), msg.getAdjMap()));
- msg.set(getVertexId(), msg.getChainVertexId(), msg.getHeadVertexId(), (byte)0, msg.isRear());
+ public void sendMsgToPathVertex(Iterator<NaiveAlgorithmMessageWritable> msgIterator){
+ if(getSuperstep() == 3){
+ getVertexValue().setMergeChain(getVertexId());
+ outgoingMsg.setSourceVertexId(getVertexId());
+ destVertexId.set(getDestVertexIdFromChain(getVertexValue().getMergeChain(),
+ getVertexValue().getAdjMap()));
+ sendMsg(destVertexId,outgoingMsg);
}else{
- destVertexId.set(msg.getHeadVertexId());
- msg.set(msg.getSourceVertexId(), msg.getChainVertexId(), msg.getHeadVertexId(), (byte)0, msg.isRear());
+ while (msgIterator.hasNext()){
+ incomingMsg = msgIterator.next();
+ if(incomingMsg.getMessage() != Message.STOP){
+ getVertexValue().setMergeChain(kmerFactory.mergeKmerWithNextCode(getVertexValue().getMergeChain(),
+ incomingMsg.getLastGeneCode()));
+ outgoingMsg.setSourceVertexId(getVertexId());
+ destVertexId.set(getDestVertexIdFromChain(getVertexValue().getMergeChain(),
+ incomingMsg.getAdjMap()));
+ sendMsg(destVertexId,outgoingMsg);
+ }
+ else{
+ getVertexValue().setMergeChain(kmerFactory.mergeKmerWithNextCode(getVertexValue().getMergeChain(),
+ incomingMsg.getLastGeneCode()));
+ byte adjMap = GraphVertexOperation.updateRightNeighber(getVertexValue().getAdjMap(),
+ incomingMsg.getAdjMap());
+ getVertexValue().setAdjMap(adjMap);
+ getVertexValue().setState(State.FINAL_VERTEX);
+ //String source = getVertexValue().getMergeChain().toString();
+ //System.out.println();
+ }
+ }
}
- sendMsg(destVertexId,msg);
}
/**
* path node sends message back to head node
*/
public void responseMsgToHeadVertex(){
- if(!msg.isRear()){
- findDestination();
- if(GraphVertexOperation.isPathVertex(getVertexValue().getAdjMap())){
- chainVertexId = kmerFactory.mergeKmerWithNextCode(msg.getChainVertexId(),
- getVertexId().getGeneCodeAtPosition(kmerSize - 1));
- deleteVertex(getVertexId());
- msg.set(getVertexId(), chainVertexId, msg.getHeadVertexId(), getVertexValue().getAdjMap(), false);
- sendMsg(destVertexId,msg);
- }
- else if(GraphVertexOperation.isRearVertex(getVertexValue().getAdjMap())){
- msg.set(getVertexId(), msg.getChainVertexId(), msg.getHeadVertexId(), (byte)0, true);
- sendMsg(destVertexId,msg);
- }
- }else{// is Rear
- if(msg.getLengthOfChain() > kmerSize){
- byte tmp = GraphVertexOperation.updateRightNeighberByVertexId(getVertexValue().getAdjMap(), msg.getSourceVertexId(), kmerSize);
- getVertexValue().set(tmp, State.FINAL_VERTEX, msg.getChainVertexId());
- setVertexValue(getVertexValue());
- //String source = msg.getChainVertexId().toString();
- //System.out.print("");
- }
- }
+ deleteVertex(getVertexId());
+ outgoingMsg.setAdjMap(getVertexValue().getAdjMap());
+ outgoingMsg.setLastGeneCode(getVertexId().getGeneCodeAtPosition(kmerSize - 1));
+ if(getVertexValue().getState() == State.END_VERTEX)
+ outgoingMsg.setMessage(Message.STOP);
+ sendMsg(incomingMsg.getSourceVertexId(),outgoingMsg);
}
@Override
public void compute(Iterator<NaiveAlgorithmMessageWritable> msgIterator) {
initVertex();
if (getSuperstep() == 1) {
- if(GraphVertexOperation.isHeadVertex(getVertexValue().getAdjMap())){
- msg.set(getVertexId(), chainVertexId, getVertexId(), (byte)0, false);
- sendMsgToAllNextNodes(getVertexId(), getVertexValue().getAdjMap());
- }
-
+ startSendMsg();
+ voteToHalt();
}
- else if(getSuperstep() == 2){
- if(msgIterator.hasNext()){
- msg = msgIterator.next();
- initChainVertex();
- }
- }
- //head node sends message to path node
+ else if(getSuperstep() == 2)
+ initState(msgIterator);
else if(getSuperstep()%2 == 1 && getSuperstep() <= maxIteration){
- while (msgIterator.hasNext()){
- msg = msgIterator.next();
- sendMsgToPathVertex();
- }
+ sendMsgToPathVertex(msgIterator);
+ voteToHalt();
}
- //path node sends message back to head node
else if(getSuperstep()%2 == 0 && getSuperstep() > 2 && getSuperstep() <= maxIteration){
while(msgIterator.hasNext()){
- msg = msgIterator.next();
+ incomingMsg = msgIterator.next();
responseMsgToHeadVertex();
}
+ voteToHalt();
}
- voteToHalt();
}
-
- /**
- * @param args
- */
public static void main(String[] args) throws Exception {
PregelixJob job = new PregelixJob(NaiveAlgorithmForPathMergeVertex.class.getSimpleName());
job.setVertexClass(NaiveAlgorithmForPathMergeVertex.class);
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
index 3e494ba..209e5e2 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
@@ -10,7 +10,7 @@
import edu.uci.ics.genomix.pregelix.format.LogAlgorithmForPathMergeInputFormat;
import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
import edu.uci.ics.genomix.pregelix.operator.NaiveAlgorithmForPathMergeVertex;
-import edu.uci.ics.genomix.pregelix.operator.TwoStepLogAlgorithmForPathMergeVertex;
+import edu.uci.ics.genomix.pregelix.operator.LogAlgorithmForPathMergeVertex;
import edu.uci.ics.genomix.type.KmerBytesWritable;
import edu.uci.ics.pregelix.api.job.PregelixJob;
@@ -27,7 +27,7 @@
job.setDynamicVertexValueSize(true);
job.setOutputKeyClass(KmerBytesWritable.class);
job.setOutputValueClass(ValueStateWritable.class);
- job.getConfiguration().setInt(NaiveAlgorithmForPathMergeVertex.KMER_SIZE, 55);
+ job.getConfiguration().setInt(NaiveAlgorithmForPathMergeVertex.KMER_SIZE, 5);
job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
}
@@ -35,20 +35,20 @@
generateNaiveAlgorithmForMergeGraphJob("NaiveAlgorithmForMergeGraph", outputBase + "NaiveAlgorithmForMergeGraph.xml");
}
- private static void generateTwoStepLogAlgorithmForMergeGraphJob(String jobName, String outputPath) throws IOException {
+ private static void generateLogAlgorithmForMergeGraphJob(String jobName, String outputPath) throws IOException {
PregelixJob job = new PregelixJob(jobName);
- job.setVertexClass(TwoStepLogAlgorithmForPathMergeVertex.class);
+ job.setVertexClass(LogAlgorithmForPathMergeVertex.class);
job.setVertexInputFormatClass(LogAlgorithmForPathMergeInputFormat.class);
job.setVertexOutputFormatClass(LogAlgorithmForPathMergeOutputFormat.class);
job.setDynamicVertexValueSize(true);
job.setOutputKeyClass(KmerBytesWritable.class);
job.setOutputValueClass(ValueStateWritable.class);
- job.getConfiguration().setInt(TwoStepLogAlgorithmForPathMergeVertex.KMER_SIZE, 5);
+ job.getConfiguration().setInt(LogAlgorithmForPathMergeVertex.KMER_SIZE, 5);
job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
}
- private static void genTwoStepLogAlgorithmForMergeGraph() throws IOException {
- generateTwoStepLogAlgorithmForMergeGraphJob("TwoStepLogAlgorithmForMergeGraph", outputBase + "TwoStepLogAlgorithmForMergeGraph.xml");
+ private static void genLogAlgorithmForMergeGraph() throws IOException {
+ generateLogAlgorithmForMergeGraphJob("LogAlgorithmForMergeGraph", outputBase + "LogAlgorithmForMergeGraph.xml");
}
/**
@@ -57,7 +57,7 @@
*/
public static void main(String[] args) throws IOException {
genNaiveAlgorithmForMergeGraph();
- genTwoStepLogAlgorithmForMergeGraph();
+ genLogAlgorithmForMergeGraph();
}
}
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/PathMergeSmallTestCase.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/PathMergeSmallTestCase.java
new file mode 100644
index 0000000..1acf79b
--- /dev/null
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/PathMergeSmallTestCase.java
@@ -0,0 +1,89 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.genomix.pregelix.JobRun;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.junit.Test;
+
+import edu.uci.ics.genomix.pregelix.sequencefile.GenerateTextFile;
+import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.pregelix.core.base.IDriver.Plan;
+import edu.uci.ics.pregelix.core.driver.Driver;
+import edu.uci.ics.pregelix.core.util.PregelixHyracksIntegrationUtil;
+
+public class PathMergeSmallTestCase extends TestCase {
+ private final PregelixJob job;
+ private final String resultFileDir;
+ private final String textFileDir;
+ private final String jobFile;
+ private final Driver driver = new Driver(this.getClass());
+ private final FileSystem dfs;
+
+ public PathMergeSmallTestCase(String hadoopConfPath, String jobName,
+ String jobFile, FileSystem dfs, String hdfsInput, String resultFile, String textFile)
+ throws Exception {
+ super("test");
+ this.jobFile = jobFile;
+ this.job = new PregelixJob("test");
+ this.job.getConfiguration().addResource(new Path(jobFile));
+ this.job.getConfiguration().addResource(new Path(hadoopConfPath));
+ FileInputFormat.setInputPaths(job, hdfsInput);
+ FileOutputFormat.setOutputPath(job, new Path(hdfsInput + "_result"));
+ this.textFileDir = textFile;
+ job.setJobName(jobName);
+ this.resultFileDir = resultFile;
+
+ this.dfs = dfs;
+ }
+
+ private void waitawhile() throws InterruptedException {
+ synchronized (this) {
+ this.wait(20);
+ }
+ }
+
+ @Test
+ public void test() throws Exception {
+ setUp();
+ Plan[] plans = new Plan[] { Plan.OUTER_JOIN };
+ for (Plan plan : plans) {
+ driver.runJob(job, plan, PregelixHyracksIntegrationUtil.CC_HOST,
+ PregelixHyracksIntegrationUtil.TEST_HYRACKS_CC_CLIENT_PORT,
+ false);
+ }
+ compareResults();
+ tearDown();
+ waitawhile();
+ }
+
+ private void compareResults() throws Exception {
+ dfs.copyToLocalFile(FileOutputFormat.getOutputPath(job), new Path(
+ resultFileDir));
+ GenerateTextFile.generateFromPathmergeResult(5, resultFileDir, textFileDir);
+ // TestUtils.compareWithResultDir(new File(expectedFileDir), new
+ // File(resultFileDir));
+ }
+
+ public String toString() {
+ return jobFile;
+ }
+
+}
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/PathMergeSmallTestSuite.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/PathMergeSmallTestSuite.java
new file mode 100644
index 0000000..956a646
--- /dev/null
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/PathMergeSmallTestSuite.java
@@ -0,0 +1,234 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.genomix.pregelix.JobRun;
+
+import java.io.BufferedReader;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.logging.Logger;
+
+import junit.framework.Test;
+import junit.framework.TestResult;
+import junit.framework.TestSuite;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.mapred.JobConf;
+
+import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
+import edu.uci.ics.pregelix.core.util.PregelixHyracksIntegrationUtil;
+
+@SuppressWarnings("deprecation")
+public class PathMergeSmallTestSuite extends TestSuite {
+ private static final Logger LOGGER = Logger
+ .getLogger(PathMergeSmallTestSuite.class.getName());
+
+ public static final String PreFix = "data/PathTestSet"; //"graphbuildresult";
+ public static final String[] TestDir = { PreFix + File.separator
+ //+ "split.aa"};
+ ///+ "split.aa"};/*, PreFix + File.separator
+ /*+ "CyclePath", PreFix + File.separator
+ + "SimplePath", PreFix + File.separator
+ + "SinglePath", PreFix + File.separator
+ + "TreePath"};*/
+ + "2", PreFix + File.separator
+ + "3", PreFix + File.separator
+ + "4", PreFix + File.separator
+ + "5", PreFix + File.separator
+ + "6", PreFix + File.separator
+ + "7", PreFix + File.separator
+ + "8", PreFix + File.separator
+ + "9", PreFix + File.separator
+ + "TwoKmer", PreFix + File.separator
+ + "ThreeKmer", PreFix + File.separator
+ + "SinglePath", PreFix + File.separator
+ + "SimplePath", PreFix + File.separator
+ + "Path", PreFix + File.separator
+ + "BridgePath", PreFix + File.separator
+ + "CyclePath", PreFix + File.separator
+ + "RingPath", PreFix + File.separator
+ + "LongPath", PreFix + File.separator
+ + "TreePath"};
+ private static final String ACTUAL_RESULT_DIR = "actual";
+ private static final String PATH_TO_HADOOP_CONF = "src/test/resources/hadoop/conf";
+ private static final String PATH_TO_CLUSTER_STORE = "src/test/resources/cluster/stores.properties";
+ private static final String PATH_TO_CLUSTER_PROPERTIES = "src/test/resources/cluster/cluster.properties";
+ private static final String PATH_TO_JOBS = "src/test/resources/jobs/";
+ private static final String PATH_TO_ONLY = "src/test/resources/only.txt";
+
+ public static final String HDFS_INPUTPATH = "/PathTestSet";
+
+ private static final String HADOOP_CONF_PATH = ACTUAL_RESULT_DIR
+ + File.separator + "conf.xml";
+ private MiniDFSCluster dfsCluster;
+
+ private JobConf conf = new JobConf();
+ private int numberOfNC = 2;
+
+ public void setUp() throws Exception {
+ ClusterConfig.setStorePath(PATH_TO_CLUSTER_STORE);
+ ClusterConfig.setClusterPropertiesPath(PATH_TO_CLUSTER_PROPERTIES);
+ cleanupStores();
+ PregelixHyracksIntegrationUtil.init("src/test/resources/topology.xml");
+ LOGGER.info("Hyracks mini-cluster started");
+ FileUtils.forceMkdir(new File(ACTUAL_RESULT_DIR));
+ FileUtils.cleanDirectory(new File(ACTUAL_RESULT_DIR));
+ startHDFS();
+ }
+
+ private void startHDFS() throws IOException {
+ conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/core-site.xml"));
+ conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/mapred-site.xml"));
+ conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/hdfs-site.xml"));
+ FileSystem lfs = FileSystem.getLocal(new Configuration());
+ lfs.delete(new Path("build"), true);
+ System.setProperty("hadoop.log.dir", "logs");
+ dfsCluster = new MiniDFSCluster(conf, numberOfNC, true, null);
+ FileSystem dfs = FileSystem.get(conf);
+
+ for (String testDir : TestDir) {
+ File src = new File(testDir);
+ Path dest = new Path(HDFS_INPUTPATH + File.separator + src.getName());
+ dfs.mkdirs(dest);
+ //src.listFiles()
+ //src.listFiles((FilenameFilter)(new WildcardFileFilter("part*")))
+ for (File f : src.listFiles()){
+ dfs.copyFromLocalFile(new Path(f.getAbsolutePath()), dest);
+ }
+ }
+
+ DataOutputStream confOutput = new DataOutputStream(
+ new FileOutputStream(new File(HADOOP_CONF_PATH)));
+ conf.writeXml(confOutput);
+ confOutput.flush();
+ confOutput.close();
+ }
+
+ private void cleanupStores() throws IOException {
+ FileUtils.forceMkdir(new File("teststore"));
+ FileUtils.forceMkdir(new File("build"));
+ FileUtils.cleanDirectory(new File("teststore"));
+ FileUtils.cleanDirectory(new File("build"));
+ }
+
+ /**
+ * cleanup hdfs cluster
+ */
+ private void cleanupHDFS() throws Exception {
+ dfsCluster.shutdown();
+ }
+
+ public void tearDown() throws Exception {
+ PregelixHyracksIntegrationUtil.deinit();
+ LOGGER.info("Hyracks mini-cluster shut down");
+ cleanupHDFS();
+ }
+
+ public static Test suite() throws Exception {
+ List<String> onlys = getFileList(PATH_TO_ONLY);
+ File testData = new File(PATH_TO_JOBS);
+ File[] queries = testData.listFiles();
+ PathMergeSmallTestSuite testSuite = new PathMergeSmallTestSuite();
+ testSuite.setUp();
+ boolean onlyEnabled = false;
+ FileSystem dfs = FileSystem.get(testSuite.conf);
+
+ if (onlys.size() > 0) {
+ onlyEnabled = true;
+ }
+
+ for (File qFile : queries) {
+ if (qFile.isFile()) {
+ if (onlyEnabled && !isInList(onlys, qFile.getName())) {
+ continue;
+ } else {
+ for (String testPathStr : TestDir) {
+ File testDir = new File(testPathStr);
+ String resultFileName = ACTUAL_RESULT_DIR
+ + File.separator
+ + jobExtToResExt(qFile.getName())
+ + File.separator + "bin"
+ + File.separator + testDir.getName();
+ String textFileName = ACTUAL_RESULT_DIR
+ + File.separator
+ + jobExtToResExt(qFile.getName())
+ + File.separator + "txt"
+ + File.separator + testDir.getName();
+ testSuite.addTest(new PathMergeSmallTestCase(
+ HADOOP_CONF_PATH, qFile.getName(), qFile
+ .getAbsolutePath().toString(),
+ dfs, HDFS_INPUTPATH + File.separator + testDir.getName(),
+ resultFileName, textFileName));
+ }
+ }
+ }
+ }
+ return testSuite;
+ }
+
+ /**
+ * Runs the tests and collects their result in a TestResult.
+ */
+ @Override
+ public void run(TestResult result) {
+ try {
+ int testCount = countTestCases();
+ for (int i = 0; i < testCount; i++) {
+ // cleanupStores();
+ Test each = this.testAt(i);
+ if (result.shouldStop())
+ break;
+ runTest(each, result);
+ }
+ tearDown();
+ } catch (Exception e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ protected static List<String> getFileList(String ignorePath)
+ throws FileNotFoundException, IOException {
+ BufferedReader reader = new BufferedReader(new FileReader(ignorePath));
+ String s = null;
+ List<String> ignores = new ArrayList<String>();
+ while ((s = reader.readLine()) != null) {
+ ignores.add(s);
+ }
+ reader.close();
+ return ignores;
+ }
+
+ private static String jobExtToResExt(String fname) {
+ int dot = fname.lastIndexOf('.');
+ return fname.substring(0, dot);
+ }
+
+ private static boolean isInList(List<String> onlys, String name) {
+ for (String only : onlys)
+ if (name.indexOf(only) >= 0)
+ return true;
+ return false;
+ }
+
+}
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/ResultGen/ReportGenerator.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/ResultGen/ReportGenerator.java
index 3281a3b..00298a3 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/ResultGen/ReportGenerator.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/ResultGen/ReportGenerator.java
@@ -10,8 +10,8 @@
import org.apache.commons.io.FileUtils;
public class ReportGenerator {
- public static final String PATH_TO_REPORT = "report/";
- public static final String PATH_TO_LOGINFO = "log/";
+ public static final String PATH_TO_REPORT = "report";
+ public static final String PATH_TO_LOGINFO = "log";
public static void generateReportFromLoginfo(String fileName) throws Exception {
DecimalFormat df = new DecimalFormat("0.00");
@@ -41,10 +41,10 @@
public static void main(String[] args) throws Exception {
FileUtils.forceMkdir(new File(PATH_TO_REPORT));
FileUtils.cleanDirectory(new File(PATH_TO_REPORT));
- generateReportFromLoginfo("naive");
- generateReportFromLoginfo("log");
- generateReportFromLoginfo("log_yourkit");
+ generateReportFromLoginfo("naive_converge");
+ generateReportFromLoginfo("log_converge");
generateReportFromLoginfo("naive_36");
generateReportFromLoginfo("log_13");
+
}
}
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/pathmerge/GraphBuildTest.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/pathmerge/GraphBuildTest.java
index 5681ae0..6d4e421 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/pathmerge/GraphBuildTest.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/pathmerge/GraphBuildTest.java
@@ -30,6 +30,7 @@
import edu.uci.ics.genomix.type.KmerBytesWritable;
import edu.uci.ics.genomix.type.KmerCountValue;
+@SuppressWarnings("deprecation")
public class GraphBuildTest {
private static final String ACTUAL_RESULT_DIR = "graphbuildresult";
private static final String PATH_TO_HADOOP_CONF = "src/test/resources/hadoop/conf";
@@ -128,14 +129,12 @@
private boolean checkResults(String expectedPath) throws Exception {
- File dumped = null;
String format = conf.get(GenomixJob.OUTPUT_FORMAT);
if ("text".equalsIgnoreCase(format)) {
FileUtil.copyMerge(FileSystem.get(conf),
new Path(HDFS_OUTPUT_PATH), FileSystem
.getLocal(new Configuration()), new Path(
DUMPED_RESULT), false, conf, null);
- dumped = new File(DUMPED_RESULT);
} else {
FileSystem.getLocal(new Configuration()).mkdirs(
@@ -173,7 +172,6 @@
reader.close();
}
bw.close();
- dumped = new File(CONVERT_RESULT);
}
// TestUtils.compareWithSortedResult(new File(expectedPath), dumped);
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/pathmerge/MergePathTest.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/pathmerge/MergePathTest.java
index 54cecbc..afd2477 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/pathmerge/MergePathTest.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/pathmerge/MergePathTest.java
@@ -20,13 +20,13 @@
public class MergePathTest {
public static final String PATH_TO_TESTSTORE = "testcase/pathmerge/";
//"genomix_result/pathmerge/new_naive";
- public static final String NAIVE_DATA_INPUT = "genomix_result/pathmerge/naive_newest";//"actual/NaiveAlgorithmForMergeGraph/BinaryOutput/test";
+ public static final String NAIVE_DATA_INPUT = "genomix_result/P1_nc8";//"actual/NaiveAlgorithmForMergeGraph/BinaryOutput/test";
//"genomix_result/pathmerge/new_log";
- public static final String LOG_DATA_INPUT = "genomix_result/pathmerge/log_newest";//"actual/TwoStepLogAlgorithmForMergeGraph/BinaryOutput/test";
+ public static final String LOG_DATA_INPUT = "genomix_result/P2_nc8";//"actual/TwoStepLogAlgorithmForMergeGraph/BinaryOutput/test";
public static final String TEXT_OUTPUT = PATH_TO_TESTSTORE + "textfile";
public static final String CHAIN_OUTPUT = PATH_TO_TESTSTORE + "chain";
- private static int nc = 4;
+ private static int nc = 8;
private static int kmerSize = 55;
private static int maxLength = 102;
@@ -58,18 +58,26 @@
if (key == null || value == null){
break;
}
- if(value.getLengthOfMergeChain() != 0
- && value.getLengthOfMergeChain() != -1
- && value.getState() == State.FINAL_VERTEX){
+ //if(value.getState() == State.FINAL_VERTEX){
+ /*bw.write(value.getMergeChain().toString()
+ + "\t" + GeneCode.getSymbolFromBitMap(value.getAdjMap()));
+ bw.newLine();*/
+ bw.write(key.toString()
+ + "\t" + value.toString());
+ bw.newLine();
+ //}
+ //if(value.getLengthOfMergeChain() != 0
+ // && value.getLengthOfMergeChain() != -1
+ // && value.getState() == State.FINAL_VERTEX){
//bw.write(key.toString() + "\t" +
// value.toString());
- bw.write(value.getLengthOfMergeChain() + "\t" +
- value.getMergeChain().toString() + "\t" +
- GeneCode.getSymbolFromBitMap(value.getAdjMap()) + "\t" +
- value.getState());
- //+ "\t" + key.toString());
- bw.newLine();
- }
+ //bw.write(value.getLengthOfMergeChain() + "\t" +
+ // value.getMergeChain().toString() + "\t" +
+ // GeneCode.getSymbolFromBitMap(value.getAdjMap()) + "\t" +
+ // key.toString());
+ //value.getState());
+
+ //}
}
reader.close();
}
diff --git a/genomix/genomix-pregelix/src/test/resources/cluster/cluster.properties b/genomix/genomix-pregelix/src/test/resources/cluster/cluster.properties
index 94eb599..0c6abd1 100644
--- a/genomix/genomix-pregelix/src/test/resources/cluster/cluster.properties
+++ b/genomix/genomix-pregelix/src/test/resources/cluster/cluster.properties
@@ -20,7 +20,7 @@
NCLOGS_DIR=$NCTMP_DIR/logs
#Comma separated I/O directories for the spilling of external sort
-IO_DIRS="/tmp/t3,/tmp/t4"
+IO_DIRS="/tmp/t3,/tmp/t4,/tmp/t5,/tmp/t6"
#The JAVA_HOME
JAVA_HOME=$JAVA_HOME
@@ -33,5 +33,5 @@
# Yourkit option: -agentpath:/grid/0/dev/vborkar/tools/yjp-10.0.4/bin/linux-x86-64/libyjpagent.so=port=20001"
#NC JAVA_OPTS
-NCJAVA_OPTS="-Xdebug -Xrunjdwp:transport=dt_socket,address=7002,server=y,suspend=n -Xmx1g -Djava.util.logging.config.file=logging.properties"
+NCJAVA_OPTS="-Xdebug -Xrunjdwp:transport=dt_socket,address=7002,server=y,suspend=n -Xmx3g -Djava.util.logging.config.file=genomix-pregelix/src/test/resources/logging.properties"
diff --git a/genomix/genomix-pregelix/src/test/resources/cluster/stores.properties b/genomix/genomix-pregelix/src/test/resources/cluster/stores.properties
index 04732be..2daf1ee 100644
--- a/genomix/genomix-pregelix/src/test/resources/cluster/stores.properties
+++ b/genomix/genomix-pregelix/src/test/resources/cluster/stores.properties
@@ -1 +1 @@
-store=teststore1,teststore2
\ No newline at end of file
+store=teststore1,teststore2,teststore3,teststore4
\ No newline at end of file
diff --git a/genomix/genomix-pregelix/src/test/resources/logging.properties b/genomix/genomix-pregelix/src/test/resources/logging.properties
index b8f2be9..0ed3dfc 100644
--- a/genomix/genomix-pregelix/src/test/resources/logging.properties
+++ b/genomix/genomix-pregelix/src/test/resources/logging.properties
@@ -60,6 +60,7 @@
# For example, set the com.xyz.foo logger to only log SEVERE
# messages:
+edu.uci.ics.genomix.pregelix.level = INFO
#edu.uci.ics.asterix.level = FINE
#edu.uci.ics.algebricks.level = FINE
edu.uci.ics.hyracks.level = SEVERE