update
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_genomix@3133 123451ca-8445-de46-9d55-352943316053
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/BinaryLoadGraphInputFormat.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/BinaryLoadGraphInputFormat.java
index 7cf8711..f203297 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/BinaryLoadGraphInputFormat.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/BinaryLoadGraphInputFormat.java
@@ -61,17 +61,6 @@
/**
* set the src vertex id
*/
- /*vertexId = getRecordReader().getCurrentKey();
- byte[] vertexBytes = vertexId.getBytes();
- int numOfByte = (2*GraphVertexOperation.k-1)/8 + 1;
- if(vertexBytes.length == numOfByte)
- vertex.setVertexId(vertexId);
- else{
- byte[] tmp = new byte[numOfByte];
- for(int i = 0; i < numOfByte; i++)
- tmp[i] = vertexBytes[i];
- vertex.setVertexId(new BytesWritable(tmp));
- }*/
vertexId = getRecordReader().getCurrentKey();
vertex.setVertexId(vertexId);
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/GraphVertexOperation.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/GraphVertexOperation.java
index 9260a2f..47b740b 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/GraphVertexOperation.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/GraphVertexOperation.java
@@ -17,7 +17,9 @@
import edu.uci.ics.pregelix.SequenceFile.GenerateSequenceFile;
import edu.uci.ics.pregelix.bitwise.BitwiseOperation;
+import edu.uci.ics.pregelix.example.io.LogAlgorithmMessageWritable;
import edu.uci.ics.pregelix.example.io.MessageWritable;
+import edu.uci.ics.pregelix.example.io.ValueStateWritable;
import edu.uci.ics.pregelix.hdfs.HDFSOperation;
import edu.uci.ics.pregelix.type.KmerCountValue;
@@ -170,6 +172,41 @@
}
}
/**
+ * check what kind of precursor node
+ * return 0:A 1:C 2:G 3:T 4:nothing
+ */
+ public static int findPrecursorNode(byte vertexValue){
+ String firstBit = "00010000"; //A
+ String secondBit = "00100000"; //C
+ String thirdBit = "01000000"; //G
+ String fourthBit = "10000000"; //T
+ int first = BitwiseOperation.convertBinaryStringToByte(firstBit) & 0xff;
+ int second = BitwiseOperation.convertBinaryStringToByte(secondBit) & 0xff;
+ int third = BitwiseOperation.convertBinaryStringToByte(thirdBit) & 0xff;
+ int fourth = BitwiseOperation.convertBinaryStringToByte(fourthBit) & 0xff;
+ int value = vertexValue & 0xff;
+ int tmp = value & first;
+ if(tmp != 0)
+ return 0;
+ else{
+ tmp = value & second;
+ if(tmp != 0)
+ return 1;
+ else{
+ tmp = value & third;
+ if(tmp != 0)
+ return 2;
+ else{
+ tmp = value & fourth;
+ if(tmp != 0)
+ return 3;
+ else
+ return 4;
+ }
+ }
+ }
+ }
+ /**
* replace last two bits based on n
* Ex. 01 10 00(nothing) -> 01 10 00(A)/01(C)/10(G)/11(T)
*/
@@ -198,13 +235,47 @@
return BitwiseOperation.convertBinaryStringToBytes(resultString);
}
/**
- * find the vertexId of the destination node
+ * replace first two bits based on n
+ * Ex. 01 10 00(nothing) -> 00(A)/01(C)/10(G)/11(T) 10 00
+ */
+ public static byte[] replaceFirstTwoBits(byte[] vertexId, int n){
+ String binaryStringVertexId = BitwiseOperation.convertBytesToBinaryStringKmer(vertexId, k);
+ String resultString = "";
+ switch(n){
+ case 0:
+ resultString += "00";
+ break;
+ case 1:
+ resultString += "01";
+ break;
+ case 2:
+ resultString += "10";
+ break;
+ case 3:
+ resultString += "11";
+ break;
+ default:
+ break;
+ }
+ for(int i = 2; i < binaryStringVertexId.length(); i++)
+ resultString += binaryStringVertexId.charAt(i);
+ return BitwiseOperation.convertBinaryStringToBytes(resultString);
+ }
+ /**
+ * find the vertexId of the destination node - left neighber
*/
public static byte[] getDestVertexId(byte[] sourceVertexId, byte vertexValue){
byte[] destVertexId = BitwiseOperation.shiftBitsLeft(sourceVertexId, 2);
return replaceLastTwoBits(destVertexId, findSucceedNode(vertexValue));
}
/**
+ * find the vertexId of the destination node - right neighber
+ */
+ public static byte[] getLeftDestVertexId(byte[] sourceVertexId, byte vertexValue){
+ byte[] destVertexId = BitwiseOperation.shiftBitsRight(sourceVertexId, 2);
+ return replaceFirstTwoBits(destVertexId, findPrecursorNode(vertexValue));
+ }
+ /**
* update the chain vertexId
*/
public static byte[] updateChainVertexId(byte[] chainVertexId, int lengthOfChainVertex, byte[] newVertexId){
@@ -234,8 +305,9 @@
/**
* merge two BytesWritable. Ex. merge two vertexId
*/
- public static BytesWritable mergeTwoChainVertex(byte[] b1, byte[] b2, int length){
- return new BytesWritable(BitwiseOperation.mergeTwoBytesArray(b1, length, b2, length));
+ public static byte[] mergeTwoChainVertex(byte[] b1, int length, byte[] b2){
+ String s2 = BitwiseOperation.convertBytesToBinaryString(b2).substring(2*k-2,2*k);
+ return BitwiseOperation.mergeTwoBytesArray(b1, length, BitwiseOperation.convertBinaryStringToBytes(s2), 1);
}
/**
* update right neighber
@@ -395,6 +467,42 @@
return;
}
/**
+ * output test for message communication
+ */
+ public static void testMessageCommunication2(OutputStreamWriter writer, long step, byte[] tmpSourceVertextId,
+ byte[] tmpDestVertexId, LogAlgorithmMessageWritable tmpMsg, byte[] myownId){
+ //test
+ String kmer = BitwiseOperation.convertBytesToBinaryStringKmer(
+ tmpSourceVertextId,GraphVertexOperation.k);
+ try {
+ writer.write("Step: " + step + "\r\n");
+ writer.write("Source Key: " + kmer + "\r\n");
+
+ writer.write("Source Code: " +
+ GraphVertexOperation.convertBinaryStringToGenecode(kmer) + "\r\n");
+ writer.write("Send Message to: " +
+ GraphVertexOperation.convertBinaryStringToGenecode(
+ BitwiseOperation.convertBytesToBinaryStringKmer(
+ tmpDestVertexId,GraphVertexOperation.k)) + "\r\n");
+ if(tmpMsg.getLengthOfChain() != 0){
+ writer.write("Chain Message: " +
+ GraphVertexOperation.convertBinaryStringToGenecode(
+ BitwiseOperation.convertBytesToBinaryString(
+ tmpMsg.getChainVertexId())) + "\r\n");
+ writer.write("Chain Length: " + tmpMsg.getLengthOfChain() + "\r\n");
+ }
+ if(myownId != null)
+ writer.write("My own Id is: " +
+ GraphVertexOperation.convertBinaryStringToGenecode(
+ BitwiseOperation.convertBytesToBinaryStringKmer(
+ myownId,GraphVertexOperation.k)) + "\r\n");
+ if(tmpMsg.getMessage() != 0)
+ writer.write("Message is: " + tmpMsg.getMessage() + "\r\n");
+ writer.write("\r\n");
+ } catch (IOException e) { e.printStackTrace(); }
+ return;
+ }
+ /**
* output test for last message communication -- flush
*/
public static void testLastMessageCommunication(OutputStreamWriter writer, long step, byte[] tmpVertextId,
@@ -420,5 +528,88 @@
e.printStackTrace();
}
}
+ /**
+ * output test for log message communication
+ */
+ public static void testLogMessageCommunication(OutputStreamWriter writer, long step, byte[] tmpSourceVertextId,
+ byte[] tmpDestVertexId, LogAlgorithmMessageWritable tmpMsg){
+ //test
+ String kmer = BitwiseOperation.convertBytesToBinaryStringKmer(
+ tmpSourceVertextId,GraphVertexOperation.k);
+ try {
+ writer.write("Step: " + step + "\r\n");
+ writer.write("Source Key: " + kmer + "\r\n");
+ writer.write("Source Code: " +
+ GraphVertexOperation.convertBinaryStringToGenecode(kmer) + "\r\n");
+ writer.write("Send Message to: " +
+ GraphVertexOperation.convertBinaryStringToGenecode(
+ BitwiseOperation.convertBytesToBinaryStringKmer(
+ tmpDestVertexId,GraphVertexOperation.k)) + "\r\n");
+ writer.write("Message is: " +
+ tmpMsg.getMessage() + "\r\n");
+ writer.write("\r\n");
+ } catch (IOException e) { e.printStackTrace(); }
+ return;
+ }
+ /**
+ * test set vertex state
+ */
+ public static void testSetVertexState(OutputStreamWriter writer, long step,byte[] tmpSourceVertextId,
+ byte[] tmpDestVertexId, LogAlgorithmMessageWritable tmpMsg, ValueStateWritable tmpVal){
+ //test
+ String kmer = BitwiseOperation.convertBytesToBinaryStringKmer(
+ tmpSourceVertextId,GraphVertexOperation.k);
+ try {
+ writer.write("Step: " + step + "\r\n");
+ writer.write("Source Key: " + kmer + "\r\n");
+
+ writer.write("Source Code: " +
+ GraphVertexOperation.convertBinaryStringToGenecode(kmer) + "\r\n");
+ if(tmpDestVertexId != null && tmpMsg != null){
+ writer.write("Send Message to: " +
+ GraphVertexOperation.convertBinaryStringToGenecode(
+ BitwiseOperation.convertBytesToBinaryStringKmer(
+ tmpDestVertexId,GraphVertexOperation.k)) + "\r\n");
+ writer.write("Message is: " +
+ tmpMsg.getMessage() + "\r\n");
+ }
+ writer.write("Set vertex state to " +
+ tmpVal.getState() + "\r\n");
+ writer.write("\r\n");
+
+ } catch (IOException e) { e.printStackTrace(); }
+ return;
+ }
+ /**
+ * test delete vertex information
+ */
+ public static void testDeleteVertexInfo(OutputStreamWriter writer, long step, byte[] vertexId, String reason){
+ try {
+ writer.write("Step: " + step + "\r\n");
+ writer.write(reason + "\r\n");
+ writer.write("delete " + BitwiseOperation.convertBytesToBinaryStringKmer(vertexId, GraphVertexOperation.k)
+ + "\t" + GraphVertexOperation.convertBinaryStringToGenecode(
+ BitwiseOperation.convertBytesToBinaryStringKmer(
+ vertexId,GraphVertexOperation.k)) + "\r\n");
+ writer.write("\r\n");
+ } catch (IOException e) { e.printStackTrace(); }
+ return;
+ }
+ /**
+ * test merge chain vertex
+ */
+ public static void testMergeChainVertex(OutputStreamWriter writer, long step, byte[] mergeChain,
+ int lengthOfChain){
+ try {
+ writer.write("Step: " + step + "\r\n");
+ writer.write("Merge Chain: " +
+ GraphVertexOperation.convertBinaryStringToGenecode(
+ BitwiseOperation.convertBytesToBinaryString(
+ mergeChain)) + "\r\n");
+ writer.write("Chain Length: " + lengthOfChain + "\r\n");
+ writer.write("\r\n");
+ } catch (IOException e) { e.printStackTrace(); }
+ return;
+ }
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/LogAlgorithmForMergeGraphInputFormat.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/LogAlgorithmForMergeGraphInputFormat.java
new file mode 100644
index 0000000..a6e4a6c
--- /dev/null
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/LogAlgorithmForMergeGraphInputFormat.java
@@ -0,0 +1,87 @@
+package edu.uci.ics.pregelix;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.io.VertexReader;
+import edu.uci.ics.pregelix.api.io.binary.BinaryVertexInputFormat;
+import edu.uci.ics.pregelix.api.util.BspUtils;
+import edu.uci.ics.pregelix.bitwise.BitwiseOperation;
+import edu.uci.ics.pregelix.example.io.LogAlgorithmMessageWritable;
+import edu.uci.ics.pregelix.example.io.ValueStateWritable;
+import edu.uci.ics.pregelix.type.KmerCountValue;
+import edu.uci.ics.pregelix.type.State;
+
+public class LogAlgorithmForMergeGraphInputFormat extends
+ BinaryVertexInputFormat<BytesWritable, ValueStateWritable, NullWritable, LogAlgorithmMessageWritable>{
+
+ /**
+ * Format INPUT
+ */
+ @Override
+ public VertexReader<BytesWritable, ValueStateWritable, NullWritable, LogAlgorithmMessageWritable> createVertexReader(
+ InputSplit split, TaskAttemptContext context) throws IOException {
+ return new BinaryLoadGraphReader(binaryInputFormat.createRecordReader(split, context));
+ }
+
+ @SuppressWarnings("rawtypes")
+ class BinaryLoadGraphReader extends
+ BinaryVertexReader<BytesWritable, ValueStateWritable, NullWritable, LogAlgorithmMessageWritable> {
+ private final static String separator = " ";
+ private Vertex vertex;
+ private BytesWritable vertexId = new BytesWritable();
+ private ValueStateWritable vertexValue = new ValueStateWritable();
+
+ public BinaryLoadGraphReader(RecordReader<BytesWritable,KmerCountValue> recordReader) {
+ super(recordReader);
+ }
+
+ @Override
+ public boolean nextVertex() throws IOException, InterruptedException {
+ return getRecordReader().nextKeyValue();
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public Vertex<BytesWritable, ValueStateWritable, NullWritable, LogAlgorithmMessageWritable> getCurrentVertex() throws IOException,
+ InterruptedException {
+ if (vertex == null)
+ vertex = (Vertex) BspUtils.createVertex(getContext().getConfiguration());
+
+ vertex.getMsgList().clear();
+ vertex.getEdges().clear();
+
+
+ if(getRecordReader() != null){
+ /**
+ * set the src vertex id
+ */
+
+ vertexId = getRecordReader().getCurrentKey();
+ vertex.setVertexId(vertexId);
+ /**
+ * set the vertex value
+ */
+ KmerCountValue kmerCountValue = getRecordReader().getCurrentValue();
+ vertexValue.setValue(kmerCountValue.getAdjBitMap());
+ vertexValue.setState(State.NON_VERTEX);
+ vertex.setVertexValue(vertexValue);
+
+ String kmer = BitwiseOperation.convertBytesToBinaryStringKmer(vertexId.getBytes(),GraphVertexOperation.k);
+ System.out.println("key: " + kmer);
+ System.out.println("code: " + GraphVertexOperation.convertBinaryStringToGenecode(kmer));
+ System.out.println("value: " + BitwiseOperation.convertByteToBinaryString(kmerCountValue.getAdjBitMap()));
+ System.out.println();
+ }
+
+ return vertex;
+ }
+ }
+
+}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/LogAlgorithmForMergeGraphOutputFormat.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/LogAlgorithmForMergeGraphOutputFormat.java
new file mode 100644
index 0000000..9d4672a
--- /dev/null
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/LogAlgorithmForMergeGraphOutputFormat.java
@@ -0,0 +1,42 @@
+package edu.uci.ics.pregelix;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.ByteWritable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.io.VertexWriter;
+import edu.uci.ics.pregelix.api.io.binary.BinaryVertexOutputFormat;
+import edu.uci.ics.pregelix.example.io.ValueStateWritable;
+
+public class LogAlgorithmForMergeGraphOutputFormat extends
+ BinaryVertexOutputFormat<BytesWritable, ValueStateWritable, NullWritable> {
+
+ @Override
+ public VertexWriter<BytesWritable, ValueStateWritable, NullWritable> createVertexWriter(TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ RecordWriter<BytesWritable, ByteWritable> recordWriter = binaryOutputFormat.getRecordWriter(context);
+ return new BinaryLoadGraphVertexWriter(recordWriter);
+ }
+
+ /**
+ * Simple VertexWriter that supports {@link BinaryLoadGraphVertex}
+ */
+ public static class BinaryLoadGraphVertexWriter extends
+ BinaryVertexWriter<BytesWritable, ValueStateWritable, NullWritable> {
+ public BinaryLoadGraphVertexWriter(RecordWriter<BytesWritable, ByteWritable> lineRecordWriter) {
+ super(lineRecordWriter);
+ }
+
+ @Override
+ public void writeVertex(Vertex<BytesWritable, ValueStateWritable, NullWritable, ?> vertex) throws IOException,
+ InterruptedException {
+ getRecordWriter().write(vertex.getVertexId(),
+ new ByteWritable(vertex.getVertexValue().getValue()));
+ }
+ }
+}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/LogAlgorithmForMergeGraphVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/LogAlgorithmForMergeGraphVertex.java
index 3561550..68c201b 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/LogAlgorithmForMergeGraphVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/LogAlgorithmForMergeGraphVertex.java
@@ -1,6 +1,9 @@
package edu.uci.ics.pregelix;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
import java.io.IOException;
+import java.io.OutputStreamWriter;
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
@@ -14,6 +17,7 @@
import edu.uci.ics.pregelix.api.graph.Vertex;
import edu.uci.ics.pregelix.api.job.PregelixJob;
import edu.uci.ics.pregelix.api.util.BspUtils;
+import edu.uci.ics.pregelix.bitwise.BitwiseOperation;
import edu.uci.ics.pregelix.dataflow.util.IterationUtils;
import edu.uci.ics.pregelix.example.client.Client;
import edu.uci.ics.pregelix.example.io.LogAlgorithmMessageWritable;
@@ -23,9 +27,9 @@
/*
* vertexId: BytesWritable
- * vertexValue: ByteWritable
+ * vertexValue: ValueStateWritable
* edgeValue: NullWritable
- * message: MessageWritable
+ * message: LogAlgorithmMessageWritable
*
* DNA:
* A: 00
@@ -54,35 +58,52 @@
private byte[] tmpSourceVertextId;
private byte[] tmpDestVertexId;
private byte[] tmpChainVertexId;
+ private byte[] mergeChainVertexId;
+ private int lengthOfMergeChainVertex;
private byte tmpVertexValue;
private int tmpVertexState;
private int tmpMessage;
private ValueStateWritable tmpVal = new ValueStateWritable();
private LogAlgorithmMessageWritable tmpMsg = new LogAlgorithmMessageWritable();
+ OutputStreamWriter writer;
/**
- * For test, in compute method, make each vertexValue shift 1 to left.
- * It will be modified when going forward to next step.
+ * Log Algorithm for path merge graph
*/
@Override
public void compute(Iterator<LogAlgorithmMessageWritable> msgIterator) {
+ try {
+ writer = new OutputStreamWriter(new FileOutputStream("test/check",true));
+ } catch (FileNotFoundException e1) { e1.printStackTrace();}
if (getSuperstep() == 1) {
tmpVal = getVertexValue();
tmpVertexValue = tmpVal.getValue();
+ tmpChainVertexId = new byte[0];
+ tmpMsg.setChainVertexId(tmpChainVertexId);
if(GraphVertexOperation.isHead(new ByteWritable(tmpVertexValue))){
tmpMsg.setMessage(Message.START);
tmpDestVertexId = GraphVertexOperation.getDestVertexId(getVertexId().getBytes(), tmpVertexValue);
sendMsg(new BytesWritable(tmpDestVertexId),tmpMsg);
+ //test
+ GraphVertexOperation.testLogMessageCommunication(writer, getSuperstep(),
+ getVertexId().getBytes(), tmpDestVertexId, tmpMsg);
voteToHalt();
}
else if(GraphVertexOperation.isRear(new ByteWritable(tmpVertexValue))){
tmpMsg.setMessage(Message.END);
- tmpDestVertexId = GraphVertexOperation.getDestVertexId(getVertexId().getBytes(), tmpVertexValue);
+ tmpDestVertexId = GraphVertexOperation.getLeftDestVertexId(getVertexId().getBytes(), tmpVertexValue);
sendMsg(new BytesWritable(tmpDestVertexId),tmpMsg);
+ //test
+ GraphVertexOperation.testSetVertexState(writer, getSuperstep(), getVertexId().getBytes(),
+ tmpDestVertexId, tmpMsg, tmpVal);
voteToHalt();
}
else if(GraphVertexOperation.isPathVertex(new ByteWritable(tmpVertexValue))){
+ tmpVal = getVertexValue();
tmpVal.setState(State.MID_VERTEX);
setVertexValue(tmpVal);
+ //test
+ GraphVertexOperation.testSetVertexState(writer, getSuperstep(), getVertexId().getBytes(),
+ null, null, tmpVal);
}
else
voteToHalt();
@@ -92,32 +113,65 @@
tmpMsg = msgIterator.next();
tmpMessage = tmpMsg.getMessage();
tmpVertexState = getVertexValue().getState();
+ tmpVal = getVertexValue();
if(tmpMessage == Message.START && tmpVertexState == State.MID_VERTEX){
tmpVal.setState(State.START_VERTEX);
setVertexValue(tmpVal);
+ //test
+ GraphVertexOperation.testSetVertexState(writer, getSuperstep(), getVertexId().getBytes(),
+ null, null, tmpVal);
}
else if(tmpMessage == Message.END && tmpVertexState == State.MID_VERTEX){
tmpVal.setState(State.END_VERTEX);
setVertexValue(tmpVal);
+ //test
+ GraphVertexOperation.testSetVertexState(writer, getSuperstep(), getVertexId().getBytes(),
+ null, null, tmpVal);
}
}
}
//head node sends message to path node
else if(getSuperstep()%3 == 0){
- tmpVertexState = getVertexValue().getState();
- tmpSourceVertextId = getVertexId().getBytes();
- tmpDestVertexId = GraphVertexOperation.getDestVertexId(tmpSourceVertextId,
- getVertexValue().getValue());
- if(tmpVertexState == State.START_VERTEX){
- tmpMsg.setMessage(Message.START);
- tmpMsg.setSourceVertexIdOrNeighberInfo(tmpSourceVertextId);
- sendMsg(new BytesWritable(tmpDestVertexId),tmpMsg);
- }
- else if(tmpVertexState == State.END_VERTEX){
- tmpMsg.setMessage(Message.NON);
- tmpMsg.setSourceVertexIdOrNeighberInfo(tmpSourceVertextId);
- sendMsg(new BytesWritable(tmpDestVertexId),tmpMsg);
- }
+ tmpVal = getVertexValue();
+ tmpVertexValue = tmpVal.getValue();
+ //if(!GraphVertexOperation.isHead(new ByteWritable(tmpVertexValue))
+ // && !GraphVertexOperation.isRear(new ByteWritable(tmpVertexValue))){
+ if(msgIterator.hasNext())
+ tmpMsg = msgIterator.next();
+ else
+ tmpMsg = new LogAlgorithmMessageWritable();
+ tmpVertexState = getVertexValue().getState();
+ tmpSourceVertextId = getVertexId().getBytes();
+ if(lengthOfMergeChainVertex == 0)
+ tmpDestVertexId = GraphVertexOperation.getDestVertexId(tmpSourceVertextId,
+ getVertexValue().getValue());
+ else{
+ byte[] lastKmer = GraphVertexOperation.getLastKmer(mergeChainVertexId,
+ lengthOfMergeChainVertex);
+ tmpDestVertexId = GraphVertexOperation.getDestVertexId(lastKmer, getVertexValue().getValue());
+ }
+
+ /*if(tmpMsg.getLengthOfChain()== 0){
+ tmpMsg.setLengthOfChain(GraphVertexOperation.k);
+ tmpMsg.setChainVertexId(getVertexId().getBytes());
+ }*/
+ if(tmpVertexState == State.START_VERTEX){
+ tmpMsg.setMessage(Message.START);
+ tmpMsg.setSourceVertexId(tmpSourceVertextId);
+ sendMsg(new BytesWritable(tmpDestVertexId),tmpMsg);
+ //test
+ GraphVertexOperation.testMessageCommunication2(writer, getSuperstep(), getVertexId().getBytes(),
+ tmpDestVertexId, tmpMsg, null);
+ }
+ else if(tmpVertexState != State.END_VERTEX){
+ tmpMsg.setMessage(Message.NON);
+ tmpMsg.setSourceVertexId(tmpSourceVertextId);
+ sendMsg(new BytesWritable(tmpDestVertexId),tmpMsg);
+ //test
+ GraphVertexOperation.testMessageCommunication2(writer, getSuperstep(), getVertexId().getBytes(),
+ tmpDestVertexId, tmpMsg, null);
+ }
+ //}
}
//path node sends message back to head node
else if(getSuperstep()%3 == 1){
@@ -125,32 +179,59 @@
tmpVal = getVertexValue();
tmpMsg = msgIterator.next();
tmpMessage = tmpMsg.getMessage();
- tmpSourceVertextId = getVertexId().getBytes();
- tmpMsg.setChainVertexId(tmpSourceVertextId);
- byte[] tmpBytes = GraphVertexOperation.getDestVertexId(tmpSourceVertextId,
- tmpVal.getValue());
- tmpMsg.setSourceVertexIdOrNeighberInfo(tmpBytes); //set neighber
+ tmpSourceVertextId = tmpMsg.getSourceVertexId();
+ tmpChainVertexId = tmpMsg.getChainVertexId();
+ //byte[] tmpBytes = GraphVertexOperation.getDestVertexId(tmpSourceVertextId,
+ // tmpVal.getValue());
+ tmpMsg.setSourceVertexId(getVertexId().getBytes());
+ tmpMsg.setNeighberInfo(tmpVal.getValue()); //set neighber
tmpMsg.setSourceVertexState(tmpVal.getState());
+ /*tmpMsg.incrementLength();
+ tmpMsg.setChainVertexId(GraphVertexOperation.updateChainVertexId(
+ tmpChainVertexId,
+ tmpMsg.getLengthOfChain()-1,
+ getVertexId().getBytes()));*/
sendMsg(new BytesWritable(tmpSourceVertextId),tmpMsg);
+ //test
+ GraphVertexOperation.testMessageCommunication2(writer, getSuperstep(), getVertexId().getBytes(),
+ tmpSourceVertextId, tmpMsg, tmpSourceVertextId);
//kill Message because it has been merged by the head
if(tmpMessage == Message.START){
tmpVal.setState(State.TODELETE);
setVertexValue(tmpVal);
}
}
- else
- deleteVertex(new BytesWritable(tmpSourceVertextId)); //killSelf because it doesn't receive any message
+ else{
+ if(!GraphVertexOperation.isHead(new ByteWritable(getVertexValue().getValue()))
+ && !GraphVertexOperation.isRear(new ByteWritable(getVertexValue().getValue()))
+ && getVertexValue().getState() != State.START_VERTEX){
+
+ GraphVertexOperation.testDeleteVertexInfo(writer, getSuperstep(), getVertexId().getBytes(), "not receive any message");
+ deleteVertex(getVertexId()); //killSelf because it doesn't receive any message
+ }
+ }
}
else if(getSuperstep()%3 == 2){
if(msgIterator.hasNext()){
tmpMsg = msgIterator.next();
tmpVertexState = getVertexValue().getState();
- if(tmpVertexState == State.TODELETE)
+ tmpSourceVertextId = getVertexId().getBytes();
+ if(tmpVertexState == State.TODELETE){
+ GraphVertexOperation.testDeleteVertexInfo(writer, getSuperstep(), getVertexId().getBytes(), "already merged by head");
deleteVertex(new BytesWritable(tmpSourceVertextId)); //killSelf
- setVertexId(GraphVertexOperation.mergeTwoChainVertex(getVertexId().getBytes(),
- tmpMsg.getChainVertexId(), tmpMsg.getLengthOfChain()));
- byte[] tmpByte = tmpMsg.getSourceVertexIdOrNeighberInfo();
- tmpVertexValue = GraphVertexOperation.updateRightNeighber(getVertexValue().getValue(),tmpByte[0]);
+ }
+ if(getSuperstep() == 5){
+ lengthOfMergeChainVertex = GraphVertexOperation.k;
+ mergeChainVertexId = getVertexId().getBytes();
+ }
+ mergeChainVertexId = GraphVertexOperation.mergeTwoChainVertex(mergeChainVertexId, lengthOfMergeChainVertex,
+ tmpMsg.getSourceVertexId());
+ lengthOfMergeChainVertex++;
+ //test
+ GraphVertexOperation.testMergeChainVertex(writer, getSuperstep(),
+ mergeChainVertexId, lengthOfMergeChainVertex);
+ byte tmpByte = tmpMsg.getNeighberInfo();
+ tmpVertexValue = GraphVertexOperation.updateRightNeighber(getVertexValue().getValue(),tmpByte);
tmpVal = getVertexValue();
tmpVal.setValue(tmpVertexValue);
setVertexValue(tmpVal);
@@ -158,6 +239,9 @@
voteToHalt();
}
}
+ try {
+ writer.close();
+ } catch (IOException e) { e.printStackTrace(); }
}
private void signalTerminate() {
@@ -195,8 +279,8 @@
/**
* BinaryInput and BinaryOutput~/
*/
- job.setVertexInputFormatClass(BinaryLoadGraphInputFormat.class);
- job.setVertexOutputFormatClass(BinaryLoadGraphOutputFormat.class);
+ job.setVertexInputFormatClass(LogAlgorithmForMergeGraphInputFormat.class);
+ job.setVertexOutputFormatClass(LogAlgorithmForMergeGraphOutputFormat.class);
job.setOutputKeyClass(BytesWritable.class);
job.setOutputValueClass(ByteWritable.class);
Client.run(args, job);
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/MergeGraphVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/MergeGraphVertex.java
index 558b4fb..3e02d2f 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/MergeGraphVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/MergeGraphVertex.java
@@ -62,7 +62,7 @@
private MessageWritable tmpMsg = new MessageWritable();
OutputStreamWriter writer;
/**
- * Naive Algorithm for merge graph
+ * Naive Algorithm for path merge graph
*/
@Override
public void compute(Iterator<MessageWritable> msgIterator) {
@@ -97,7 +97,6 @@
if(tmpChainVertexId.length == 0){
tmpMsg.setLengthOfChain(GraphVertexOperation.k);
tmpMsg.setChainVertexId(tmpVertextId);
- sendMsg(new BytesWritable(tmpSourceVertextId),tmpMsg);
}
else{
tmpMsg.incrementLength();
@@ -105,9 +104,9 @@
tmpChainVertexId,
tmpMsg.getLengthOfChain()-1,
tmpVertextId));
- sendMsg(new BytesWritable(tmpSourceVertextId),tmpMsg);
deleteVertex(getVertexId());
}
+ sendMsg(new BytesWritable(tmpSourceVertextId),tmpMsg);
//test
GraphVertexOperation.testMessageCommunication(writer,getSuperstep(),tmpVertextId,
tmpSourceVertextId,tmpMsg);
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/example/io/LogAlgorithmMessageWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/example/io/LogAlgorithmMessageWritable.java
index e2d33c1..262ce7f 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/example/io/LogAlgorithmMessageWritable.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/example/io/LogAlgorithmMessageWritable.java
@@ -16,30 +16,47 @@
* chainVertexId stores the chains of connected DNA
* file stores the point to the file that stores the chains of connected DNA
*/
- private byte[] sourceVertexIdOrNeighberInfo;
+ private byte[] sourceVertexId;
+ private byte neighberInfo;
private int lengthOfChain;
private byte[] chainVertexId;
private File file;
private int message;
private int sourceVertexState;
- public LogAlgorithmMessageWritable(){
+ public LogAlgorithmMessageWritable(){
+ sourceVertexId = new byte[(GraphVertexOperation.k-1)/4 + 1];
}
- public void set(byte[] sourceVertexIdOrNeighberInfo, byte[] chainVertexId, File file){
- this.sourceVertexIdOrNeighberInfo = sourceVertexIdOrNeighberInfo;
+ public void set(byte[] sourceVertexId,byte neighberInfo, byte[] chainVertexId, File file){
+ this.sourceVertexId = sourceVertexId;
this.chainVertexId = chainVertexId;
this.file = file;
this.message = 0;
this.lengthOfChain = 0;
}
-
- public byte[] getSourceVertexIdOrNeighberInfo() {
- return sourceVertexIdOrNeighberInfo;
+
+ public void reset(){
+ sourceVertexId = null;
+ neighberInfo = (Byte) null;
+ message = 0;
+ sourceVertexState = 0;
}
- public void setSourceVertexIdOrNeighberInfo(byte[] sourceVertexIdOrNeighberInfo) {
- this.sourceVertexIdOrNeighberInfo = sourceVertexIdOrNeighberInfo;
+ public byte[] getSourceVertexId() {
+ return sourceVertexId;
+ }
+
+ public void setSourceVertexId(byte[] sourceVertexId) {
+ this.sourceVertexId = sourceVertexId;
+ }
+
+ public byte getNeighberInfo() {
+ return neighberInfo;
+ }
+
+ public void setNeighberInfo(byte neighberInfo) {
+ this.neighberInfo = neighberInfo;
}
public byte[] getChainVertexId() {
@@ -92,7 +109,8 @@
out.writeInt(lengthOfChain);
if(lengthOfChain != 0)
out.write(chainVertexId);
- out.write(sourceVertexIdOrNeighberInfo);
+ out.write(sourceVertexId);
+ out.write(neighberInfo);
out.writeInt(message);
out.writeInt(sourceVertexState);
}
@@ -107,11 +125,9 @@
}
else
chainVertexId = new byte[0];
- if(lengthOfChain % 2 == 0)
- sourceVertexIdOrNeighberInfo = new byte[(GraphVertexOperation.k-1)/4 + 1];
- else
- sourceVertexIdOrNeighberInfo = new byte[1];
- in.readFully(sourceVertexIdOrNeighberInfo);
+ sourceVertexId = new byte[(GraphVertexOperation.k-1)/4 + 1];
+ in.readFully(sourceVertexId);
+ neighberInfo = in.readByte();
message = in.readInt();
sourceVertexState = in.readInt();
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/example/io/MessageWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/example/io/MessageWritable.java
index 7861247..151caae 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/example/io/MessageWritable.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/example/io/MessageWritable.java
@@ -57,20 +57,6 @@
}
public void setChainVertexId(byte[] chainVertexId) {
- /*if(lengthOfChain == 0){
- this.chainVertexId = chainVertexId;
- return;
- }
- int numOfByte = (2*lengthOfChain-1)/8 + 1;
- if(chainVertexId.length == numOfByte)
- this.chainVertexId = chainVertexId;
- else{
- byte[] tmp = new byte[numOfByte];
- for(int i = 0; i < numOfByte; i++)
- tmp[i] = chainVertexId[i];
- this.chainVertexId = tmp;
- }*/
-
this.chainVertexId = chainVertexId;
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/type/Message.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/type/Message.java
index 85f99c2..5280fef 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/type/Message.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/type/Message.java
@@ -3,7 +3,7 @@
public class Message {
public static final int NON = 0;
- public static final int START = 0;
- public static final int END = 0;
+ public static final int START = 1;
+ public static final int END = 2;
}
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/pregelix/JobGen/JobGenerator.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/pregelix/JobGen/JobGenerator.java
index 52e9f38..c0531ff 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/pregelix/JobGen/JobGenerator.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/pregelix/JobGen/JobGenerator.java
@@ -13,9 +13,14 @@
import edu.uci.ics.pregelix.BinaryLoadGraphInputFormat;
import edu.uci.ics.pregelix.BinaryLoadGraphOutputFormat;
import edu.uci.ics.pregelix.LoadGraphVertex;
+import edu.uci.ics.pregelix.LogAlgorithmForMergeGraphInputFormat;
+import edu.uci.ics.pregelix.LogAlgorithmForMergeGraphOutputFormat;
+import edu.uci.ics.pregelix.LogAlgorithmForMergeGraphVertex;
import edu.uci.ics.pregelix.MergeGraphVertex;
import edu.uci.ics.pregelix.LoadGraphVertex.SimpleLoadGraphVertexOutputFormat;
+import edu.uci.ics.pregelix.TestLoadGraphVertex;
import edu.uci.ics.pregelix.TextLoadGraphInputFormat;
+import edu.uci.ics.pregelix.testDeleteVertexId;
import edu.uci.ics.pregelix.api.job.PregelixJob;
@@ -56,13 +61,30 @@
generateBinaryLoadGraphJob("BinaryLoadGraph", outputBase + "BinaryLoadGraph.xml");
}
+ private static void generateLogAlgorithmForMergeGraphJob(String jobName, String outputPath) throws IOException {
+ PregelixJob job = new PregelixJob(jobName);
+ job.setVertexClass(LogAlgorithmForMergeGraphVertex.class);
+ job.setVertexInputFormatClass(LogAlgorithmForMergeGraphInputFormat.class);
+ job.setVertexOutputFormatClass(LogAlgorithmForMergeGraphOutputFormat.class);
+ job.setOutputKeyClass(BytesWritable.class);
+ job.setOutputValueClass(ByteWritable.class);
+ FileInputFormat.setInputPaths(job, HDFS_INPUTPATH);
+ FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH));
+ job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
+ }
+
+ private static void genLogAlgorithmForMergeGraph() throws IOException {
+ generateLogAlgorithmForMergeGraphJob("LogAlgorithmForMergeGraph", outputBase + "LogAlgorithmForMergeGraph.xml");
+ }
+
/**
* @param args
* @throws IOException
*/
public static void main(String[] args) throws IOException {
// TODO Auto-generated method stub
- genBinaryLoadGraph();
+ genLogAlgorithmForMergeGraph();
+ //genBinaryLoadGraph();
//genSequenceLoadGraph();
//genBasicBinaryLoadGraph();
}
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/pregelix/JobRun/RunJobTestCase.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/pregelix/JobRun/RunJobTestCase.java
index 1143c95..e3fa41c 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/pregelix/JobRun/RunJobTestCase.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/pregelix/JobRun/RunJobTestCase.java
@@ -50,7 +50,7 @@
job.setJobName(jobName);
this.resultFileName = resultFile;
this.expectedFileName = expectedFile;
- giraphJobGens = new JobGen[4];
+ giraphJobGens = new JobGen[1];
giraphJobGens[0] = new JobGenOuterJoin(job);
/*waitawhile();
giraphJobGens[1] = new JobGenInnerJoin(job);