Update LogAlgorithm for merging graph
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_genomix@3047 123451ca-8445-de46-9d55-352943316053
diff --git a/genomix/genomix-pregelix/data/webmap/sequenceFileLongMergeTest b/genomix/genomix-pregelix/data/webmap/sequenceFileLongMergeTest
new file mode 100755
index 0000000..ff27c48
--- /dev/null
+++ b/genomix/genomix-pregelix/data/webmap/sequenceFileLongMergeTest
Binary files differ
diff --git a/genomix/genomix-pregelix/data/webmap/sequenceFileMergeTest b/genomix/genomix-pregelix/data/webmap/sequenceFileMergeTest
new file mode 100755
index 0000000..b50d3f1
--- /dev/null
+++ b/genomix/genomix-pregelix/data/webmap/sequenceFileMergeTest
Binary files differ
diff --git a/genomix/genomix-pregelix/pom.xml b/genomix/genomix-pregelix/pom.xml
index 28f5e1c..1d713a5 100644
--- a/genomix/genomix-pregelix/pom.xml
+++ b/genomix/genomix-pregelix/pom.xml
@@ -3,8 +3,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>edu.uci.ics.pregelix</groupId>
<artifactId>genomix-pregelix</artifactId>
- <version>0.2.2</version>
- <packaging>jar</packaging>
+ <version>0.2.3-SNAPSHOT</version>
<name>genomix-pregelix</name>
<properties>
@@ -92,25 +91,25 @@
<dependencies>
<dependency>
- <groupId>edu.uci.ics.hyracks</groupId>
- <artifactId>pregelix-core</artifactId>
- <version>0.2.2</version>
- <type>jar</type>
- <scope>compile</scope>
- </dependency>
- <dependency>
- <groupId>edu.uci.ics.hyracks</groupId>
- <artifactId>pregelix-example</artifactId>
- <version>0.2.2</version>
- <type>jar</type>
- <scope>compile</scope>
- </dependency>
- <dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.8.1</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>pregelix-core</artifactId>
+ <version>0.2.3-SNAPSHOT</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>pregelix-example</artifactId>
+ <version>0.2.3-SNAPSHOT</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
</dependencies>
<scm>
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/BinaryLoadGraphInputFormat.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/BinaryLoadGraphInputFormat.java
index bcf7214..eb78050 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/BinaryLoadGraphInputFormat.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/BinaryLoadGraphInputFormat.java
@@ -14,6 +14,7 @@
import edu.uci.ics.pregelix.api.io.binary.BinaryVertexInputFormat;
import edu.uci.ics.pregelix.api.util.BspUtils;
import edu.uci.ics.pregelix.example.io.MessageWritable;
+import edu.uci.ics.pregelix.type.KmerCountValue;
public class BinaryLoadGraphInputFormat extends
BinaryVertexInputFormat<BytesWritable, ByteWritable, NullWritable, MessageWritable>{
@@ -35,7 +36,7 @@
private BytesWritable vertexId = new BytesWritable();
private ByteWritable vertexValue = new ByteWritable();
- public BinaryLoadGraphReader(RecordReader<BytesWritable,ByteWritable> recordReader) {
+ public BinaryLoadGraphReader(RecordReader<BytesWritable,KmerCountValue> recordReader) {
super(recordReader);
}
@@ -59,13 +60,14 @@
/**
* set the src vertex id
*/
- vertexId.set(getRecordReader().getCurrentKey());
+ vertexId.set(getRecordReader().getCurrentKey()); // copy instead of aliasing: record readers may reuse their key object across records
vertex.setVertexId(vertexId);
/**
* set the vertex value
*/
- vertexValue.set(getRecordReader().getCurrentValue().get());
+ KmerCountValue kmerCountValue = getRecordReader().getCurrentValue();
+ vertexValue.set(kmerCountValue.getAdjBitMap());
vertex.setVertexValue(vertexValue);
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/GraphVertexOperation.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/GraphVertexOperation.java
index 6e832a6..076b1be 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/GraphVertexOperation.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/GraphVertexOperation.java
@@ -215,7 +215,7 @@
*/
public static byte[] getLastKmer(byte[] chainVertexId, int lengthOfChainVertex){
String originalVertexId = BitwiseOperation.convertBytesToBinaryString(chainVertexId);
- return BitwiseOperation.convertBinaryStringToBytes(originalVertexId.substring(lengthOfChainVertex-1-k+1,lengthOfChainVertex-1));
+ return BitwiseOperation.convertBinaryStringToBytes(originalVertexId.substring(2*(lengthOfChainVertex-k),2*lengthOfChainVertex)); // 2 bits per base: the last k bases (lengthOfChainVertex is presumably a base count)
}
/**
* read vertexId from RecordReader
@@ -224,4 +224,95 @@
String finalBinaryString = BitwiseOperation.convertBytesToBinaryStringKmer(currentKey.getBytes(),k);
return new BytesWritable(BitwiseOperation.convertBinaryStringToBytes(finalBinaryString));
}
+    /**
+     * Merges two chain vertex ids into one id via BitwiseOperation.mergeTwoBytesArray.
+     * NOTE(review): the same length is passed for both arrays; confirm that is intended.
+     */
+    public static BytesWritable mergeTwoChainVertex(byte[] b1, byte[] b2, int length){
+        return new BytesWritable(BitwiseOperation.mergeTwoBytesArray(b1, length, b2, length));
+    }
+    /**
+     * Updates the right-neighbour (succeed) part of an adjacency byte by delegating to
+     * BitwiseOperation.replaceLastFourBits(oldVertexValue, newVertexValue).
+     */
+    public static byte updateRightNeighber(byte oldVertexValue, byte newVertexValue){
+        return BitwiseOperation.replaceLastFourBits(oldVertexValue, newVertexValue);
+    }
+    /** 2-bit base codes in A, C, G, T order; a code's index is the base's numeric value. */
+    private static final String[] BASE_TWO_BIT_CODES = {"00", "01", "10", "11"};
+    /**
+     * Sets the succeed (low four) bits of an adjacency byte from the last base of the
+     * right neighbour's k-mer and keeps the precursor (high four) bits:
+     * 00 (A) -> 0001, 01 (C) -> 0010, 10 (G) -> 0100, 11 (T) -> 1000.
+     *
+     * @throws IllegalArgumentException if the last base cannot be decoded
+     */
+    public static byte updateRightNeighberByVertexId(byte oldVertexValue, byte[] neighberVertexId){
+        String neighber = BitwiseOperation.convertBytesToBinaryStringKmer(neighberVertexId, k);
+        String lastTwoBits = neighber.substring(2*k-2, 2*k);
+        for(int code = 0; code < BASE_TWO_BIT_CODES.length; code++){
+            if(BASE_TWO_BIT_CODES[code].equals(lastTwoBits))
+                return (byte) ((oldVertexValue & 0xF0) | (1 << code));
+        }
+        throw new IllegalArgumentException("Unexpected last base bits: " + lastTwoBits);
+    }
+    /**
+     * Returns vertexValue with its precursor (high four) bits set from a base
+     * (A -> 0001, C -> 0010, G -> 0100, T -> 1000); the succeed (low four) bits are kept.
+     *
+     * @throws IllegalArgumentException if precursor is not one of A, C, G, T
+     */
+    public static byte getPrecursorFromGeneCode(byte vertexValue, char precursor){
+        return (byte) ((adjacencyBitOf(precursor) << 4) | (vertexValue & 0x0F)); // keep the succeed nibble
+    }
+    /**
+     * Returns vertexValue with its succeed (low four) bits set from a base
+     * (A -> 0001, C -> 0010, G -> 0100, T -> 1000); the precursor (high four) bits are kept.
+     *
+     * @throws IllegalArgumentException if succeed is not one of A, C, G, T
+     */
+    public static byte getSucceedFromGeneCode(byte vertexValue, char succeed){
+        return (byte) ((vertexValue & 0xF0) | adjacencyBitOf(succeed));
+    }
+    /** Maps a base to its one-hot adjacency nibble: A=0001, C=0010, G=0100, T=1000. */
+    private static int adjacencyBitOf(char gene){
+        switch(gene){
+        case 'A':
+            return 1;
+        case 'C':
+            return 2;
+        case 'G':
+            return 4;
+        case 'T':
+            return 8;
+        default:
+            throw new IllegalArgumentException("Unknown gene code: " + gene);
+        }
+    }
+    /**
+     * Converts a base string to its 2-bit binary string (A=00, C=01, G=10, T=11).
+     * Characters other than A, C, G, T are skipped.
+     */
+    public static String convertGeneCodeToBinaryString(String gene){
+        StringBuilder result = new StringBuilder(2 * gene.length());
+        for(int i = 0; i < gene.length(); i++){
+            switch(gene.charAt(i)){
+            case 'A':
+                result.append("00");
+                break;
+            case 'C':
+                result.append("01");
+                break;
+            case 'G':
+                result.append("10");
+                break;
+            case 'T':
+                result.append("11");
+                break;
+            default:
+                break;
+            }
+        }
+        return result.toString();
+    }
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/LoadGraphVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/LoadGraphVertex.java
index 9f757c5..9892376 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/LoadGraphVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/LoadGraphVertex.java
@@ -104,17 +104,18 @@
job.setVertexClass(LoadGraphVertex.class);
/**
* TextInput and TextOutput
- * job.setVertexInputFormatClass(TextLoadGraphInputFormat.class);
- * job.setVertexOutputFormatClass(SimpleLoadGraphVertexOutputFormat.class);
- */
+ */
+ job.setVertexInputFormatClass(TextLoadGraphInputFormat.class); // text input/output is the active setup; the binary setup below is commented out
+ job.setVertexOutputFormatClass(SimpleLoadGraphVertexOutputFormat.class);
+
/**
* BinaryInput and BinaryOutput
*/
- job.setVertexInputFormatClass(BinaryLoadGraphInputFormat.class);
+ /* job.setVertexInputFormatClass(BinaryLoadGraphInputFormat.class);
job.setVertexOutputFormatClass(BinaryLoadGraphOutputFormat.class);
job.setOutputKeyClass(BytesWritable.class);
- job.setOutputValueClass(ByteWritable.class);
+ job.setOutputValueClass(ByteWritable.class);*/
Client.run(args, job);
}
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/LogAlgorithmForMergeGraphVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/LogAlgorithmForMergeGraphVertex.java
new file mode 100644
index 0000000..4061d4f
--- /dev/null
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/LogAlgorithmForMergeGraphVertex.java
@@ -0,0 +1,234 @@
+package edu.uci.ics.pregelix;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.ByteWritable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.NullWritable;
+
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.pregelix.api.util.BspUtils;
+import edu.uci.ics.pregelix.dataflow.util.IterationUtils;
+import edu.uci.ics.pregelix.example.client.Client;
+import edu.uci.ics.pregelix.example.io.LogAlgorithmMessageWritable;
+import edu.uci.ics.pregelix.example.io.ValueStateWritable;
+import edu.uci.ics.pregelix.type.Message;
+import edu.uci.ics.pregelix.type.State;
+
+/*
+ * vertexId: BytesWritable
+ * vertexValue: ValueStateWritable (adjacency byte plus merge state)
+ * edgeValue: NullWritable
+ * message: LogAlgorithmMessageWritable
+ *
+ * DNA:
+ * A: 00
+ * C: 01
+ * G: 10
+ * T: 11
+ *
+ * succeed node (low four bits of the adjacency byte)
+ * A 00000001 1
+ * C 00000010 2
+ * G 00000100 4
+ * T 00001000 8
+ * precursor node (high four bits of the adjacency byte)
+ * A 00010000 16
+ * C 00100000 32
+ * G 01000000 64
+ * T 10000000 128
+ *
+ * The adjacency byte is kept in vertexValue; edgeValue is not used.
+ */
+public class LogAlgorithmForMergeGraphVertex extends Vertex<BytesWritable, ValueStateWritable, NullWritable, LogAlgorithmMessageWritable>{
+
+    private byte[] tmpSourceVertextId;
+    private byte[] tmpDestVertexId;
+    private byte tmpVertexValue;
+    private int tmpVertexState;
+    private int tmpMessage;
+    private ValueStateWritable tmpVal = new ValueStateWritable();
+    private LogAlgorithmMessageWritable tmpMsg = new LogAlgorithmMessageWritable();
+    public static final int k = 3; //kmer, k = 3
+    /**
+     * Merges linear chains of path vertices. "Neighbour" below means
+     * GraphVertexOperation.getDestVertexId(vertex id, adjacency byte).
+     * Superstep 1: heads send START and rears send END to their neighbour; path vertices become MID_VERTEX.
+     * Superstep 2: a MID_VERTEX that receives START (END) becomes START_VERTEX (END_VERTEX).
+     * Afterwards, in a three-superstep cycle:
+     *   superstep % 3 == 0: START/END vertices send their id to their neighbour;
+     *   superstep % 3 == 1: the receiver replies to the sender with its own id and its own
+     *                       neighbour's id, and marks itself TODELETE if it received START;
+     *   superstep % 3 == 2: the sender merges the reply into its id and adjacency byte.
+     */
+    @Override
+    public void compute(Iterator<LogAlgorithmMessageWritable> msgIterator) {
+        if (getSuperstep() == 1) {
+            tmpVal = getVertexValue();
+            tmpVertexValue = tmpVal.getValue();
+            if(GraphVertexOperation.isHead(new ByteWritable(tmpVertexValue))){
+                tmpMsg.setMessage(Message.START);
+                tmpDestVertexId = GraphVertexOperation.getDestVertexId(getVertexId().getBytes(), tmpVertexValue);
+                sendMsg(new BytesWritable(tmpDestVertexId),tmpMsg);
+                voteToHalt();
+            }
+            else if(GraphVertexOperation.isRear(new ByteWritable(tmpVertexValue))){
+                tmpMsg.setMessage(Message.END);
+                tmpDestVertexId = GraphVertexOperation.getDestVertexId(getVertexId().getBytes(), tmpVertexValue);
+                sendMsg(new BytesWritable(tmpDestVertexId),tmpMsg);
+                voteToHalt();
+            }
+            else if(GraphVertexOperation.isPathVertex(new ByteWritable(tmpVertexValue))){
+                tmpVal.setState(State.MID_VERTEX);
+                setVertexValue(tmpVal);
+            }
+            else
+                voteToHalt();
+        }
+        else if(getSuperstep() == 2){
+            if(msgIterator.hasNext()){
+                tmpMsg = msgIterator.next();
+                tmpMessage = tmpMsg.getMessage();
+                // tmpVal is a field reused across compute() calls; refresh it so the state
+                // change below is applied to this vertex's own value
+                tmpVal = getVertexValue();
+                tmpVertexState = tmpVal.getState();
+                if(tmpMessage == Message.START && tmpVertexState == State.MID_VERTEX){
+                    tmpVal.setState(State.START_VERTEX);
+                    setVertexValue(tmpVal);
+                }
+                else if(tmpMessage == Message.END && tmpVertexState == State.MID_VERTEX){
+                    tmpVal.setState(State.END_VERTEX);
+                    setVertexValue(tmpVal);
+                }
+            }
+        }
+        //head node sends message to path node
+        else if(getSuperstep()%3 == 0){
+            tmpVertexState = getVertexValue().getState();
+            tmpSourceVertextId = getVertexId().getBytes();
+            tmpDestVertexId = GraphVertexOperation.getDestVertexId(tmpSourceVertextId,
+                    getVertexValue().getValue());
+            if(tmpVertexState == State.START_VERTEX){
+                tmpMsg.setMessage(Message.START);
+                tmpMsg.setSourceVertexIdOrNeighberInfo(tmpSourceVertextId);
+                sendMsg(new BytesWritable(tmpDestVertexId),tmpMsg);
+            }
+            else if(tmpVertexState == State.END_VERTEX){
+                tmpMsg.setMessage(Message.NON);
+                tmpMsg.setSourceVertexIdOrNeighberInfo(tmpSourceVertextId);
+                sendMsg(new BytesWritable(tmpDestVertexId),tmpMsg);
+            }
+        }
+        //path node sends message back to head node
+        else if(getSuperstep()%3 == 1){
+            if(msgIterator.hasNext()){
+                tmpVal = getVertexValue();
+                tmpMsg = msgIterator.next();
+                tmpMessage = tmpMsg.getMessage();
+                // copy the requester's id before the message field is reused for the neighbour info
+                byte[] requesterVertexId = tmpMsg.getSourceVertexIdOrNeighberInfo().clone();
+                tmpSourceVertextId = getVertexId().getBytes();
+                tmpMsg.setChainVertexId(tmpSourceVertextId);
+                byte[] tmpBytes = GraphVertexOperation.getDestVertexId(tmpSourceVertextId,
+                        tmpVal.getValue());
+                tmpMsg.setSourceVertexIdOrNeighberInfo(tmpBytes); //set neighber
+                tmpMsg.setSourceVertexState(tmpVal.getState());
+                sendMsg(new BytesWritable(requesterVertexId),tmpMsg);
+                //kill Message because it has been merged by the head
+                if(tmpMessage == Message.START){
+                    tmpVal.setState(State.TODELETE);
+                    setVertexValue(tmpVal);
+                }
+            }
+            else{
+                // NOTE(review): START/END vertices never vote to halt and receive no message in
+                // this superstep, so they appear to be deleted here as well; confirm intent.
+                deleteVertex(getVertexId()); //killSelf because it doesn't receive any message
+            }
+        }
+        //sender merges the reply of its neighbour
+        else if(getSuperstep()%3 == 2){
+            if(msgIterator.hasNext()){
+                tmpMsg = msgIterator.next();
+                tmpVertexState = getVertexValue().getState();
+                if(tmpVertexState == State.TODELETE){
+                    deleteVertex(getVertexId()); //killSelf
+                    return; // a deleted vertex must not go on to merge the message
+                }
+                setVertexId(GraphVertexOperation.mergeTwoChainVertex(getVertexId().getBytes(),
+                        tmpMsg.getChainVertexId(), tmpMsg.getLengthOfChain()));
+                // the neighbour info sent in superstep % 3 == 1 is a vertex id, not an adjacency byte
+                byte[] tmpByte = tmpMsg.getSourceVertexIdOrNeighberInfo();
+                tmpVertexValue = GraphVertexOperation.updateRightNeighberByVertexId(getVertexValue().getValue(), tmpByte);
+                tmpVal = getVertexValue();
+                tmpVal.setValue(tmpVertexValue);
+                setVertexValue(tmpVal);
+                if(tmpVertexState == State.END_VERTEX)
+                    voteToHalt();
+            }
+        }
+    }
+
+    /**
+     * Writes the force-termination state for this job and records a "terminate" flag.
+     * NOTE(review): not called anywhere in this class at the moment.
+     */
+    private void signalTerminate() {
+        Configuration conf = getContext().getConfiguration();
+        try {
+            IterationUtils.writeForceTerminationState(conf, BspUtils.getJobId(conf));
+            writeMergeGraphResult(conf, true);
+        } catch (Exception e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
+    /**
+     * Writes terminate to the file IterationUtils.TMP_DIR + jobId + "MergeGraph", unless
+     * that file already exists.
+     */
+    private void writeMergeGraphResult(Configuration conf, boolean terminate) {
+        try {
+            FileSystem dfs = FileSystem.get(conf);
+            String pathStr = IterationUtils.TMP_DIR + BspUtils.getJobId(conf) + "MergeGraph";
+            Path path = new Path(pathStr);
+            if (!dfs.exists(path)) {
+                FSDataOutputStream output = dfs.create(path, true);
+                try {
+                    output.writeBoolean(terminate);
+                    output.flush();
+                } finally {
+                    output.close(); // close even if the write fails
+                }
+            }
+        } catch (IOException e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
+    /**
+     * Configures and runs the log-algorithm merge-graph job.
+     * NOTE(review): BinaryLoadGraphInputFormat is declared for ByteWritable vertex values and
+     * MessageWritable messages, while this vertex uses ValueStateWritable and
+     * LogAlgorithmMessageWritable; confirm the input/output formats fit this vertex.
+     */
+    public static void main(String[] args) throws Exception {
+        PregelixJob job = new PregelixJob(LogAlgorithmForMergeGraphVertex.class.getSimpleName());
+        job.setVertexClass(LogAlgorithmForMergeGraphVertex.class);
+        /**
+         * BinaryInput and BinaryOutput
+         */
+        job.setVertexInputFormatClass(BinaryLoadGraphInputFormat.class);
+        job.setVertexOutputFormatClass(BinaryLoadGraphOutputFormat.class);
+        job.setOutputKeyClass(BytesWritable.class);
+        job.setOutputValueClass(ByteWritable.class);
+        Client.run(args, job);
+    }
+}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/MergeGraphVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/MergeGraphVertex.java
index 2325ac3..1261a0e 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/MergeGraphVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/MergeGraphVertex.java
@@ -52,18 +52,20 @@
private byte[] tmpSourceVertextId;
private byte[] tmpDestVertexId;
private byte[] tmpChainVertexId;
+ private byte[] tmpNeighberBytes = new byte[1];
+ private byte tmpVertexValue;
private MessageWritable tmpMsg = new MessageWritable();
public static final int k = 3; //kmer, k = 3
/**
- * For test, in compute method, make each vertexValue shift 1 to left.
- * It will be modified when going forward to next step.
+ * Naive Algorithm for merge graph: the chain after each head is walked one vertex per two supersteps, each visited vertex reporting its id and adjacency byte back to the head.
*/
@Override
public void compute(Iterator<MessageWritable> msgIterator) {
if (getSuperstep() == 1) {
if(GraphVertexOperation.isHead(getVertexValue())){
tmpSourceVertextId = getVertexId().getBytes();
- tmpDestVertexId = GraphVertexOperation.getDestVertexId(tmpSourceVertextId, getVertexValue().get());
+ tmpDestVertexId = GraphVertexOperation.getDestVertexId(tmpSourceVertextId,
+ getVertexValue().get());
tmpMsg.setSourceVertexIdOrNeighberInfo(tmpSourceVertextId);
tmpChainVertexId = new byte[0];
tmpMsg.setChainVertexId(tmpChainVertexId);
@@ -73,51 +75,67 @@
//path node sends message back to head node
else if(getSuperstep()%2 == 0){
if(msgIterator.hasNext()){
- if(GraphVertexOperation.isPathVertex(getVertexValue())){
- tmpMsg = msgIterator.next();
- tmpSourceVertextId = tmpMsg.getSourceVertexIdOrNeighberInfo();
- byte[] tmpBytes = GraphVertexOperation.getDestVertexId(getVertexId().getBytes(), getVertexValue().get());
- tmpMsg.setSourceVertexIdOrNeighberInfo(tmpBytes); //set neighber
- tmpChainVertexId = tmpMsg.getChainVertexId();
- if(tmpChainVertexId.length == 0){
- tmpMsg.setChainVertexId(getVertexId().getBytes());
- tmpMsg.setLengthOfChain(k);
- }
- else{
- tmpMsg.setChainVertexId(GraphVertexOperation.updateChainVertexId(tmpChainVertexId,tmpMsg.getLengthOfChain(),getVertexId().getBytes()));
- tmpMsg.incrementLength();
- }
- sendMsg(new BytesWritable(tmpSourceVertextId),tmpMsg);
- }
- else if(GraphVertexOperation.isRear(getVertexValue())){
- tmpMsg = msgIterator.next();
- tmpSourceVertextId = tmpMsg.getSourceVertexIdOrNeighberInfo();
- tmpMsg.setSourceVertexIdOrNeighberInfo(getVertexId().getBytes());
- tmpMsg.setRear(true);
- sendMsg(new BytesWritable(tmpSourceVertextId),tmpMsg);
- }
- else voteToHalt();
- }
- }
- //head node sends message to path node
- else if(getSuperstep()%2 == 1){
- while (msgIterator.hasNext()){
tmpMsg = msgIterator.next();
if(!tmpMsg.isRear()){
- tmpSourceVertextId = getVertexId().getBytes();
- tmpDestVertexId = tmpMsg.getSourceVertexIdOrNeighberInfo();
- tmpMsg.setSourceVertexIdOrNeighberInfo(tmpSourceVertextId);
- sendMsg(new BytesWritable(tmpDestVertexId),tmpMsg);
+ if(GraphVertexOperation.isPathVertex(getVertexValue())){
+ tmpSourceVertextId = tmpMsg.getSourceVertexIdOrNeighberInfo();
+ //GraphVertexOperation.getDestVertexId(getVertexId().getBytes(), getVertexValue().get());
+ tmpNeighberBytes[0] = getVertexValue().get();
+ tmpMsg.setSourceVertexIdOrNeighberInfo(tmpNeighberBytes); //set neighber: this vertex's adjacency byte
+ tmpChainVertexId = tmpMsg.getChainVertexId();
+ if(tmpChainVertexId.length == 0){
+ tmpMsg.setChainVertexId(getVertexId().getBytes());
+ tmpMsg.setLengthOfChain(k);
+ }
+ else{
+ tmpMsg.setChainVertexId(GraphVertexOperation.updateChainVertexId(tmpChainVertexId,
+ tmpMsg.getLengthOfChain(),getVertexId().getBytes()));
+ tmpMsg.incrementLength();
+ //deleteVertex(getVertexId());
+ }
+ sendMsg(new BytesWritable(tmpSourceVertextId),tmpMsg);
+
+ }
+ else if(GraphVertexOperation.isRear(getVertexValue())){
+ tmpSourceVertextId = tmpMsg.getSourceVertexIdOrNeighberInfo();
+ tmpMsg.setSourceVertexIdOrNeighberInfo(getVertexId().getBytes());
+ tmpMsg.setRear(true); // tell the head that the walk reached a rear vertex
+ sendMsg(new BytesWritable(tmpSourceVertextId),tmpMsg);
+ }
+ voteToHalt();
}
else{
-
- try {
- HDFSOperation hdfsOperation = new HDFSOperation();
- HDFSOperation.insertHDFSFile("testHDFS/chainVertex", tmpMsg.getLengthOfChain(), tmpMsg.getChainVertexId());
- } catch (IOException e) { e.printStackTrace(); }
+ tmpVertexValue = GraphVertexOperation.updateRightNeighberByVertexId(getVertexValue().get(),
+ tmpMsg.getSourceVertexIdOrNeighberInfo());
+ setVertexValue(new ByteWritable(tmpVertexValue));
+ setVertexId(new BytesWritable(tmpMsg.getChainVertexId())); // NOTE(review): this vertex (the head's successor) takes the whole chain as its id
signalTerminate();
}
}
+ else voteToHalt();
+ }
+ //head node sends message to path node
+ else if(getSuperstep()%2 == 1){
+ if (msgIterator.hasNext()){
+ tmpMsg = msgIterator.next();
+ tmpNeighberBytes = tmpMsg.getSourceVertexIdOrNeighberInfo();
+ tmpChainVertexId = tmpMsg.getChainVertexId();
+ if(!tmpMsg.isRear()){
+ byte[] lastKmer = GraphVertexOperation.getLastKmer(tmpChainVertexId,
+ tmpMsg.getLengthOfChain());
+ tmpDestVertexId = GraphVertexOperation.getDestVertexId(lastKmer, tmpNeighberBytes[0]); // next vertex after the chain's last k-mer
+ tmpSourceVertextId = getVertexId().getBytes();
+ tmpMsg.setSourceVertexIdOrNeighberInfo(tmpSourceVertextId);
+ sendMsg(new BytesWritable(tmpDestVertexId),tmpMsg);
+ }
+ else{ // the rear has replied: pass the final message to this head's own successor
+ tmpDestVertexId = GraphVertexOperation.getDestVertexId(getVertexId().getBytes(),
+ getVertexValue().get());
+ sendMsg(new BytesWritable(tmpDestVertexId),tmpMsg);
+ voteToHalt();
+ }
+ }
+ voteToHalt();
}
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/SequenceFile/GenerateSequenceFile.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/SequenceFile/GenerateSequenceFile.java
index e52f979..c7a4556 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/SequenceFile/GenerateSequenceFile.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/SequenceFile/GenerateSequenceFile.java
@@ -11,13 +11,15 @@
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.CompressionType;
+import edu.uci.ics.pregelix.GraphVertexOperation;
import edu.uci.ics.pregelix.bitwise.BitwiseOperation;
+import edu.uci.ics.pregelix.type.KmerCountValue;
public class GenerateSequenceFile {
static private final Path TMP_DIR = new Path(
GenerateSequenceFile.class.getSimpleName() + "_TMP");
- private static Path outDir = new Path(TMP_DIR, "out");
+ private static Path outDir = new Path("data/webmap"); // relative path, resolved against the working directory; no longer derived from TMP_DIR
private final static int k = 3;
/**
@@ -108,10 +110,10 @@
public static void createMergeTest() throws IOException{
//write output to a file
Configuration conf = new Configuration();
- Path outFile = new Path(outDir, "mergeTest");
+ Path outFile = new Path(outDir, "sequenceFileMergeTest"); // must match inFile below, which reads the records back
FileSystem fileSys = FileSystem.get(conf);
SequenceFile.Writer writer = SequenceFile.createWriter(fileSys, conf,
- outFile, BytesWritable.class, ByteWritable.class,
+ outFile, BytesWritable.class, KmerCountValue.class,
CompressionType.NONE);
@@ -209,22 +211,26 @@
arrayOfKeys.add(keyWritable);
arrayOfValues.add(valueWritable);
+ KmerCountValue kmerCountValue = null;
//wirte to sequence file
- for(int i = 0; i < arrayOfKeys.size(); i++)
- writer.append(arrayOfKeys.get(i), arrayOfValues.get(i));
+ for(int i = 0; i < arrayOfKeys.size(); i++){
+ kmerCountValue = new KmerCountValue();
+ kmerCountValue.setAdjBitMap(arrayOfValues.get(i).get()); // only the adjacency byte is set; other KmerCountValue fields are left as constructed
+ writer.append(arrayOfKeys.get(i), kmerCountValue);
+ }
writer.close();
//read outputs
- Path inFile = new Path(outDir, "mergeTest");
+ Path inFile = new Path(outDir, "sequenceFileMergeTest");
BytesWritable outKey = new BytesWritable();
- ByteWritable outValue = new ByteWritable();
+ KmerCountValue outValue = new KmerCountValue();
SequenceFile.Reader reader = new SequenceFile.Reader(fileSys, inFile, conf);
int iteration = 1;
try {
while(reader.next(outKey, outValue)){
System.out.println(iteration);
System.out.println("key: " + BitwiseOperation.convertBytesToBinaryStringKmer(outKey.getBytes(),k));
- System.out.println("value: " + BitwiseOperation.convertByteToBinaryString(outValue.get()));
+ System.out.println("value: " + BitwiseOperation.convertByteToBinaryString(outValue.getAdjBitMap()));
System.out.println();
iteration++;
}
@@ -233,12 +239,156 @@
}
}
+    /**
+     * Creates the long merge-test SequenceFile data/webmap/sequenceFileLongMergeTest and
+     * prints its records. Intended graph: CAG - AGC - GCG - CGT - GTA - TAT - ATA, where
+     * GAG also precedes AGC and ATC also follows TAT.
+     * NOTE(review): only CAG, AGC, GAG, TAT, ATA and ATC are written; GCG, CGT and GTA
+     * are missing from the data - confirm whether they should be added.
+     */
+    public static void createLongMergeTest() throws IOException{
+        Configuration conf = new Configuration();
+        // distinct from createMergeTest's "sequenceFileMergeTest" so neither overwrites the other
+        Path outFile = new Path(outDir, "sequenceFileLongMergeTest");
+        FileSystem fileSys = FileSystem.get(conf);
+        SequenceFile.Writer writer = SequenceFile.createWriter(fileSys, conf,
+                outFile, BytesWritable.class, KmerCountValue.class,
+                CompressionType.NONE);
+        try {
+            writeDefaultMergeTestKmers(writer);
+        } finally {
+            writer.close();
+        }
+        printSequenceFile(fileSys, outFile, conf);
+    }
+
+    /**
+     * Writes the fixed merge-test k-mers CAG, AGC, GAG, TAT, ATA and ATC, in that order, each
+     * with its adjacency bitmap (high four bits: precursors, low four bits: successors).
+     */
+    private static void writeDefaultMergeTestKmers(SequenceFile.Writer writer) throws IOException{
+        writeKmer(writer, "010010", "00000010"); // CAG
+        writeKmer(writer, "001001", "01100001"); // AGC
+        writeKmer(writer, "100010", "00000010"); // GAG
+        writeKmer(writer, "110011", "00100011"); // TAT
+        writeKmer(writer, "001100", "10000000"); // ATA
+        writeKmer(writer, "001101", "10000000"); // ATC
+    }
+
+    /** Appends one record built from a binary-string k-mer key and a binary-string adjacency bitmap. */
+    private static void writeKmer(SequenceFile.Writer writer, String binaryKey, String binaryAdjBitMap)
+            throws IOException{
+        KmerCountValue kmerCountValue = new KmerCountValue();
+        kmerCountValue.setAdjBitMap(BitwiseOperation.convertBinaryStringToByte(binaryAdjBitMap));
+        writer.append(new BytesWritable(BitwiseOperation.convertBinaryStringToBytes(binaryKey)), kmerCountValue);
+    }
+
+    /** Prints every (k-mer, adjacency bitmap) record of a BytesWritable/KmerCountValue SequenceFile. */
+    private static void printSequenceFile(FileSystem fileSys, Path inFile, Configuration conf)
+            throws IOException{
+        BytesWritable outKey = new BytesWritable();
+        KmerCountValue outValue = new KmerCountValue();
+        SequenceFile.Reader reader = new SequenceFile.Reader(fileSys, inFile, conf);
+        int iteration = 1;
+        try {
+            while(reader.next(outKey, outValue)){
+                System.out.println(iteration);
+                System.out.println("key: " + BitwiseOperation.convertBytesToBinaryStringKmer(outKey.getBytes(),k));
+                System.out.println("value: " + BitwiseOperation.convertByteToBinaryString(outValue.getAdjBitMap()));
+                System.out.println();
+                iteration++;
+            }
+        } finally {
+            reader.close();
+        }
+    }
+
+    /**
+     * Copies the first numOfLines records of inFile to outFile, stopping early if inFile
+     * has fewer records. Both files hold (BytesWritable, KmerCountValue) records.
+     */
+    public static void generateNumOfLinesFromBigFile(Path inFile, Path outFile, int numOfLines) throws IOException{
+        Configuration conf = new Configuration();
+        FileSystem fileSys = FileSystem.get(conf);
+        SequenceFile.Reader reader = new SequenceFile.Reader(fileSys, inFile, conf);
+        try {
+            SequenceFile.Writer writer = SequenceFile.createWriter(fileSys, conf,
+                    outFile, BytesWritable.class, KmerCountValue.class,
+                    CompressionType.NONE);
+            try {
+                BytesWritable outKey = new BytesWritable();
+                KmerCountValue outValue = new KmerCountValue();
+                // reader.next returns false at end of input; stop instead of re-appending stale data
+                for(int i = 0; i < numOfLines && reader.next(outKey, outValue); i++){
+                    System.out.println(i);
+                    writer.append(outKey, outValue);
+                }
+            } finally {
+                writer.close();
+            }
+        } finally {
+            reader.close();
+        }
+    }
+
public static void main(String[] argv) throws Exception {
//createTestDat();
- createMergeTest();
- createTestDat();
+ //createMergeTest();
+ //createTestDat();
+ /* Path dir = new Path("data/webmap");
+ Path inFile = new Path(dir, "part-1");
+ Path outFile = new Path(dir, "part-1-out");
+ generateNumOfLinesFromBigFile(inFile,outFile,10000);*/
+ /**
+ * AGC - A C - TAT
+ * "AGCAAACACGAC T TGCC TAT"
+ * problem "AGCATGGACGTCGATTCTAT"
+ * "AGCACTTAT"
+ * "AGCAAACACTTGCTGTACCGTGGCCTAT"
+ */
+ generateSequenceFileFromGeneCode("AGCATGCGGGTCTAT");//GTCGATT //before T: GGACG
}
-
+    /**
+     * Writes data/webmap/sequenceFileMergeTest4: one record per inner k-mer of s (the first
+     * and last k-mers are skipped because their precursor or successor base lies outside s),
+     * followed by the fixed merge-test k-mers CAG, AGC, GAG, TAT, ATA and ATC.
+     * If a k-mer of s repeats or equals one of the fixed k-mers, "error: <bits>" is printed
+     * and nothing further is written; the writer is closed in every case.
+     */
+    public static void generateSequenceFileFromGeneCode(String s) throws IOException{
+        Configuration conf = new Configuration();
+        Path outFile = new Path(outDir, "sequenceFileMergeTest4");
+        FileSystem fileSys = FileSystem.get(conf);
+        SequenceFile.Writer writer = SequenceFile.createWriter(fileSys, conf,
+                outFile, BytesWritable.class, KmerCountValue.class,
+                CompressionType.NONE);
+        try {
+            ArrayList<String> lists = new ArrayList<String>();
+            lists.add("010010"); //CAG
+            lists.add("100010"); //GAG
+            lists.add("001001"); //AGC
+            lists.add("110011"); //TAT
+            lists.add("001100"); //ATA
+            lists.add("001101"); //ATC
+            for(int i = 1; i < s.length()-k; i++){
+                String binaryString = GraphVertexOperation.convertGeneCodeToBinaryString(s.substring(i,i+k));
+                if(lists.contains(binaryString)){
+                    System.out.println("error: " + binaryString);
+                    return; // the finally block still closes the writer
+                }
+                lists.add(binaryString);
+                byte adjBitMap = GraphVertexOperation.getPrecursorFromGeneCode((byte)0, s.charAt(i-1));
+                adjBitMap = GraphVertexOperation.getSucceedFromGeneCode(adjBitMap, s.charAt(i+k));
+                KmerCountValue outValue = new KmerCountValue();
+                outValue.setAdjBitMap(adjBitMap);
+                writer.append(new BytesWritable(BitwiseOperation.convertBinaryStringToBytes(binaryString)), outValue);
+            }
+            // CAG - AGC ------ TAT - ATA, plus GAG -> AGC and TAT -> ATC
+            writeDefaultMergeTestKmers(writer);
+        } finally {
+            writer.close();
+        }
+    }
public static byte[] hexStringToByteArray(String s) {
int len = s.length();
byte[] data = new byte[len / 2];
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/TestLoadGraphVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/TestLoadGraphVertex.java
index 529d429..d238e9d 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/TestLoadGraphVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/TestLoadGraphVertex.java
@@ -1,102 +1,67 @@
package edu.uci.ics.pregelix;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.util.StringTokenizer;
+import java.util.Iterator;
-import edu.uci.ics.pregelix.LoadGraphVertex.SimpleLoadGraphVertexOutputFormat;
+import org.apache.hadoop.io.ByteWritable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.NullWritable;
+
+import edu.uci.ics.pregelix.api.graph.Vertex;
import edu.uci.ics.pregelix.api.job.PregelixJob;
import edu.uci.ics.pregelix.example.client.Client;
+import edu.uci.ics.pregelix.example.io.MessageWritable;
-public class TestLoadGraphVertex {
+/*
+ * vertexId: BytesWritable
+ * vertexValue: ByteWritable
+ * edgeValue: NullWritable
+ * message: MessageWritable
+ *
+ * DNA:
+ * A: 00
+ * C: 01
+ * G: 10
+ * T: 11
+ *
+ * succeed node
+ * A 00000001 1
+ * G 00000010 2
+ * C 00000100 4
+ * T 00001000 8
+ * precursor node
+ * A 00010000 16
+ * G 00100000 32
+ * C 01000000 64
+ * T 10000000 128
+ *
+ * For example, ONE LINE in input file: 00,01,10 0001,0010,
+ * That means that vertexId is ACG, its succeed node is A and its precursor node is C.
+ * The succeed node and precursor node will be stored in vertexValue and we don't use edgeValue.
+ * The details about message are in edu.uci.ics.pregelix.example.io.MessageWritable.
+ */
+public class TestLoadGraphVertex extends Vertex<BytesWritable, ByteWritable, NullWritable, MessageWritable>{
/**
- * If running in different machines, the parameters need to be changed.
- * Now, this test is not completed.
+ * For test, just output original file
*/
- private static final String EXPECT_RESULT_FILE = "~/workspace/genomix-pregelix/expect/expected_result";
- private static final String INPUT_PATHS = "~/workspace/genomix-pregelix/folder";
- private static final String OUTPUT_PATH = "~/workspace/genomix-pregelix/tmp/pg_result"; //result
- private static final String IP = "169.234.134.212";
- private static final String PORT = "3099";
- /**
- * @param args
- * @throws Exception
- */
- @SuppressWarnings("deprecation")
- public static void main(String[] args) throws Exception {
- // TODO Auto-generated method stub
- //initiate args
- args = new String[8];
- args[0] = "-inputpaths";
- args[1] = INPUT_PATHS;
- args[2] = "-outputpath";
- args[3] = OUTPUT_PATH;
- args[4] = "-ip";
- args[5] = IP;
- args[6] = "-port";
- args[7] = PORT;
- PregelixJob job = new PregelixJob(LoadGraphVertex.class.getSimpleName());
- job.setVertexClass(LoadGraphVertex.class);
- job.setVertexInputFormatClass(TextLoadGraphInputFormat.class);
- job.setVertexOutputFormatClass(SimpleLoadGraphVertexOutputFormat.class);
- Client.run(args, job);
-
- generateExpectBinaryFile();
-
- //test if the actual file is the same as the expected file
- DataInputStream actual_dis = new DataInputStream(new FileInputStream(OUTPUT_PATH + "/*"));
- DataInputStream expected_dis = new DataInputStream(new FileInputStream(EXPECT_RESULT_FILE));
- String actualLine, expectedLine = null;
- StringTokenizer actualSt, expectedSt;
- byte[] actualVertexId, expectedVertexId = null;
- byte actualVertexValue, expectedVertexValue;
- byte[] tmp = null;
- while(((actualLine = actual_dis.readLine()) != null) &&
- ((expectedLine = expected_dis.readLine()) != null)){
- actualSt = new StringTokenizer(actualLine, " ");
- actualVertexId = actualSt.nextToken().getBytes();
- tmp = actualSt.nextToken().getBytes();
- actualVertexValue = tmp[0];
-
- expectedSt = new StringTokenizer(expectedLine," ");
- expectedVertexId = expectedSt.nextToken().getBytes();
- tmp = expectedSt.nextToken().getBytes();
- expectedVertexValue = tmp[0];
-
- //assertEquals("actualVextexId == expectedVertexId", actualVertexId, expectedVertexId);
- //assertEquals("actualVertexValue == expectedVertexValue", actualVertexValue, expectedVertexValue);
- }
-
- //assertEquals("actualLine should be the end and be equal to null", actualLine, null);
- //assertEquals("expectedLine should be the end and be equal to null", expectedLine, null);
+ @Override
+ public void compute(Iterator<MessageWritable> msgIterator) {
+ voteToHalt();
}
- @SuppressWarnings("deprecation")
- public static void generateExpectBinaryFile() throws Exception{
- DataInputStream dis = new DataInputStream(new FileInputStream(INPUT_PATHS + "/*"));
- DataOutputStream dos = new DataOutputStream(new FileOutputStream(EXPECT_RESULT_FILE));
- String line;
- byte[] vertexId = null;
- byte vertexValue;
- byte[] tmp = null;
- while((line = dis.readLine()) != null){
- StringTokenizer st = new StringTokenizer(line, " ");
- vertexId = st.nextToken().getBytes();
- tmp = st.nextToken().getBytes();
- vertexValue = tmp[0];
-
- vertexValue = (byte) (vertexValue << 1);
- for(int i = 0; i < vertexId.length; i++)
- dos.writeByte(vertexId[i]);
- dos.writeByte((byte)32); //space
- dos.writeByte(vertexValue);
- dos.writeByte((byte)10); //line feed
- }
-
- dis.close();
- dos.close();
+ /**
+ * @param args
+ */
+ public static void main(String[] args) throws Exception {
+ PregelixJob job = new PregelixJob(MergeGraphVertex.class.getSimpleName());
+ job.setVertexClass(TestLoadGraphVertex.class);
+ /**
+ * BinaryInput and BinaryOutput
+ */
+ job.setVertexInputFormatClass(BinaryLoadGraphInputFormat.class);
+ job.setVertexOutputFormatClass(BinaryLoadGraphOutputFormat.class);
+ job.setOutputKeyClass(BytesWritable.class);
+ job.setOutputValueClass(ByteWritable.class);
+ Client.run(args, job);
}
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/api/io/binary/BinaryVertexInputFormat.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/api/io/binary/BinaryVertexInputFormat.java
index d3354bd..c5db334 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/api/io/binary/BinaryVertexInputFormat.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/api/io/binary/BinaryVertexInputFormat.java
@@ -15,6 +15,7 @@
import edu.uci.ics.pregelix.api.io.VertexInputFormat;
import edu.uci.ics.pregelix.api.io.VertexReader;
+import edu.uci.ics.pregelix.type.KmerCountValue;
public class BinaryVertexInputFormat <I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable>
extends VertexInputFormat<I, V, E, M>{
@@ -37,7 +38,7 @@
public static abstract class BinaryVertexReader<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable>
implements VertexReader<I, V, E, M> {
/** Internal line record reader */
- private final RecordReader<BytesWritable,ByteWritable> lineRecordReader;
+ private final RecordReader<BytesWritable,KmerCountValue> lineRecordReader;
/** Context passed to initialize */
private TaskAttemptContext context;
@@ -47,7 +48,7 @@
* @param recordReader
* Line record reader from SequenceFileInputFormat
*/
- public BinaryVertexReader(RecordReader<BytesWritable, ByteWritable> recordReader) {
+ public BinaryVertexReader(RecordReader<BytesWritable, KmerCountValue> recordReader) {
this.lineRecordReader = recordReader;
}
@@ -73,7 +74,7 @@
*
* @return Record reader to be used for reading.
*/
- protected RecordReader<BytesWritable,ByteWritable> getRecordReader() {
+ protected RecordReader<BytesWritable,KmerCountValue> getRecordReader() {
return lineRecordReader;
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/bitwise/BitwiseOperation.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/bitwise/BitwiseOperation.java
index 3e4db6e..b064707 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/bitwise/BitwiseOperation.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/bitwise/BitwiseOperation.java
@@ -100,4 +100,16 @@
else
return sb.substring(8*num - 2, 8*num);
}
+
+ public static byte[] mergeTwoBytesArray(byte[] b1, int size1, byte[] b2, int size2){
+ String s1 = convertBytesToBinaryStringKmer(b1,size1);
+ String s2 = convertBytesToBinaryStringKmer(b2,size2);
+ return convertBinaryStringToBytes(s1 + s2);
+ }
+
+ public static byte replaceLastFourBits(byte b1, byte b2){
+ String s1 = convertByteToBinaryString(b1);
+ String s2 = convertByteToBinaryString(b2);
+ return convertBinaryStringToByte(s1.substring(0,4) + s2.substring(4,8));
+ }
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/example/io/LogAlgorithmMessageWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/example/io/LogAlgorithmMessageWritable.java
new file mode 100644
index 0000000..b7ac6ee
--- /dev/null
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/example/io/LogAlgorithmMessageWritable.java
@@ -0,0 +1,154 @@
+package edu.uci.ics.pregelix.example.io;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.hadoop.io.WritableComparable;
+
+public class LogAlgorithmMessageWritable implements WritableComparable<LogAlgorithmMessageWritable>{
+ /**
+ * sourceVertexId stores source vertexId when headVertex sends the message
+ * stores neighber vertexValue when pathVertex sends the message
+ * chainVertexId stores the chains of connected DNA
+ * file stores the point to the file that stores the chains of connected DNA
+ */
+ private byte[] sourceVertexIdOrNeighberInfo;
+ private int lengthOfChain;
+ private byte[] chainVertexId;
+ private File file;
+ private int message;
+ private int sourceVertexState;
+ private static int k = 3;
+
+ public LogAlgorithmMessageWritable(){
+ }
+
+ public void set(byte[] sourceVertexIdOrNeighberInfo, byte[] chainVertexId, File file){
+ this.sourceVertexIdOrNeighberInfo = sourceVertexIdOrNeighberInfo;
+ this.chainVertexId = chainVertexId;
+ this.file = file;
+ this.message = 0;
+ this.lengthOfChain = 0;
+ }
+
+ public byte[] getSourceVertexIdOrNeighberInfo() {
+ return sourceVertexIdOrNeighberInfo;
+ }
+
+ public void setSourceVertexIdOrNeighberInfo(byte[] sourceVertexIdOrNeighberInfo) {
+ this.sourceVertexIdOrNeighberInfo = sourceVertexIdOrNeighberInfo;
+ }
+
+ public byte[] getChainVertexId() {
+ return chainVertexId;
+ }
+
+ public void setChainVertexId(byte[] chainVertexId) {
+ this.chainVertexId = chainVertexId;
+ }
+
+ public File getFile() {
+ return file;
+ }
+
+ public void setFile(File file) {
+ this.file = file;
+ }
+
+ public int getMessage() {
+ return message;
+ }
+
+ public void setMessage(int message) {
+ this.message = message;
+ }
+
+ public int getSourceVertexState() {
+ return sourceVertexState;
+ }
+
+ public void setSourceVertexState(int sourceVertexState) {
+ this.sourceVertexState = sourceVertexState;
+ }
+
+ public int getLengthOfChain() {
+ return lengthOfChain;
+ }
+
+ public void setLengthOfChain(int lengthOfChain) {
+ this.lengthOfChain = lengthOfChain;
+ }
+
+ public void incrementLength(){
+ this.lengthOfChain++;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ // TODO Auto-generated method stub
+ out.writeInt(lengthOfChain);
+ if(lengthOfChain != 0)
+ out.write(chainVertexId);
+ out.write(sourceVertexIdOrNeighberInfo);
+ out.writeInt(message);
+ out.writeInt(sourceVertexState);
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ // TODO Auto-generated method stub
+ lengthOfChain = in.readInt();
+ if(lengthOfChain != 0){
+ chainVertexId = new byte[(lengthOfChain-1)/4 + 1];
+ in.readFully(chainVertexId);
+ }
+ else
+ chainVertexId = new byte[0];
+ if(lengthOfChain % 2 == 0)
+ sourceVertexIdOrNeighberInfo = new byte[(k-1)/4 + 1];
+ else
+ sourceVertexIdOrNeighberInfo = new byte[1];
+ in.readFully(sourceVertexIdOrNeighberInfo);
+ message = in.readInt();
+ sourceVertexState = in.readInt();
+ }
+
+ @Override
+ public int hashCode() {
+ int hashCode = 0;
+ for(int i = 0; i < chainVertexId.length; i++)
+ hashCode = (int)chainVertexId[i];
+ return hashCode;
+ }
+ @Override
+ public boolean equals(Object o) {
+ if (o instanceof LogAlgorithmMessageWritable) {
+ LogAlgorithmMessageWritable tp = (LogAlgorithmMessageWritable) o;
+ return chainVertexId == tp.chainVertexId && file == tp.file;
+ }
+ return false;
+ }
+ @Override
+ public String toString() {
+ return chainVertexId.toString() + "\t" + file.getAbsolutePath();
+ }
+ @Override
+ public int compareTo(LogAlgorithmMessageWritable tp) {
+ // TODO Auto-generated method stub
+ int cmp;
+ if (chainVertexId == tp.chainVertexId)
+ cmp = 0;
+ else
+ cmp = 1;
+ if (cmp != 0)
+ return cmp;
+ if (file == tp.file)
+ return 0;
+ else
+ return 1;
+ }
+
+
+}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/example/io/ValueStateWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/example/io/ValueStateWritable.java
new file mode 100644
index 0000000..b52c9a5
--- /dev/null
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/example/io/ValueStateWritable.java
@@ -0,0 +1,58 @@
+package edu.uci.ics.pregelix.example.io;
+
+import java.io.*;
+
+import org.apache.hadoop.io.WritableComparable;
+
+import edu.uci.ics.pregelix.type.State;
+
+
+public class ValueStateWritable implements WritableComparable<ValueStateWritable> {
+
+ private byte value;
+ private int state;
+
+ public ValueStateWritable() {
+ state = State.NON_VERTEX;
+ }
+
+ public ValueStateWritable(byte value, int state) {
+ this.value = value;
+ this.state = state;
+ }
+
+ public byte getValue() {
+ return value;
+ }
+
+ public void setValue(byte value) {
+ this.value = value;
+ }
+
+ public int getState() {
+ return state;
+ }
+
+ public void setState(int state) {
+ this.state = state;
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ value = in.readByte();
+ state = in.readInt();
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeByte(value);
+ out.writeInt(state);
+ }
+
+ @Override
+ public int compareTo(ValueStateWritable o) {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
+}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/type/Kmer.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/type/Kmer.java
new file mode 100644
index 0000000..f35da10
--- /dev/null
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/type/Kmer.java
@@ -0,0 +1,208 @@
+package edu.uci.ics.pregelix.type;
+
+public class Kmer {
+
+ public final static byte[] GENE_SYMBOL = { 'A', 'C', 'G', 'T' };
+
+ public final static class GENE_CODE {
+
+ /**
+ * make sure this 4 ids equal to the sequence id of char in
+ * {@GENE_SYMBOL}
+ */
+ public static final byte A = 0;
+ public static final byte C = 1;
+ public static final byte G = 2;
+ public static final byte T = 3;
+
+ public static byte getCodeFromSymbol(byte ch) {
+ byte r = 0;
+ switch (ch) {
+ case 'A':
+ case 'a':
+ r = A;
+ break;
+ case 'C':
+ case 'c':
+ r = C;
+ break;
+ case 'G':
+ case 'g':
+ r = G;
+ break;
+ case 'T':
+ case 't':
+ r = T;
+ break;
+ }
+ return r;
+ }
+
+ public static byte getSymbolFromCode(byte code) {
+ if (code > 3) {
+ return '!';
+ }
+ return GENE_SYMBOL[code];
+ }
+
+ public static byte getAdjBit(byte t) {
+ byte r = 0;
+ switch (t) {
+ case 'A':
+ case 'a':
+ r = 1 << A;
+ break;
+ case 'C':
+ case 'c':
+ r = 1 << C;
+ break;
+ case 'G':
+ case 'g':
+ r = 1 << G;
+ break;
+ case 'T':
+ case 't':
+ r = 1 << T;
+ break;
+ }
+ return r;
+ }
+
+ public static byte mergePreNextAdj(byte pre, byte next) {
+ return (byte) (pre << 4 | (next & 0x0f));
+ }
+
+ public static String getSymbolFromBitMap(byte code) {
+ int left = (code >> 4) & 0x0F;
+ int right = code & 0x0F;
+ StringBuilder str = new StringBuilder();
+ for (int i = A; i <= T; i++) {
+ if ((left & (1 << i)) != 0) {
+ str.append((char)GENE_SYMBOL[i]);
+ }
+ }
+ str.append('|');
+ for (int i = A; i <= T; i++) {
+ if ((right & (1 << i)) != 0) {
+ str.append((char)GENE_SYMBOL[i]);
+ }
+ }
+ return str.toString();
+ }
+ }
+
+ public static String recoverKmerFrom(int k, byte[] keyData, int keyStart,
+ int keyLength) {
+ StringBuilder strKmer = new StringBuilder();
+ int byteId = keyStart + keyLength - 1;
+ byte currentbyte = keyData[byteId];
+ for (int geneCount = 0; geneCount < k; geneCount++) {
+ if (geneCount % 4 == 0 && geneCount > 0) {
+ currentbyte = keyData[--byteId];
+ }
+ strKmer.append((char) GENE_SYMBOL[(currentbyte >> ((geneCount % 4) * 2)) & 0x03]);
+ }
+ return strKmer.toString();
+ }
+
+ public static int getByteNumFromK(int k){
+ int x = k/4;
+ if (k%4 !=0){
+ x+=1;
+ }
+ return x;
+ }
+
+ /**
+ * Compress Kmer into bytes array AATAG will compress as [0 0 0 G][A T A A]
+ *
+ * @param kmer
+ * @param input
+ * array
+ * @param start
+ * position
+ * @return initialed kmer array
+ */
+ public static byte[] CompressKmer(int k, byte[] array, int start) {
+ final int byteNum = getByteNumFromK(k);
+ byte[] bytes = new byte[byteNum];
+
+ byte l = 0;
+ int bytecount = 0;
+ int bcount = byteNum - 1;
+ for (int i = start; i < start + k; i++) {
+ byte code = GENE_CODE.getCodeFromSymbol(array[i]);
+ l |= (byte) (code << bytecount);
+ bytecount += 2;
+ if (bytecount == 8) {
+ bytes[bcount--] = l;
+ l = 0;
+ bytecount = 0;
+ }
+ }
+ if (bcount >= 0) {
+ bytes[0] = l;
+ }
+ return bytes;
+ }
+
+ /**
+ * Shift Kmer to accept new input
+ *
+ * @param kmer
+ * @param bytes
+ * Kmer Array
+ * @param c
+ * Input new gene character
+ * @return the shiftout gene, in gene code format
+ */
+ public static byte MoveKmer(int k, byte[] kmer, byte c) {
+ int byteNum = kmer.length;
+ byte output = (byte) (kmer[byteNum - 1] & 0x03);
+ for (int i = byteNum - 1; i > 0; i--) {
+ byte in = (byte) (kmer[i - 1] & 0x03);
+ kmer[i] = (byte) (((kmer[i] >>> 2) & 0x3f) | (in << 6));
+ }
+
+ int pos = ((k - 1) % 4) * 2;
+ byte code = (byte) (GENE_CODE.getCodeFromSymbol(c) << pos);
+ kmer[0] = (byte) (((kmer[0] >>> 2) & 0x3f) | code);
+ return (byte) (1 << output);
+ }
+
+ public static void main(String[] argv) {
+ byte[] array = { 'A', 'A', 'T', 'A', 'G', 'A', 'A', 'G' };
+ int k = 5;
+ byte[] kmer = CompressKmer(k, array, 0);
+ for (byte b : kmer) {
+ System.out.print(Integer.toBinaryString(b));
+ System.out.print(' ');
+ }
+ System.out.println();
+ System.out.println(recoverKmerFrom(k, kmer, 0, kmer.length));
+
+ for (int i = k; i < array.length-1; i++) {
+ byte out = MoveKmer(k, kmer, array[i]);
+
+ System.out.println((int) out);
+ for (byte b : kmer) {
+ System.out.print(Integer.toBinaryString(b));
+ System.out.print(' ');
+ }
+ System.out.println();
+ System.out.println(recoverKmerFrom(k, kmer, 0, kmer.length));
+ }
+
+ byte out = MoveKmer(k, kmer, array[array.length - 1]);
+
+ System.out.println((int) out);
+ for (byte b : kmer) {
+ System.out.print(Integer.toBinaryString(b));
+ System.out.print(' ');
+ }
+ System.out.println();
+ System.out.println(recoverKmerFrom(k, kmer, 0, kmer.length));
+
+ }
+
+}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/type/KmerBytesWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/type/KmerBytesWritable.java
new file mode 100644
index 0000000..230b3f7
--- /dev/null
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/type/KmerBytesWritable.java
@@ -0,0 +1,148 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.type;
+
+import java.io.IOException;
+import java.io.DataInput;
+import java.io.DataOutput;
+import org.apache.hadoop.io.BinaryComparable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+
+public class KmerBytesWritable extends BinaryComparable implements WritableComparable<BinaryComparable> {
+ private static final int LENGTH_BYTES = 4;
+ private static final byte[] EMPTY_BYTES = {};
+ private byte size;
+ private byte[] bytes;
+
+ public KmerBytesWritable() {
+ this(EMPTY_BYTES);
+ }
+
+ public KmerBytesWritable(byte[] bytes) {
+ this.bytes = bytes;
+ this.size = (byte) bytes.length;
+ }
+
+ @Override
+ public byte[] getBytes() {
+ return bytes;
+ }
+
+ @Deprecated
+ public byte[] get() {
+ return getBytes();
+ }
+
+ @Override
+ public int getLength() {
+ return (int) size;
+ }
+
+ @Deprecated
+ public int getSize() {
+ return getLength();
+ }
+
+ public void setSize(byte size) {
+ if ((int) size > getCapacity()) {
+ setCapacity((byte) (size * 3 / 2));
+ }
+ this.size = size;
+ }
+
+ public int getCapacity() {
+ return bytes.length;
+ }
+
+ public void setCapacity(byte new_cap) {
+ if (new_cap != getCapacity()) {
+ byte[] new_data = new byte[new_cap];
+ if (new_cap < size) {
+ size = new_cap;
+ }
+ if (size != 0) {
+ System.arraycopy(bytes, 0, new_data, 0, size);
+ }
+ bytes = new_data;
+ }
+ }
+
+ public void set(KmerBytesWritable newData) {
+ set(newData.bytes, (byte) 0, newData.size);
+ }
+
+ public void set(byte[] newData, byte offset, byte length) {
+ setSize((byte) 0);
+ setSize(length);
+ System.arraycopy(newData, offset, bytes, 0, size);
+ }
+
+ public void readFields(DataInput in) throws IOException {
+ setSize((byte) 0); // clear the old data
+ setSize(in.readByte());
+ in.readFully(bytes, 0, size);
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeByte(size);
+ out.write(bytes, 0, size);
+ }
+
+ @Override
+ public int hashCode() {
+ return super.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object right_obj) {
+ if (right_obj instanceof KmerBytesWritable)
+ return super.equals(right_obj);
+ return false;
+ }
+
+ @Override
+ public String toString() {
+ StringBuffer sb = new StringBuffer(3 * size);
+ for (int idx = 0; idx < (int) size; idx++) {
+ // if not the first, put a blank separator in
+ if (idx != 0) {
+ sb.append(' ');
+ }
+ String num = Integer.toHexString(0xff & bytes[idx]);
+ // if it is only one digit, add a leading 0.
+ if (num.length() < 2) {
+ sb.append('0');
+ }
+ sb.append(num);
+ }
+ return sb.toString();
+ }
+
+ public static class Comparator extends WritableComparator {
+ public Comparator() {
+ super(KmerBytesWritable.class);
+ }
+
+ public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+ return compareBytes(b1, s1 + LENGTH_BYTES, l1 - LENGTH_BYTES, b2, s2 + LENGTH_BYTES, l2 - LENGTH_BYTES);
+ }
+ }
+
+ static { // register this comparator
+ WritableComparator.define(KmerBytesWritable.class, new Comparator());
+ }
+}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/type/KmerCountValue.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/type/KmerCountValue.java
new file mode 100644
index 0000000..a89add1
--- /dev/null
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/type/KmerCountValue.java
@@ -0,0 +1,61 @@
+package edu.uci.ics.pregelix.type;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+
+
+public class KmerCountValue implements Writable{
+ private byte adjBitMap;
+ private byte count;
+
+ public KmerCountValue(byte bitmap, byte count) {
+ reset(bitmap, count);
+ }
+
+ public KmerCountValue() {
+ adjBitMap = 0;
+ count = 0;
+ }
+
+ public byte getAdjBitMap() {
+ return adjBitMap;
+ }
+
+ public void setAdjBitMap(byte adjBitMap) {
+ this.adjBitMap = adjBitMap;
+ }
+
+ public byte getCount() {
+ return count;
+ }
+
+ public void setCount(byte count) {
+ this.count = count;
+ }
+
+ @Override
+ public void readFields(DataInput arg0) throws IOException {
+ adjBitMap = arg0.readByte();
+ count = arg0.readByte();
+ }
+
+ @Override
+ public void write(DataOutput arg0) throws IOException {
+ arg0.writeByte(adjBitMap);
+ arg0.writeByte(count);
+ }
+
+ @Override
+ public String toString() {
+ return Kmer.GENE_CODE.getSymbolFromBitMap(adjBitMap) + '\t' + String.valueOf(count);
+ }
+
+ public void reset(byte bitmap, byte count) {
+ this.adjBitMap = bitmap;
+ this.count = count;
+ }
+
+}
\ No newline at end of file
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/type/Message.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/type/Message.java
new file mode 100644
index 0000000..85f99c2
--- /dev/null
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/type/Message.java
@@ -0,0 +1,9 @@
+package edu.uci.ics.pregelix.type;
+
+public class Message {
+
+ public static final int NON = 0;
+ public static final int START = 0;
+ public static final int END = 0;
+
+}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/type/State.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/type/State.java
new file mode 100644
index 0000000..61ee2fa
--- /dev/null
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/type/State.java
@@ -0,0 +1,9 @@
+package edu.uci.ics.pregelix.type;
+
+public class State {
+ public static final int NON_VERTEX = 0;
+ public static final int START_VERTEX = 1;
+ public static final int END_VERTEX = 2;
+ public static final int MID_VERTEX = 3;
+ public static final int TODELETE = 4;
+}
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/pregelix/JobGen/JobGenerator.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/pregelix/JobGen/JobGenerator.java
index 0dff613..7399a3b 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/pregelix/JobGen/JobGenerator.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/pregelix/JobGen/JobGenerator.java
@@ -5,11 +5,17 @@
import java.io.IOException;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.ByteWritable;
+import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import edu.uci.ics.pregelix.BinaryLoadGraphInputFormat;
+import edu.uci.ics.pregelix.BinaryLoadGraphOutputFormat;
import edu.uci.ics.pregelix.LoadGraphVertex;
+import edu.uci.ics.pregelix.MergeGraphVertex;
import edu.uci.ics.pregelix.LoadGraphVertex.SimpleLoadGraphVertexOutputFormat;
+import edu.uci.ics.pregelix.TestLoadGraphVertex;
import edu.uci.ics.pregelix.TextLoadGraphInputFormat;
import edu.uci.ics.pregelix.api.job.PregelixJob;
@@ -35,13 +41,30 @@
generateLoadGraphJob("LoadGraph", outputBase + "LoadGraph.xml");
}
+ private static void generateBinaryLoadGraphJob(String jobName, String outputPath) throws IOException {
+ PregelixJob job = new PregelixJob(jobName);
+ job.setVertexClass(MergeGraphVertex.class);
+ job.setVertexInputFormatClass(BinaryLoadGraphInputFormat.class);
+ job.setVertexOutputFormatClass(BinaryLoadGraphOutputFormat.class);
+ job.setOutputKeyClass(BytesWritable.class);
+ job.setOutputValueClass(ByteWritable.class);
+ FileInputFormat.setInputPaths(job, HDFS_INPUTPATH);
+ FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH));
+ job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
+ }
+
+ private static void genBinaryLoadGraph() throws IOException {
+ generateBinaryLoadGraphJob("BinaryLoadGraph", outputBase + "BinaryLoadGraph.xml");
+ }
+
/**
* @param args
* @throws IOException
*/
public static void main(String[] args) throws IOException {
// TODO Auto-generated method stub
- genLoadGraph();
+ genBinaryLoadGraph();
+ //genBasicBinaryLoadGraph();
}
}
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/pregelix/JobRun/RunJobTestSuite.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/pregelix/JobRun/RunJobTestSuite.java
index 156c910..1649e79 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/pregelix/JobRun/RunJobTestSuite.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/pregelix/JobRun/RunJobTestSuite.java
@@ -40,7 +40,7 @@
private static final String PATH_TO_ONLY = "src/test/resources/only.txt";
private static final String FILE_EXTENSION_OF_RESULTS = "result";
- private static final String DATA_PATH = "data/webmap/test.dat";//test.dat
+ private static final String DATA_PATH = "data/webmap/sequenceFileMergeTest";//sequenceFileMergeTest
private static final String HDFS_PATH = "/webmap/";
private static final String HYRACKS_APP_NAME = "pregelix";
diff --git a/genomix/genomix-pregelix/src/test/resources/expected/BinaryLoadGraph.result b/genomix/genomix-pregelix/src/test/resources/expected/BinaryLoadGraph.result
new file mode 100644
index 0000000..2a98362
--- /dev/null
+++ b/genomix/genomix-pregelix/src/test/resources/expected/BinaryLoadGraph.result
@@ -0,0 +1,9 @@
+Vertex(id=24,value=100, edges=())
+Vertex(id=30,value=-128, edges=())
+Vertex(id=34,value=-128, edges=())
+Vertex(id=48,value=2, edges=())
+Vertex(id=6c,value=65, edges=())
+Vertex(id=88,value=2, edges=())
+Vertex(id=9b 00,value=24, edges=())
+Vertex(id=b0,value=40, edges=())
+Vertex(id=cc,value=67, edges=())
diff --git a/genomix/genomix-pregelix/src/test/resources/expected/LoadGraph.result b/genomix/genomix-pregelix/src/test/resources/expected/LoadGraph.result
index b96a242..6595d86 100644
--- a/genomix/genomix-pregelix/src/test/resources/expected/LoadGraph.result
+++ b/genomix/genomix-pregelix/src/test/resources/expected/LoadGraph.result
@@ -1,4 +1,4 @@
-06|Vertex(id=06,value=34, edges=())
-07|Vertex(id=07,value=68, edges=())
-1b|Vertex(id=1b,value=-120, edges=())
-2d|Vertex(id=2d,value=-34, edges=())
+Vertex(id=06,value=34, edges=())
+Vertex(id=07,value=68, edges=())
+Vertex(id=1b,value=-120, edges=())
+Vertex(id=2d,value=-34, edges=())