update: merge chains using both chain lengths, add FINAL_VERTEX state and voteToHalt test logging
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_genomix@3262 123451ca-8445-de46-9d55-352943316053
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/GraphVertexOperation.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/GraphVertexOperation.java
index 5b75e05..8197ea8 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/GraphVertexOperation.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/GraphVertexOperation.java
@@ -305,9 +305,9 @@
/**
* merge two BytesWritable. Ex. merge two vertexId
*/
- public static byte[] mergeTwoChainVertex(byte[] b1, int length, byte[] b2){
- String s2 = BitwiseOperation.convertBytesToBinaryString(b2).substring(2*k-2,2*length);
- return BitwiseOperation.mergeTwoBytesArray(b1, length, BitwiseOperation.convertBinaryStringToBytes(s2), s2.length()/2);
+ public static byte[] mergeTwoChainVertex(byte[] b1, int length, byte[] b2, int length2){
+ String s2 = BitwiseOperation.convertBytesToBinaryString(b2).substring(2*k-2,2*length2);
+ return BitwiseOperation.mergeTwoBytesArray(b1, length, BitwiseOperation.convertBinaryStringToBytes(s2), length2-k+1);
}
/**
* update right neighber
@@ -597,6 +597,21 @@
return;
}
/**
+ * test helper: log the step, the reason and the id of a vertex that calls voteToHalt
+ */
+ public static void testVoteVertexInfo(OutputStreamWriter writer, long step, byte[] vertexId, String reason){
+ try {
+ writer.write("Step: " + step + "\r\n");
+ writer.write(reason + "\r\n");
+ writer.write("voteToHalt " + BitwiseOperation.convertBytesToBinaryStringKmer(vertexId, GraphVertexOperation.k)
+ + "\t" + GraphVertexOperation.convertBinaryStringToGenecode(
+ BitwiseOperation.convertBytesToBinaryStringKmer(
+ vertexId,GraphVertexOperation.k)) + "\r\n");
+ writer.write("\r\n");
+ } catch (IOException e) { e.printStackTrace(); }
+ return;
+ }
+ /**
* test merge chain vertex
*/
public static void testMergeChainVertex(OutputStreamWriter writer, long step, byte[] mergeChain,
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/LogAlgorithmForMergeGraphVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/LogAlgorithmForMergeGraphVertex.java
index 5a4a47c..c341e0d 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/LogAlgorithmForMergeGraphVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/LogAlgorithmForMergeGraphVertex.java
@@ -138,28 +138,11 @@
tmpVertexValue = tmpVal.getValue();
tmpVertexState = tmpVal.getState();
tmpSourceVertextId = getVertexId().getBytes();
- if(!GraphVertexOperation.isHead(new ByteWritable(tmpVertexValue))
- && !GraphVertexOperation.isRear(new ByteWritable(tmpVertexValue))){
- // compute the right neighber - destination vertex Id
- if(msgIterator.hasNext()){
- tmpMsg = msgIterator.next();
- tmpLengthOfMergeChainVertex = tmpMsg.getLengthOfChain();
- tmpMergeChainVertexId = tmpMsg.getChainVertexId();
- byte[] lastKmer = GraphVertexOperation.getLastKmer(tmpMergeChainVertexId,
- tmpLengthOfMergeChainVertex);
- tmpDestVertexId = GraphVertexOperation.getDestVertexId(lastKmer,
- tmpMsg.getNeighberInfo());
- }
- else{
- tmpMsg = new LogAlgorithmMessageWritable();
- tmpDestVertexId = GraphVertexOperation.getDestVertexId(tmpSourceVertextId,
- getVertexValue().getValue());
- }
-
- /*if(tmpMsg.getLengthOfChain()== 0){
- tmpMsg.setLengthOfChain(GraphVertexOperation.k);
- tmpMsg.setChainVertexId(getVertexId().getBytes());
- }*/
+
+ if(getSuperstep() == 3){
+ tmpMsg = new LogAlgorithmMessageWritable();
+ tmpDestVertexId = GraphVertexOperation.getDestVertexId(tmpSourceVertextId,
+ getVertexValue().getValue());
if(tmpVertexState == State.START_VERTEX){
tmpMsg.setMessage(Message.START);
tmpMsg.setSourceVertexId(tmpSourceVertextId);
@@ -177,6 +160,35 @@
tmpDestVertexId, tmpMsg, null);
}
}
+ else{
+ if(msgIterator.hasNext()){
+ tmpMsg = msgIterator.next();
+ tmpLengthOfMergeChainVertex = tmpVal.getLengthOfMergeChain();
+ tmpMergeChainVertexId = tmpVal.getMergeChain();
+ byte[] lastKmer = GraphVertexOperation.getLastKmer(tmpMergeChainVertexId,
+ tmpLengthOfMergeChainVertex);
+ tmpDestVertexId = GraphVertexOperation.getDestVertexId(lastKmer,
+ tmpMsg.getNeighberInfo());
+ if(tmpVertexState == State.START_VERTEX){
+ tmpMsg.setMessage(Message.START);
+ tmpMsg.setSourceVertexId(tmpSourceVertextId);
+ sendMsg(new BytesWritable(tmpDestVertexId),tmpMsg);
+ //test
+ GraphVertexOperation.testMessageCommunication2(writer, getSuperstep(), getVertexId().getBytes(),
+ tmpDestVertexId, tmpMsg, null);
+ }
+ else if(tmpVertexState != State.END_VERTEX){
+ tmpMsg.setMessage(Message.NON);
+ tmpMsg.setSourceVertexId(tmpSourceVertextId);
+ sendMsg(new BytesWritable(tmpDestVertexId),tmpMsg);
+ //test
+ GraphVertexOperation.testMessageCommunication2(writer, getSuperstep(), getVertexId().getBytes(),
+ tmpDestVertexId, tmpMsg, null);
+ }
+ }
+ else
+ voteToHalt();
+ }
}
//path node sends message back to head node
else if(getSuperstep()%3 == 1){
@@ -185,17 +197,13 @@
tmpMsg = msgIterator.next();
tmpMessage = tmpMsg.getMessage();
tmpSourceVertextId = tmpMsg.getSourceVertexId();
- tmpChainVertexId = tmpMsg.getChainVertexId();
if(tmpVal.getLengthOfMergeChain() == 0){
- tmpMsg.setSourceVertexId(getVertexId().getBytes());
tmpVal.setLengthOfMergeChain(GraphVertexOperation.k);
tmpVal.setMergeChain(getVertexId().getBytes());
setVertexValue(tmpVal);
}
- else{
- tmpMsg.setLengthOfChain(tmpVal.getLengthOfMergeChain());
- tmpMsg.setSourceVertexId(tmpVal.getMergeChain());
- }
+ tmpMsg.setLengthOfChain(tmpVal.getLengthOfMergeChain());
+ tmpMsg.setChainVertexId(tmpVal.getMergeChain());
tmpMsg.setNeighberInfo(tmpVal.getValue()); //set neighber
tmpMsg.setSourceVertexState(tmpVal.getState());
@@ -215,10 +223,10 @@
}
}
else{
- if(!GraphVertexOperation.isHead(new ByteWritable(getVertexValue().getValue()))
- && !GraphVertexOperation.isRear(new ByteWritable(getVertexValue().getValue()))
- && getVertexValue().getState() != State.START_VERTEX){
-
+ if(getVertexValue().getState() != State.START_VERTEX
+ && getVertexValue().getState() != State.END_VERTEX
+ && tmpMessage != Message.END && tmpMessage != Message.START){
+
GraphVertexOperation.testDeleteVertexInfo(writer, getSuperstep(), getVertexId().getBytes(), "not receive any message");
deleteVertex(getVertexId()); //killSelf because it doesn't receive any message
}
@@ -228,28 +236,34 @@
if(msgIterator.hasNext()){
tmpMsg = msgIterator.next();
tmpVal = getVertexValue();
- if(tmpMsg.getMessage() == Message.END)
- tmpVertexState = State.END_VERTEX;
- else
- tmpVertexState = getVertexValue().getState();
- tmpVal.setState(tmpVertexState);
+ tmpVertexState = tmpVal.getState();
tmpSourceVertextId = getVertexId().getBytes();
if(tmpVertexState == State.TODELETE){
- GraphVertexOperation.testDeleteVertexInfo(writer, getSuperstep(), tmpSourceVertextId, "already merged by head");
+ GraphVertexOperation.testDeleteVertexInfo(writer, getSuperstep(),
+ tmpSourceVertextId, "already merged by head");
deleteVertex(new BytesWritable(tmpSourceVertextId)); //killSelf
}
else{
+ if(tmpMsg.getMessage() == Message.END){
+ if(tmpVertexState != State.START_VERTEX)
+ tmpVertexState = State.END_VERTEX;
+ else
+ tmpVertexState = State.FINAL_VERTEX;
+ }
+
+ tmpVal.setState(tmpVertexState);
if(getSuperstep() == 5){
lengthOfMergeChainVertex = GraphVertexOperation.k;
mergeChainVertexId = getVertexId().getBytes();
}
else{
- lengthOfMergeChainVertex = tmpMsg.getLengthOfChain();
- mergeChainVertexId = tmpMsg.getChainVertexId();
+ lengthOfMergeChainVertex = tmpVal.getLengthOfMergeChain();
+ mergeChainVertexId = tmpVal.getMergeChain();
}
- mergeChainVertexId = GraphVertexOperation.mergeTwoChainVertex(mergeChainVertexId, lengthOfMergeChainVertex,
- tmpMsg.getSourceVertexId());
- lengthOfMergeChainVertex = 2*lengthOfMergeChainVertex - GraphVertexOperation.k + 1;
+ mergeChainVertexId = GraphVertexOperation.mergeTwoChainVertex(mergeChainVertexId,
+ lengthOfMergeChainVertex, tmpMsg.getChainVertexId(), tmpMsg.getLengthOfChain()); //tmpMsg.getSourceVertexId()
+ lengthOfMergeChainVertex = lengthOfMergeChainVertex + tmpMsg.getLengthOfChain()
+ - GraphVertexOperation.k + 1;
tmpVal.setLengthOfMergeChain(lengthOfMergeChainVertex);
tmpVal.setMergeChain(mergeChainVertexId);
@@ -259,17 +273,31 @@
byte tmpByte = tmpMsg.getNeighberInfo();
tmpVertexValue = GraphVertexOperation.updateRightNeighber(getVertexValue().getValue(),tmpByte);
tmpVal.setValue(tmpVertexValue);
- setVertexValue(tmpVal);
- tmpMsg.setNeighberInfo(tmpVertexValue);
- tmpMsg.setLengthOfChain(lengthOfMergeChainVertex);
- tmpMsg.setChainVertexId(mergeChainVertexId);
- sendMsg(getVertexId(),tmpMsg);
- //test
- GraphVertexOperation.testMessageCommunication2(writer, getSuperstep(), getVertexId().getBytes(),
- getVertexId().getBytes(), tmpMsg, null);
+ if(tmpVertexState != State.FINAL_VERTEX){
+ setVertexValue(tmpVal);
+ tmpMsg = new LogAlgorithmMessageWritable();
+ tmpMsg.setNeighberInfo(tmpVertexValue);
+ sendMsg(getVertexId(),tmpMsg);
+ //test
+ GraphVertexOperation.testMessageCommunication2(writer, getSuperstep(), getVertexId().getBytes(),
+ getVertexId().getBytes(), tmpMsg, null);
+ }
}
- if(tmpVertexState == State.END_VERTEX)
+ if(tmpVertexState == State.END_VERTEX){
voteToHalt();
+ //test
+ GraphVertexOperation.testVoteVertexInfo(writer, getSuperstep(), getVertexId().getBytes(),
+ " it is the rear!");
+ }
+ if(tmpVertexState == State.FINAL_VERTEX){
+ voteToHalt();
+ try {
+ GraphVertexOperation.flushChainToFile(tmpVal.getMergeChain(),
+ tmpVal.getLengthOfMergeChain(),getVertexId().getBytes());
+ writer.write("Step: " + getSuperstep() + "\r\n");
+ writer.write("Flush! " + "\r\n");
+ } catch (IOException e) { e.printStackTrace(); }
+ }
}
}
try {
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/SequenceFile/GenerateSequenceFile.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/SequenceFile/GenerateSequenceFile.java
index 891fb8d..e2f7b19 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/SequenceFile/GenerateSequenceFile.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/SequenceFile/GenerateSequenceFile.java
@@ -372,8 +372,8 @@
//createTestDat();
/* Path dir = new Path("data/webmap");
Path inFile = new Path(dir, "part-1");
- Path outFile = new Path(dir, "part-1-out");
- generateNumOfLinesFromBigFile(inFile,outFile,10000);*/
+ Path outFile = new Path(dir, "part-1-out-100");
+ generateNumOfLinesFromBigFile(inFile,outFile,100);*/
/**
* AGC - A C - TAT
* AGCATGCTAT
@@ -406,13 +406,15 @@
*
* Two strings
* "AGCATGCTAT","TTCAGTACCCGC"
+ *
+ * AGCATGCTAT
*/
- generateSequenceFileFromGeneCode3("AGCATGCTAT");//GTCGATT //before T: GGACG
+ generateSequenceFileFromGeneCode3("AGCATGGCCTGCTAT");//GTCGATT //before T: GGACG
}
public static void generateSequenceFileFromGeneCode3(String s) throws IOException{
Configuration conf = new Configuration();
- Path outFile = new Path(outDir, "sequenceShortFileMergeTest");
+ Path outFile = new Path(outDir, "11");//sequenceShortFileMergeTest
FileSystem fileSys = FileSystem.get(conf);
SequenceFile.Writer writer = SequenceFile.createWriter(fileSys, conf,
outFile, BytesWritable.class, KmerCountValue.class,
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/TestLoadGraphVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/TestLoadGraphVertex.java
index 3d97044..5ccaae5 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/TestLoadGraphVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/TestLoadGraphVertex.java
@@ -44,16 +44,16 @@
*/
public class TestLoadGraphVertex extends Vertex<BytesWritable, ByteWritable, NullWritable, MessageWritable>{
- private byte[] tmpVertexId;
- private BytesWritable vid;
- private TestLoadGraphVertex newVertex;
- private MessageWritable tmpMsg = new MessageWritable();
+ //private byte[] tmpVertexId;
+ //private BytesWritable vid;
+ //private TestLoadGraphVertex newVertex;
+ //private MessageWritable tmpMsg = new MessageWritable();
/**
* For test, just output original file
*/
@Override
public void compute(Iterator<MessageWritable> msgIterator) {
- deleteVertex(getVertexId());
+ /*deleteVertex(getVertexId());
tmpVertexId = getVertexId().getBytes();
String a1 = "100100";
byte[] b1 = BitwiseOperation.convertBinaryStringToBytes(a1);
@@ -62,15 +62,15 @@
String valueString = "00000000";
byte value = BitwiseOperation.convertBinaryStringToByte(valueString);
if(getSuperstep() == 1 && Arrays.equals(b1,tmpVertexId)){
- /*newVertex = new TestLoadGraphVertex();
+ newVertex = new TestLoadGraphVertex();
vid.set(new BytesWritable(b2));
newVertex.setVertexId(vid);
newVertex.setVertexValue(getVertexValue());
- addVertex(vid, newVertex);*/
+ addVertex(vid, newVertex);
//vertex.initialize(new BytesWritable(b2), new ByteWritable(value), null, null);
//addVertex(new BytesWritable(b2),this.createdNewLiveVertex());
deleteVertex(getVertexId());
- }
+ }*/
/*String a2 = "100111";
byte[] b2 = BitwiseOperation.convertBinaryStringToBytes(a2);
String a3 = "11111111";
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/type/KmerCountValue.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/type/KmerCountValue.java
index a89add1..1900eca 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/type/KmerCountValue.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/type/KmerCountValue.java
@@ -1,3 +1,17 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package edu.uci.ics.pregelix.type;
import java.io.DataInput;
@@ -20,22 +34,6 @@
count = 0;
}
- public byte getAdjBitMap() {
- return adjBitMap;
- }
-
- public void setAdjBitMap(byte adjBitMap) {
- this.adjBitMap = adjBitMap;
- }
-
- public byte getCount() {
- return count;
- }
-
- public void setCount(byte count) {
- this.count = count;
- }
-
@Override
public void readFields(DataInput arg0) throws IOException {
adjBitMap = arg0.readByte();
@@ -57,5 +55,19 @@
this.adjBitMap = bitmap;
this.count = count;
}
+ public byte getAdjBitMap() {
+ return adjBitMap;
+ }
+
+ public void setAdjBitMap(byte adjBitMap) {
+ this.adjBitMap = adjBitMap;
+ }
+ public void setCount(byte count) {
+ this.count = count;
+ }
+
+ public byte getCount() {
+ return count;
+ }
}
\ No newline at end of file
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/type/State.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/type/State.java
index 61ee2fa..c62688e 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/type/State.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/type/State.java
@@ -6,4 +6,5 @@
public static final int END_VERTEX = 2;
public static final int MID_VERTEX = 3;
public static final int TODELETE = 4;
+ public static final int FINAL_VERTEX = 5;
}
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/pregelix/JobGen/JobGenerator.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/pregelix/JobGen/JobGenerator.java
index 7ac4093..7affcbf 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/pregelix/JobGen/JobGenerator.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/pregelix/JobGen/JobGenerator.java
@@ -47,7 +47,7 @@
private static void generateBinaryLoadGraphJob(String jobName, String outputPath) throws IOException {
PregelixJob job = new PregelixJob(jobName);
- job.setVertexClass(MergeGraphVertex.class);
+ job.setVertexClass(TestLoadGraphVertex.class);
job.setVertexInputFormatClass(BinaryLoadGraphInputFormat.class);
job.setVertexOutputFormatClass(BinaryLoadGraphOutputFormat.class);
job.setOutputKeyClass(BytesWritable.class);
@@ -84,8 +84,8 @@
*/
public static void main(String[] args) throws IOException {
// TODO Auto-generated method stub
- genLogAlgorithmForMergeGraph();
- //genBinaryLoadGraph();
+ //genLogAlgorithmForMergeGraph();
+ genBinaryLoadGraph();
//genSequenceLoadGraph();
//genBasicBinaryLoadGraph();
}
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/pregelix/JobRun/RunJobTestSuite.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/pregelix/JobRun/RunJobTestSuite.java
index f347501..58e4ea3 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/pregelix/JobRun/RunJobTestSuite.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/pregelix/JobRun/RunJobTestSuite.java
@@ -40,7 +40,7 @@
private static final String PATH_TO_ONLY = "src/test/resources/only.txt";
private static final String FILE_EXTENSION_OF_RESULTS = "result";
- private static final String DATA_PATH = "data/webmap/sequenceShortFileMergeTest";//sequenceFileMergeTest
+ private static final String DATA_PATH = "data/webmap/part-1-out";//sequenceFileMergeTest
private static final String HDFS_PATH = "/webmap/";
private static final String HYRACKS_APP_NAME = "pregelix";