change code architect and use VKmerBytesWritable instead of byte[]
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/GraphVertexOperation.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/GraphVertexOperation.java
deleted file mode 100644
index 553de45..0000000
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/GraphVertexOperation.java
+++ /dev/null
@@ -1,64 +0,0 @@
-package edu.uci.ics.genomix.pregelix;
-
-import org.apache.hadoop.io.BytesWritable;
-
-import edu.uci.ics.genomix.type.old.Kmer;
-import edu.uci.ics.genomix.type.old.KmerUtil;
-
-public class GraphVertexOperation {
-
- /**
- * generate the valid data(byte[]) from BytesWritable
- */
- public static byte[] generateValidDataFromBytesWritable(BytesWritable bw){
- byte[] wholeBytes = bw.getBytes();
- int validNum = bw.getLength();
- byte[] validBytes = new byte[validNum];
- for(int i = 0; i < validNum; i++)
- validBytes[i] = wholeBytes[i];
- return validBytes;
- }
- /**
- * Single Vertex: in-degree = out-degree = 1
- * @param vertexValue
- */
- public static boolean isPathVertex(byte value){
- if(KmerUtil.inDegree(value) == 1 && KmerUtil.outDegree(value) == 1)
- return true;
- return false;
- }
- /**
- * Head Vertex: out-degree > 0,
- * @param vertexValue
- */
- public static boolean isHeadVertex(byte value){
- if(KmerUtil.outDegree(value) > 0 && !isPathVertex(value))
- return true;
- return false;
- }
- /**
- * Rear Vertex: in-degree > 0,
- * @param vertexValue
- */
- public static boolean isRearVertex(byte value){
- if(KmerUtil.inDegree(value) > 0 && !isPathVertex(value))
- return true;
- return false;
- }
- /**
- * update right neighber based on next vertexId
- */
- public static byte updateRightNeighberByVertexId(byte oldVertexValue, byte[] neighberVertexId, int k){
-
- String neighberVertex = Kmer.recoverKmerFrom(k, neighberVertexId, 0, neighberVertexId.length);
-
- byte newBit = Kmer.GENE_CODE.getAdjBit((byte)neighberVertex.charAt(neighberVertex.length() - 1));
- return (byte) ((byte)(oldVertexValue & 0xF0) | (byte) (newBit & 0x0F));
- }
- /**
- * update right neighber
- */
- public static byte updateRightNeighber(byte oldVertexValue, byte newVertexValue){
- return (byte) ((byte)(oldVertexValue & 0xF0) | (byte) (newVertexValue & 0x0F));
- }
-}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/LoadGraphVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/LoadGraphVertex.java
deleted file mode 100644
index 580e1fe..0000000
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/LoadGraphVertex.java
+++ /dev/null
@@ -1,72 +0,0 @@
-package edu.uci.ics.genomix.pregelix;
-
-import java.util.Iterator;
-
-import org.apache.hadoop.io.ByteWritable;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.NullWritable;
-
-import edu.uci.ics.genomix.pregelix.client.Client;
-import edu.uci.ics.genomix.pregelix.format.BinaryLoadGraphInputFormat;
-import edu.uci.ics.genomix.pregelix.format.BinaryLoadGraphOutputFormat;
-import edu.uci.ics.genomix.pregelix.io.MessageWritable;
-import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
-import edu.uci.ics.pregelix.api.graph.Vertex;
-import edu.uci.ics.pregelix.api.job.PregelixJob;
-
-/*
- * vertexId: BytesWritable
- * vertexValue: ByteWritable
- * edgeValue: NullWritable
- * message: MessageWritable
- *
- * DNA:
- * A: 00
- * C: 01
- * G: 10
- * T: 11
- *
- * succeed node
- * A 00000001 1
- * G 00000010 2
- * C 00000100 4
- * T 00001000 8
- * precursor node
- * A 00010000 16
- * G 00100000 32
- * C 01000000 64
- * T 10000000 128
- *
- * For example, ONE LINE in input file: 00,01,10 0001,0010,
- * That means that vertexId is ACG, its succeed node is A and its precursor node is C.
- * The succeed node and precursor node will be stored in vertexValue and we don't use edgeValue.
- * The details about message are in edu.uci.ics.pregelix.example.io.MessageWritable.
- */
-public class LoadGraphVertex extends Vertex<BytesWritable, ByteWritable, NullWritable, MessageWritable>{
-
- /**
- * For test, just output original file
- */
- @Override
- public void compute(Iterator<MessageWritable> msgIterator) {
- deleteVertex(getVertexId());
- voteToHalt();
- }
-
- /**
- * @param args
- */
- public static void main(String[] args) throws Exception {
- //final int k = Integer.parseInt(args[0]);
- PregelixJob job = new PregelixJob(LoadGraphVertex.class.getSimpleName());
- job.setVertexClass(LoadGraphVertex.class);
- /**
- * BinaryInput and BinaryOutput
- */
- job.setVertexInputFormatClass(BinaryLoadGraphInputFormat.class);
- job.setVertexOutputFormatClass(BinaryLoadGraphOutputFormat.class);
- job.setOutputKeyClass(BytesWritable.class);
- job.setOutputValueClass(ValueStateWritable.class);
- Client.run(args, job);
- }
-}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/LogAlgorithmForMergeGraphVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/LogAlgorithmForMergeGraphVertex.java
deleted file mode 100644
index 88c5c35..0000000
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/LogAlgorithmForMergeGraphVertex.java
+++ /dev/null
@@ -1,309 +0,0 @@
-package edu.uci.ics.genomix.pregelix;
-
-import java.util.Iterator;
-
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.NullWritable;
-
-import edu.uci.ics.pregelix.api.graph.Vertex;
-import edu.uci.ics.pregelix.api.job.PregelixJob;
-import edu.uci.ics.genomix.pregelix.client.Client;
-import edu.uci.ics.genomix.pregelix.format.LogAlgorithmForMergeGraphInputFormat;
-import edu.uci.ics.genomix.pregelix.format.LogAlgorithmForMergeGraphOutputFormat;
-import edu.uci.ics.genomix.pregelix.io.LogAlgorithmMessageWritable;
-import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
-import edu.uci.ics.genomix.pregelix.type.Message;
-import edu.uci.ics.genomix.pregelix.type.State;
-import edu.uci.ics.genomix.type.old.Kmer;
-import edu.uci.ics.genomix.type.old.KmerUtil;
-
-/*
- * vertexId: BytesWritable
- * vertexValue: ValueStateWritable
- * edgeValue: NullWritable
- * message: LogAlgorithmMessageWritable
- *
- * DNA:
- * A: 00
- * C: 01
- * G: 10
- * T: 11
- *
- * succeed node
- * A 00000001 1
- * G 00000010 2
- * C 00000100 4
- * T 00001000 8
- * precursor node
- * A 00010000 16
- * G 00100000 32
- * C 01000000 64
- * T 10000000 128
- *
- * For example, ONE LINE in input file: 00,01,10 0001,0010,
- * That means that vertexId is ACG, its succeed node is A and its precursor node is C.
- * The succeed node and precursor node will be stored in vertexValue and we don't use edgeValue.
- * The details about message are in edu.uci.ics.pregelix.example.io.MessageWritable.
- */
-public class LogAlgorithmForMergeGraphVertex extends Vertex<BytesWritable, ValueStateWritable, NullWritable, LogAlgorithmMessageWritable>{
- public static final String KMER_SIZE = "LogAlgorithmForMergeGraphVertex.kmerSize";
- public static final String ITERATIONS = "MergeGraphVertex.iteration";
- public static int kmerSize = -1;
- private int maxIteration = -1;
-
- private byte[] tmpVertexId;
- private byte[] tmpDestVertexId;
- private BytesWritable destVertexId = new BytesWritable();
- private byte[] mergeChainVertexId;
- private int lengthOfMergeChainVertex;
- private byte tmpVertexValue;
- private ValueStateWritable tmpVal = new ValueStateWritable();
- private LogAlgorithmMessageWritable tmpMsg = new LogAlgorithmMessageWritable();
- /**
- * Log Algorithm for path merge graph
- */
- /**
- * Load KmerSize, MaxIteration
- */
- @Override
- public void compute(Iterator<LogAlgorithmMessageWritable> msgIterator) {
- if(kmerSize == -1)
- kmerSize = getContext().getConfiguration().getInt(KMER_SIZE, 5);
- if (maxIteration < 0)
- maxIteration = getContext().getConfiguration().getInt(ITERATIONS, 100);
- tmpVertexId = GraphVertexOperation.generateValidDataFromBytesWritable(getVertexId());
- tmpVal = getVertexValue();
- if (getSuperstep() == 1) {
- tmpMsg.setChainVertexId(new byte[0]);
- if(GraphVertexOperation.isHeadVertex(tmpVal.getValue())){
- tmpMsg.setMessage(Message.START);
- for(byte x = Kmer.GENE_CODE.A; x<= Kmer.GENE_CODE.T ; x++){
- if((tmpVal.getValue() & (1 << x)) != 0){
- tmpDestVertexId = KmerUtil.shiftKmerWithNextCode(kmerSize, tmpVertexId, 0, tmpVertexId.length, x);
- destVertexId.set(tmpDestVertexId, 0, tmpDestVertexId.length);
- sendMsg(destVertexId,tmpMsg);
- }
- }
- voteToHalt();
- }
- if(GraphVertexOperation.isRearVertex(tmpVal.getValue())){
- tmpMsg.setMessage(Message.END);
-
- for(byte x = Kmer.GENE_CODE.A; x<= Kmer.GENE_CODE.T ; x++){
- if(((tmpVal.getValue()>> 4) & (1 << x)) != 0){
- tmpDestVertexId = KmerUtil.shiftKmerWithPreCode(kmerSize, tmpVertexId, 0, tmpVertexId.length, x);
- destVertexId.set(tmpDestVertexId, 0, tmpDestVertexId.length);
- sendMsg(destVertexId,tmpMsg);
- }
- }
- voteToHalt();
- }
- if(GraphVertexOperation.isPathVertex(tmpVal.getValue())){
- tmpVal.setState(State.MID_VERTEX);
- setVertexValue(tmpVal);
- }
- /*if(!GraphVertexOperation.isHeadVertex(tmpVal.getValue())
- && !GraphVertexOperation.isRearVertex(tmpVal.getValue())
- && !GraphVertexOperation.isRearVertex(tmpVal.getValue()))
- voteToHalt();*/
- }
- else if(getSuperstep() == 2 && getSuperstep() <= maxIteration){
- while(msgIterator.hasNext()){
- if(!GraphVertexOperation.isPathVertex(tmpVal.getValue())){
- msgIterator.next();
- voteToHalt();
- }
- else{
- tmpMsg = msgIterator.next();
- if(tmpMsg.getMessage() == Message.START &&
- (tmpVal.getState() == State.MID_VERTEX || tmpVal.getState() == State.END_VERTEX)){
- tmpVal.setState(State.START_VERTEX);
- setVertexValue(tmpVal);
- }
- else if(tmpMsg.getMessage() == Message.END && tmpVal.getState() == State.MID_VERTEX){
- tmpVal.setState(State.END_VERTEX);
- setVertexValue(tmpVal);
- voteToHalt();
- }
- else
- voteToHalt();
- }
- }
- }
- //head node sends message to path node
- else if(getSuperstep()%3 == 0 && getSuperstep() <= maxIteration){
- if(tmpVal.getState() == State.TODELETE || tmpVal.getState() == State.KILL_SELF)
- voteToHalt();
- else{
- if(getSuperstep() == 3){
- tmpMsg = new LogAlgorithmMessageWritable();
- if(Kmer.GENE_CODE.getGeneCodeFromBitMap((byte)(tmpVal.getValue() & 0x0F)) == -1)
- voteToHalt();
- else{
- tmpDestVertexId = KmerUtil.shiftKmerWithNextCode(kmerSize, tmpVertexId,
- 0, tmpVertexId.length,
- Kmer.GENE_CODE.getGeneCodeFromBitMap((byte)(tmpVal.getValue() & 0x0F)));
- destVertexId.set(tmpDestVertexId, 0, tmpDestVertexId.length);
- if(tmpVal.getState() == State.START_VERTEX){
- tmpMsg.setMessage(Message.START);
- tmpMsg.setSourceVertexId(getVertexId().getBytes());
- sendMsg(destVertexId, tmpMsg);
- voteToHalt();
- }
- else if(tmpVal.getState() != State.END_VERTEX && tmpVal.getState() != State.FINAL_DELETE){
- tmpMsg.setMessage(Message.NON);
- tmpMsg.setSourceVertexId(getVertexId().getBytes());
- sendMsg(destVertexId,tmpMsg);
- voteToHalt();
- }
- }
- }
- else{
- if(msgIterator.hasNext()){
- tmpMsg = msgIterator.next();
- byte[] lastKmer = KmerUtil.getLastKmerFromChain(kmerSize,
- tmpVal.getLengthOfMergeChain(),
- tmpVal.getMergeChain(),
- 0, tmpVal.getMergeChain().length);
- if(Kmer.GENE_CODE.getGeneCodeFromBitMap((byte)(tmpVal.getValue() & 0x0F)) == -1 || lastKmer == null)
- voteToHalt();
- else{
- tmpDestVertexId = KmerUtil.shiftKmerWithNextCode(kmerSize, lastKmer,
- 0, lastKmer.length,
- Kmer.GENE_CODE.getGeneCodeFromBitMap((byte)(tmpVal.getValue() & 0x0F)));
- destVertexId.set(tmpDestVertexId, 0, tmpDestVertexId.length);
- if(tmpVal.getState() == State.START_VERTEX){
- tmpMsg.setMessage(Message.START);
- tmpMsg.setSourceVertexId(getVertexId().getBytes());
- sendMsg(destVertexId, tmpMsg);
- voteToHalt();
- }
- else if(tmpVal.getState() != State.END_VERTEX && tmpVal.getState() != State.FINAL_DELETE){
- tmpMsg.setMessage(Message.NON);
- tmpMsg.setSourceVertexId(getVertexId().getBytes());
- sendMsg(destVertexId,tmpMsg);
- }
- }
- }
- }
- }
- }
-
- //path node sends message back to head node
- else if(getSuperstep()%3 == 1 && getSuperstep() <= maxIteration){
- if(tmpVal.getState() == State.TODELETE || tmpVal.getState() == State.KILL_SELF)
- voteToHalt();
- else{
- if(msgIterator.hasNext()){
- tmpMsg = msgIterator.next();
- int message = tmpMsg.getMessage();
- if(tmpVal.getLengthOfMergeChain() == 0){
- tmpVal.setLengthOfMergeChain(kmerSize);
- tmpVal.setMergeChain(tmpVertexId);
- setVertexValue(tmpVal);
- }
- tmpMsg.setLengthOfChain(tmpVal.getLengthOfMergeChain());
- tmpMsg.setChainVertexId(tmpVal.getMergeChain());
-
- tmpMsg.setNeighberInfo(tmpVal.getValue()); //set neighber
- tmpMsg.setSourceVertexState(tmpVal.getState());
-
- //kill Message because it has been merged by the head
- if(tmpVal.getState() == State.END_VERTEX || tmpVal.getState() == State.FINAL_DELETE){
- tmpMsg.setMessage(Message.END);
- tmpVal.setState(State.FINAL_DELETE);
- setVertexValue(tmpVal);
- //deleteVertex(getVertexId());
- }
- else
- tmpMsg.setMessage(Message.NON);
-
- if(message == Message.START){
- tmpVal.setState(State.TODELETE);
- setVertexValue(tmpVal);
- }
- destVertexId.set(tmpMsg.getSourceVertexId(), 0, tmpMsg.getSourceVertexId().length);
- sendMsg(destVertexId,tmpMsg);
- //voteToHalt();
- }
- else{
- if(getVertexValue().getState() != State.START_VERTEX //&& getVertexValue().getState() != State.NON_EXIST
- && getVertexValue().getState() != State.END_VERTEX && getVertexValue().getState() != State.FINAL_DELETE){
- tmpVal.setState(State.KILL_SELF);
- setVertexValue(tmpVal);
- voteToHalt();
- //deleteVertex(getVertexId()); //killSelf because it doesn't receive any message
- }
- }
- }
- }
- else if(getSuperstep()%3 == 2 && getSuperstep() <= maxIteration){
- if(tmpVal.getState() == State.TODELETE || tmpVal.getState() == State.KILL_SELF)
- voteToHalt(); //deleteVertex(getVertexId()); //killSelf
- else{
- if(msgIterator.hasNext()){
- tmpMsg = msgIterator.next();
-
- if(tmpMsg.getMessage() == Message.END){
- if(tmpVal.getState() != State.START_VERTEX)
- tmpVal.setState(State.END_VERTEX);
- else
- tmpVal.setState(State.FINAL_VERTEX);
- }
-
- if(getSuperstep() == 5){
- lengthOfMergeChainVertex = kmerSize;
- mergeChainVertexId = tmpVertexId;
- }
- else{
- lengthOfMergeChainVertex = tmpVal.getLengthOfMergeChain();
- mergeChainVertexId = tmpVal.getMergeChain();
- }
- byte[] tmplastKmer = KmerUtil.getLastKmerFromChain(tmpMsg.getLengthOfChain() - kmerSize + 1,
- tmpMsg.getLengthOfChain(), tmpMsg.getChainVertexId(),0, tmpMsg.getChainVertexId().length);
- mergeChainVertexId = KmerUtil.mergeTwoKmer(lengthOfMergeChainVertex,
- mergeChainVertexId,
- 0, mergeChainVertexId.length,
- tmpMsg.getLengthOfChain() - kmerSize + 1,
- tmplastKmer, 0, tmplastKmer.length);
- lengthOfMergeChainVertex = lengthOfMergeChainVertex + tmpMsg.getLengthOfChain()
- - kmerSize + 1;
- tmpVal.setLengthOfMergeChain(lengthOfMergeChainVertex);
- tmpVal.setMergeChain(mergeChainVertexId);
-
- tmpVertexValue = GraphVertexOperation.updateRightNeighber(getVertexValue().getValue(),tmpMsg.getNeighberInfo());
- tmpVal.setValue(tmpVertexValue);
- if(tmpMsg.getMessage() != Message.END){
- setVertexValue(tmpVal);
- tmpMsg = new LogAlgorithmMessageWritable(); //reset
- tmpMsg.setNeighberInfo(tmpVertexValue);
- sendMsg(getVertexId(),tmpMsg);
- }
- }
- if(tmpVal.getState() == State.END_VERTEX || tmpVal.getState() == State.FINAL_DELETE)
- voteToHalt();
- if(tmpVal.getState() == State.FINAL_VERTEX){
- //String source = Kmer.recoverKmerFrom(tmpVal.getLengthOfMergeChain(), tmpVal.getMergeChain(), 0, tmpVal.getMergeChain().length);
- voteToHalt();
- }
- }
- }
- }
-
- /**
- * @param args
- */
- public static void main(String[] args) throws Exception {
- PregelixJob job = new PregelixJob(LogAlgorithmForMergeGraphVertex.class.getSimpleName());
- job.setVertexClass(LogAlgorithmForMergeGraphVertex.class);
- /**
- * BinaryInput and BinaryOutput~/
- */
- job.setVertexInputFormatClass(LogAlgorithmForMergeGraphInputFormat.class);
- job.setVertexOutputFormatClass(LogAlgorithmForMergeGraphOutputFormat.class);
- job.setOutputKeyClass(BytesWritable.class);
- job.setOutputValueClass(ValueStateWritable.class);
- job.setDynamicVertexValueSize(true);
- Client.run(args, job);
- }
-}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/MergeGraphVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/MergeGraphVertex.java
deleted file mode 100644
index ec45019..0000000
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/MergeGraphVertex.java
+++ /dev/null
@@ -1,191 +0,0 @@
-package edu.uci.ics.genomix.pregelix;
-
-import java.util.Iterator;
-
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.NullWritable;
-
-import edu.uci.ics.genomix.type.old.Kmer;
-import edu.uci.ics.genomix.type.old.KmerUtil;
-import edu.uci.ics.pregelix.api.graph.Vertex;
-import edu.uci.ics.pregelix.api.job.PregelixJob;
-import edu.uci.ics.genomix.pregelix.client.Client;
-import edu.uci.ics.genomix.pregelix.format.BinaryLoadGraphInputFormat;
-import edu.uci.ics.genomix.pregelix.format.BinaryLoadGraphOutputFormat;
-import edu.uci.ics.genomix.pregelix.io.MessageWritable;
-import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
-import edu.uci.ics.genomix.pregelix.type.State;
-
-/*
- * vertexId: BytesWritable
- * vertexValue: ByteWritable
- * edgeValue: NullWritable
- * message: MessageWritable
- *
- * DNA:
- * A: 00
- * C: 01
- * G: 10
- * T: 11
- *
- * succeed node
- * A 00000001 1
- * G 00000010 2
- * C 00000100 4
- * T 00001000 8
- * precursor node
- * A 00010000 16
- * G 00100000 32
- * C 01000000 64
- * T 10000000 128
- *
- * For example, ONE LINE in input file: 00,01,10 0001,0010,
- * That means that vertexId is ACG, its succeed node is A and its precursor node is C.
- * The succeed node and precursor node will be stored in vertexValue and we don't use edgeValue.
- * The details about message are in edu.uci.ics.pregelix.example.io.MessageWritable.
- */
-public class MergeGraphVertex extends Vertex<BytesWritable, ValueStateWritable, NullWritable, MessageWritable>{
-
- public static final String KMER_SIZE = "MergeGraphVertex.kmerSize";
- public static final String ITERATIONS = "MergeGraphVertex.iteration";
- public static int kmerSize = -1;
- private int maxIteration = -1;
-
- private byte[] tmpVertexId;
- private byte[] tmpDestVertexId;
- private BytesWritable destVertexId = new BytesWritable();
- private BytesWritable tmpChainVertexId = new BytesWritable();
- private ValueStateWritable tmpVertexValue = new ValueStateWritable();
- private MessageWritable tmpMsg = new MessageWritable();
- /**
- * Naive Algorithm for path merge graph
- * @throws Exception
- * @throws
- */
- /**
- * Load KmerSize, MaxIteration
- */
- @Override
- public void compute(Iterator<MessageWritable> msgIterator) {
- if(kmerSize == -1)
- kmerSize = getContext().getConfiguration().getInt(KMER_SIZE, 5);
- if (maxIteration < 0)
- maxIteration = getContext().getConfiguration().getInt(ITERATIONS, 100);
- tmpVertexId = GraphVertexOperation.generateValidDataFromBytesWritable(getVertexId());
- if (getSuperstep() == 1) {
- if(GraphVertexOperation.isHeadVertex(getVertexValue().getValue())){
- tmpMsg.setSourceVertexId(tmpVertexId);
- tmpMsg.setHead(tmpVertexId);
- tmpMsg.setLengthOfChain(0);
- tmpMsg.setChainVertexId(tmpChainVertexId.getBytes());
- for(byte x = Kmer.GENE_CODE.A; x<= Kmer.GENE_CODE.T ; x++){
- if((getVertexValue().getValue() & (1 << x)) != 0){
- tmpDestVertexId = KmerUtil.shiftKmerWithNextCode(kmerSize, tmpVertexId, 0, tmpVertexId.length, x);
- destVertexId.set(tmpDestVertexId, 0, tmpDestVertexId.length);
- sendMsg(destVertexId,tmpMsg);
- }
- }
- }
- }
-
- //path node sends message back to head node
- else if(getSuperstep()%2 == 0 && getSuperstep() <= maxIteration){
- if(msgIterator.hasNext()){
- tmpMsg = msgIterator.next();
-
- if(!tmpMsg.isRear()){
- if(getSuperstep() == 2)
- tmpMsg.setHead(tmpVertexId);
- if(GraphVertexOperation.isPathVertex(getVertexValue().getValue())){
- tmpDestVertexId = tmpMsg.getSourceVertexId();
- tmpMsg.setNeighberInfo(getVertexValue().getValue()); //set neighber
- if(tmpMsg.getLengthOfChain() == 0){
- tmpMsg.setLengthOfChain(kmerSize);
- tmpMsg.setChainVertexId(tmpVertexId);
- }
- else{
- String source = Kmer.recoverKmerFrom(kmerSize, tmpVertexId, 0, tmpVertexId.length);
- tmpMsg.setChainVertexId(KmerUtil.mergeKmerWithNextCode(
- tmpMsg.getLengthOfChain(),
- tmpMsg.getChainVertexId(),
- 0, tmpMsg.getChainVertexId().length,
- Kmer.GENE_CODE.getCodeFromSymbol((byte)source.charAt(source.length() - 1))));
- tmpMsg.incrementLength();
- deleteVertex(getVertexId());
- }
- destVertexId.set(tmpDestVertexId, 0, tmpDestVertexId.length);
- sendMsg(destVertexId,tmpMsg);
- }
- else if(GraphVertexOperation.isRearVertex(getVertexValue().getValue())){
- if(getSuperstep() == 2)
- voteToHalt();
- else{
- tmpDestVertexId = tmpMsg.getSourceVertexId();
- tmpMsg.setSourceVertexId(tmpVertexId);
- tmpMsg.setRear(true);
- destVertexId.set(tmpDestVertexId, 0, tmpDestVertexId.length);
- sendMsg(destVertexId,tmpMsg);
- }
- }
- }
- else{
- tmpVertexValue.setState(State.START_VERTEX);
- tmpVertexValue.setValue(GraphVertexOperation.updateRightNeighberByVertexId(getVertexValue().getValue(),
- tmpMsg.getSourceVertexId(), kmerSize));
- tmpVertexValue.setLengthOfMergeChain(tmpMsg.getLengthOfChain());
- tmpVertexValue.setMergeChain(tmpMsg.getChainVertexId());
- setVertexValue(tmpVertexValue);
- //String source = Kmer.recoverKmerFrom(tmpMsg.getLengthOfChain(), tmpMsg.getChainVertexId(), 0, tmpMsg.getChainVertexId().length);
- //System.out.print("");
- /*try {
-
- GraphVertexOperation.flushChainToFile(tmpMsg.getChainVertexId(),
- tmpMsg.getLengthOfChain(),tmpVertexId);
- } catch (IOException e) { e.printStackTrace(); }*/
- }
- }
- }
- //head node sends message to path node
- else if(getSuperstep()%2 == 1 && getSuperstep() <= maxIteration){
- while (msgIterator.hasNext()){
- tmpMsg = msgIterator.next();
- if(!tmpMsg.isRear()){
- byte[] lastKmer = KmerUtil.getLastKmerFromChain(kmerSize,
- tmpMsg.getLengthOfChain(),
- tmpMsg.getChainVertexId(),
- 0, tmpMsg.getChainVertexId().length);
- tmpDestVertexId = KmerUtil.shiftKmerWithNextCode(kmerSize, lastKmer,
- 0, lastKmer.length,
- Kmer.GENE_CODE.getGeneCodeFromBitMap((byte)(tmpMsg.getNeighberInfo() & 0x0F)));
-
- tmpMsg.setSourceVertexId(tmpVertexId);
- destVertexId.set(tmpDestVertexId, 0, tmpDestVertexId.length);
- sendMsg(destVertexId,tmpMsg);
- }
- else{
- tmpDestVertexId = tmpMsg.getHead();
- destVertexId.set(tmpDestVertexId, 0, tmpDestVertexId.length);
- sendMsg(destVertexId,tmpMsg);
- }
- }
- }
- voteToHalt();
- }
-
- /**
- * @param args
- */
- public static void main(String[] args) throws Exception {
- PregelixJob job = new PregelixJob(MergeGraphVertex.class.getSimpleName());
- job.setVertexClass(MergeGraphVertex.class);
- /**
- * BinaryInput and BinaryOutput
- */
- job.setVertexInputFormatClass(BinaryLoadGraphInputFormat.class);
- job.setVertexOutputFormatClass(BinaryLoadGraphOutputFormat.class);
- job.setDynamicVertexValueSize(true);
- job.setOutputKeyClass(BytesWritable.class);
- job.setOutputValueClass(ValueStateWritable.class);
- Client.run(args, job);
- }
-}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/api/io/binary/BinaryVertexInputFormat.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/api/io/binary/BinaryVertexInputFormat.java
index 823a984..e1868b1 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/api/io/binary/BinaryVertexInputFormat.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/api/io/binary/BinaryVertexInputFormat.java
@@ -3,7 +3,6 @@
import java.io.IOException;
import java.util.List;
-import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.InputSplit;
@@ -14,12 +13,14 @@
import edu.uci.ics.pregelix.api.io.VertexInputFormat;
import edu.uci.ics.pregelix.api.io.VertexReader;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
import edu.uci.ics.genomix.type.KmerCountValue;
public class BinaryVertexInputFormat <I extends WritableComparable<?>, V extends Writable, E extends Writable, M extends Writable>
extends VertexInputFormat<I, V, E, M>{
/** Uses the SequenceFileInputFormat to do everything */
+ @SuppressWarnings("rawtypes")
protected SequenceFileInputFormat binaryInputFormat = new SequenceFileInputFormat();
/**
@@ -37,7 +38,7 @@
public static abstract class BinaryVertexReader<I extends WritableComparable<?>, V extends Writable, E extends Writable, M extends Writable>
implements VertexReader<I, V, E, M> {
/** Internal line record reader */
- private final RecordReader<BytesWritable,KmerCountValue> lineRecordReader;
+ private final RecordReader<KmerBytesWritable,KmerCountValue> lineRecordReader;
/** Context passed to initialize */
private TaskAttemptContext context;
@@ -47,7 +48,7 @@
* @param recordReader
* Line record reader from SequenceFileInputFormat
*/
- public BinaryVertexReader(RecordReader<BytesWritable, KmerCountValue> recordReader) {
+ public BinaryVertexReader(RecordReader<KmerBytesWritable, KmerCountValue> recordReader) {
this.lineRecordReader = recordReader;
}
@@ -73,7 +74,7 @@
*
* @return Record reader to be used for reading.
*/
- protected RecordReader<BytesWritable,KmerCountValue> getRecordReader() {
+ protected RecordReader<KmerBytesWritable,KmerCountValue> getRecordReader() {
return lineRecordReader;
}
@@ -87,7 +88,8 @@
}
}
- @Override
+ @SuppressWarnings("unchecked")
+ @Override
public List<InputSplit> getSplits(JobContext context, int numWorkers) throws IOException, InterruptedException {
// Ignore the hint of numWorkers here since we are using SequenceFileInputFormat
// to do this for us
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/api/io/binary/BinaryVertexOutputFormat.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/api/io/binary/BinaryVertexOutputFormat.java
index f497f21..1435770 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/api/io/binary/BinaryVertexOutputFormat.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/api/io/binary/BinaryVertexOutputFormat.java
@@ -2,7 +2,6 @@
import java.io.IOException;
-import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.JobContext;
@@ -12,6 +11,7 @@
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
import edu.uci.ics.pregelix.api.io.VertexOutputFormat;
import edu.uci.ics.pregelix.api.io.VertexWriter;
@@ -49,7 +49,7 @@
/** Context passed to initialize */
private TaskAttemptContext context;
/** Internal line record writer */
- private final RecordWriter<BytesWritable, ValueStateWritable> lineRecordWriter;
+ private final RecordWriter<KmerBytesWritable, ValueStateWritable> lineRecordWriter;
/**
* Initialize with the LineRecordWriter.
@@ -57,7 +57,7 @@
* @param lineRecordWriter
* Line record writer from SequenceFileOutputFormat
*/
- public BinaryVertexWriter(RecordWriter<BytesWritable, ValueStateWritable> lineRecordWriter) {
+ public BinaryVertexWriter(RecordWriter<KmerBytesWritable, ValueStateWritable> lineRecordWriter) {
this.lineRecordWriter = lineRecordWriter;
}
@@ -76,7 +76,7 @@
*
* @return Record writer to be used for writing.
*/
- public RecordWriter<BytesWritable, ValueStateWritable> getRecordWriter() {
+ public RecordWriter<KmerBytesWritable, ValueStateWritable> getRecordWriter() {
return lineRecordWriter;
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/client/Client.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/client/Client.java
index 2d70fdf..60342a7 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/client/Client.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/client/Client.java
@@ -10,8 +10,8 @@
import org.kohsuke.args4j.CmdLineParser;
import org.kohsuke.args4j.Option;
-import edu.uci.ics.genomix.pregelix.LogAlgorithmForMergeGraphVertex;
-import edu.uci.ics.genomix.pregelix.MergeGraphVertex;
+import edu.uci.ics.genomix.pregelix.operator.LogAlgorithmForPathMergeVertex;
+import edu.uci.ics.genomix.pregelix.operator.NaiveAlgorithmForPathMergeVertex;
import edu.uci.ics.pregelix.api.job.PregelixJob;
import edu.uci.ics.pregelix.core.base.IDriver.Plan;
import edu.uci.ics.pregelix.core.driver.Driver;
@@ -60,11 +60,11 @@
for (int i = 1; i < inputs.length; i++)
FileInputFormat.addInputPaths(job, inputs[0]);
FileOutputFormat.setOutputPath(job, new Path(options.outputPath));
- job.getConfiguration().setInt(MergeGraphVertex.KMER_SIZE, options.sizeKmer);
- job.getConfiguration().setInt(LogAlgorithmForMergeGraphVertex.KMER_SIZE, options.sizeKmer);
+ job.getConfiguration().setInt(NaiveAlgorithmForPathMergeVertex.KMER_SIZE, options.sizeKmer);
+ job.getConfiguration().setInt(LogAlgorithmForPathMergeVertex.KMER_SIZE, options.sizeKmer);
if (options.numIteration > 0){
- job.getConfiguration().setLong(MergeGraphVertex.ITERATIONS, options.numIteration);
- //job.getConfiguration().setLong(LogAlgorithmForMergeGraphVertex.ITERATIONS, options.numIteration);
+ job.getConfiguration().setLong(NaiveAlgorithmForPathMergeVertex.ITERATIONS, options.numIteration);
+ job.getConfiguration().setLong(LogAlgorithmForPathMergeVertex.ITERATIONS, options.numIteration);
}
return options;
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/BinaryLoadGraphInputFormat.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/BinaryLoadGraphInputFormat.java
deleted file mode 100644
index 4cd22ac..0000000
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/BinaryLoadGraphInputFormat.java
+++ /dev/null
@@ -1,81 +0,0 @@
-package edu.uci.ics.genomix.pregelix.format;
-
-import java.io.IOException;
-import java.util.logging.FileHandler;
-import java.util.logging.Logger;
-
-import org.apache.hadoop.io.ByteWritable;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
-import edu.uci.ics.genomix.type.KmerCountValue;
-import edu.uci.ics.pregelix.api.graph.Vertex;
-import edu.uci.ics.pregelix.api.io.VertexReader;
-import edu.uci.ics.pregelix.api.util.BspUtils;
-import edu.uci.ics.genomix.pregelix.GraphVertexOperation;
-import edu.uci.ics.genomix.pregelix.io.MessageWritable;
-import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
-import edu.uci.ics.genomix.pregelix.io.ValueWritable;
-import edu.uci.ics.genomix.pregelix.log.DataLoadLogFormatter;
-import edu.uci.ics.genomix.pregelix.api.io.binary.BinaryVertexInputFormat;
-import edu.uci.ics.genomix.pregelix.api.io.binary.BinaryVertexInputFormat.BinaryVertexReader;
-
-public class BinaryLoadGraphInputFormat extends
- BinaryVertexInputFormat<BytesWritable, ValueStateWritable, NullWritable, MessageWritable>{
- /**
- * Format INPUT
- */
- @Override
- public VertexReader<BytesWritable, ValueStateWritable, NullWritable, MessageWritable> createVertexReader(
- InputSplit split, TaskAttemptContext context) throws IOException {
- return new BinaryLoadGraphReader(binaryInputFormat.createRecordReader(split, context));
- }
-}
-
-@SuppressWarnings("rawtypes")
-class BinaryLoadGraphReader extends
- BinaryVertexReader<BytesWritable, ValueStateWritable, NullWritable, MessageWritable> {
- private Vertex vertex;
- private BytesWritable vertexId = new BytesWritable();
- private ValueStateWritable vertexValue = new ValueStateWritable();
-
- public BinaryLoadGraphReader(RecordReader<BytesWritable,KmerCountValue> recordReader) {
- super(recordReader);
- }
-
- @Override
- public boolean nextVertex() throws IOException, InterruptedException {
- return getRecordReader().nextKeyValue();
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public Vertex<BytesWritable, ValueStateWritable, NullWritable, MessageWritable> getCurrentVertex() throws IOException,
- InterruptedException {
- if (vertex == null)
- vertex = (Vertex) BspUtils.createVertex(getContext().getConfiguration());
-
- vertex.getMsgList().clear();
- vertex.getEdges().clear();
-
- vertex.reset();
- if(getRecordReader() != null){
- /**
- * set the src vertex id
- */
- vertexId.set(getRecordReader().getCurrentKey());
- vertex.setVertexId(vertexId);
- /**
- * set the vertex value
- */
- KmerCountValue kmerCountValue = getRecordReader().getCurrentValue();
- vertexValue.setValue(kmerCountValue.getAdjBitMap());
- vertex.setVertexValue(vertexValue);
- }
-
- return vertex;
- }
-}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/BinaryLoadGraphOutputFormat.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/BinaryLoadGraphOutputFormat.java
deleted file mode 100644
index 2b87379..0000000
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/BinaryLoadGraphOutputFormat.java
+++ /dev/null
@@ -1,40 +0,0 @@
-package edu.uci.ics.genomix.pregelix.format;
-
-import java.io.IOException;
-
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
-import edu.uci.ics.genomix.pregelix.api.io.binary.BinaryVertexOutputFormat;
-import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
-import edu.uci.ics.pregelix.api.graph.Vertex;
-import edu.uci.ics.pregelix.api.io.VertexWriter;
-
-public class BinaryLoadGraphOutputFormat extends
- BinaryVertexOutputFormat<BytesWritable, ValueStateWritable, NullWritable> {
-
- @Override
- public VertexWriter<BytesWritable, ValueStateWritable, NullWritable> createVertexWriter(TaskAttemptContext context)
- throws IOException, InterruptedException {
- RecordWriter<BytesWritable, ValueStateWritable> recordWriter = binaryOutputFormat.getRecordWriter(context);
- return new BinaryLoadGraphVertexWriter(recordWriter);
- }
-
- /**
- * Simple VertexWriter that supports {@link BinaryLoadGraphVertex}
- */
- public static class BinaryLoadGraphVertexWriter extends
- BinaryVertexWriter<BytesWritable, ValueStateWritable, NullWritable> {
- public BinaryLoadGraphVertexWriter(RecordWriter<BytesWritable, ValueStateWritable> lineRecordWriter) {
- super(lineRecordWriter);
- }
-
- @Override
- public void writeVertex(Vertex<BytesWritable, ValueStateWritable, NullWritable, ?> vertex) throws IOException,
- InterruptedException {
- getRecordWriter().write(vertex.getVertexId(),vertex.getVertexValue());
- }
- }
-}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/LogAlgorithmForMergeGraphInputFormat.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/LogAlgorithmForMergeGraphInputFormat.java
deleted file mode 100644
index 0e74c2d..0000000
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/LogAlgorithmForMergeGraphInputFormat.java
+++ /dev/null
@@ -1,79 +0,0 @@
-package edu.uci.ics.genomix.pregelix.format;
-
-import java.io.IOException;
-
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
-import edu.uci.ics.genomix.pregelix.api.io.binary.BinaryVertexInputFormat;
-import edu.uci.ics.genomix.pregelix.io.LogAlgorithmMessageWritable;
-import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
-import edu.uci.ics.genomix.pregelix.type.State;
-import edu.uci.ics.genomix.type.KmerCountValue;
-import edu.uci.ics.pregelix.api.graph.Vertex;
-import edu.uci.ics.pregelix.api.io.VertexReader;
-import edu.uci.ics.pregelix.api.util.BspUtils;
-
-public class LogAlgorithmForMergeGraphInputFormat extends
- BinaryVertexInputFormat<BytesWritable, ValueStateWritable, NullWritable, LogAlgorithmMessageWritable>{
-
- /**
- * Format INPUT
- */
- @Override
- public VertexReader<BytesWritable, ValueStateWritable, NullWritable, LogAlgorithmMessageWritable> createVertexReader(
- InputSplit split, TaskAttemptContext context) throws IOException {
- return new BinaryLoadGraphReader(binaryInputFormat.createRecordReader(split, context));
- }
-
- @SuppressWarnings("rawtypes")
- class BinaryLoadGraphReader extends
- BinaryVertexReader<BytesWritable, ValueStateWritable, NullWritable, LogAlgorithmMessageWritable> {
- private Vertex vertex;
- private BytesWritable vertexId = new BytesWritable();
- private ValueStateWritable vertexValue = new ValueStateWritable();
-
- public BinaryLoadGraphReader(RecordReader<BytesWritable,KmerCountValue> recordReader) {
- super(recordReader);
- }
-
- @Override
- public boolean nextVertex() throws IOException, InterruptedException {
- return getRecordReader().nextKeyValue();
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public Vertex<BytesWritable, ValueStateWritable, NullWritable, LogAlgorithmMessageWritable> getCurrentVertex() throws IOException,
- InterruptedException {
- if (vertex == null)
- vertex = (Vertex) BspUtils.createVertex(getContext().getConfiguration());
-
- vertex.getMsgList().clear();
- vertex.getEdges().clear();
-
-
- if(getRecordReader() != null){
- /**
- * set the src vertex id
- */
-
- vertexId = getRecordReader().getCurrentKey();
- vertex.setVertexId(vertexId);
- /**
- * set the vertex value
- */
- KmerCountValue kmerCountValue = getRecordReader().getCurrentValue();
- vertexValue.setValue(kmerCountValue.getAdjBitMap());
- vertexValue.setState(State.NON_VERTEX);
- vertex.setVertexValue(vertexValue);
- }
-
- return vertex;
- }
- }
-
-}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/LogAlgorithmForMergeGraphOutputFormat.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/LogAlgorithmForMergeGraphOutputFormat.java
deleted file mode 100644
index 865a787..0000000
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/LogAlgorithmForMergeGraphOutputFormat.java
+++ /dev/null
@@ -1,49 +0,0 @@
-package edu.uci.ics.genomix.pregelix.format;
-
-import java.io.IOException;
-
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
-import edu.uci.ics.genomix.pregelix.GraphVertexOperation;
-import edu.uci.ics.genomix.pregelix.api.io.binary.BinaryVertexOutputFormat;
-import edu.uci.ics.pregelix.api.graph.Vertex;
-import edu.uci.ics.pregelix.api.io.VertexWriter;
-import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
-import edu.uci.ics.genomix.pregelix.type.State;
-import edu.uci.ics.genomix.type.old.Kmer;
-
-public class LogAlgorithmForMergeGraphOutputFormat extends
- BinaryVertexOutputFormat<BytesWritable, ValueStateWritable, NullWritable> {
-
-
- @Override
- public VertexWriter<BytesWritable, ValueStateWritable, NullWritable> createVertexWriter(TaskAttemptContext context)
- throws IOException, InterruptedException {
- RecordWriter<BytesWritable, ValueStateWritable> recordWriter = binaryOutputFormat.getRecordWriter(context);
- return new BinaryLoadGraphVertexWriter(recordWriter);
- }
-
- /**
- * Simple VertexWriter that supports {@link BinaryLoadGraphVertex}
- */
- public static class BinaryLoadGraphVertexWriter extends
- BinaryVertexWriter<BytesWritable, ValueStateWritable, NullWritable> {
-
- public BinaryLoadGraphVertexWriter(RecordWriter<BytesWritable, ValueStateWritable> lineRecordWriter) {
- super(lineRecordWriter);
- }
-
- @Override
- public void writeVertex(Vertex<BytesWritable, ValueStateWritable, NullWritable, ?> vertex) throws IOException,
- InterruptedException {
- if(vertex.getVertexValue().getState() != State.FINAL_DELETE
- && vertex.getVertexValue().getState() != State.END_VERTEX
- && vertex.getVertexValue().getState() != State.TODELETE
- && vertex.getVertexValue().getState() != State.KILL_SELF)
- getRecordWriter().write(vertex.getVertexId(),vertex.getVertexValue());
- }
- }
-}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/LogAlgorithmMessageWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/LogAlgorithmMessageWritable.java
index 3d07c5b..ac84d8e 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/LogAlgorithmMessageWritable.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/LogAlgorithmMessageWritable.java
@@ -2,11 +2,12 @@
import java.io.DataInput;
import java.io.DataOutput;
-import java.io.File;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
-import edu.uci.ics.genomix.pregelix.LogAlgorithmForMergeGraphVertex;
+
+import edu.uci.ics.genomix.pregelix.operator.LogAlgorithmForPathMergeVertex;
+import edu.uci.ics.genomix.type.VKmerBytesWritable;
public class LogAlgorithmMessageWritable implements WritableComparable<LogAlgorithmMessageWritable>{
/**
@@ -15,65 +16,55 @@
* chainVertexId stores the chains of connected DNA
* file stores the point to the file that stores the chains of connected DNA
*/
- private byte[] sourceVertexId;
- private byte neighberInfo;
- private int lengthOfChain;
- private byte[] chainVertexId;
- private File file;
+ private VKmerBytesWritable sourceVertexId;
+ private VKmerBytesWritable chainVertexId;
+ private byte adjMap;
private int message;
private int sourceVertexState;
public LogAlgorithmMessageWritable(){
- sourceVertexId = new byte[(LogAlgorithmForMergeGraphVertex.kmerSize-1)/4 + 1];
+ sourceVertexId = new VKmerBytesWritable(LogAlgorithmForPathMergeVertex.kmerSize);
+ chainVertexId = new VKmerBytesWritable(LogAlgorithmForPathMergeVertex.kmerSize);
}
- public void set(byte[] sourceVertexId,byte neighberInfo, byte[] chainVertexId, File file){
- this.sourceVertexId = sourceVertexId;
- this.chainVertexId = chainVertexId;
- this.file = file;
- this.message = 0;
- this.lengthOfChain = 0;
+ public void set(VKmerBytesWritable sourceVertexId, VKmerBytesWritable chainVertexId, byte adjMap, int message, int sourceVertexState){
+ this.sourceVertexId.set(sourceVertexId);
+ this.chainVertexId.set(chainVertexId);
+ this.adjMap = adjMap;
+ this.message = message;
+ this.sourceVertexState = sourceVertexState;
}
public void reset(){
- sourceVertexId = new byte[(LogAlgorithmForMergeGraphVertex.kmerSize-1)/4 + 1];
- neighberInfo = (Byte) null;
- lengthOfChain = 0;
- chainVertexId = null;
+ //sourceVertexId.reset(LogAlgorithmForPathMergeVertex.kmerSize);
+ chainVertexId.reset(LogAlgorithmForPathMergeVertex.kmerSize);
+ adjMap = (byte)0;
message = 0;
sourceVertexState = 0;
}
- public byte[] getSourceVertexId() {
+ public VKmerBytesWritable getSourceVertexId() {
return sourceVertexId;
}
- public void setSourceVertexId(byte[] sourceVertexId) {
- this.sourceVertexId = sourceVertexId;
+ public void setSourceVertexId(VKmerBytesWritable sourceVertexId) {
+ this.sourceVertexId.set(sourceVertexId);
}
- public byte getNeighberInfo() {
- return neighberInfo;
+ public byte getAdjMap() {
+ return adjMap;
}
- public void setNeighberInfo(byte neighberInfo) {
- this.neighberInfo = neighberInfo;
+ public void setAdjMap(byte adjMap) {
+ this.adjMap = adjMap;
}
- public byte[] getChainVertexId() {
+ public VKmerBytesWritable getChainVertexId() {
return chainVertexId;
}
- public void setChainVertexId(byte[] chainVertexId) {
- this.chainVertexId = chainVertexId;
- }
-
- public File getFile() {
- return file;
- }
-
- public void setFile(File file) {
- this.file = file;
+ public void setChainVertexId(VKmerBytesWritable chainVertexId) {
+ this.chainVertexId.set(chainVertexId);
}
public int getMessage() {
@@ -93,84 +84,48 @@
}
public int getLengthOfChain() {
- return lengthOfChain;
- }
-
- public void setLengthOfChain(int lengthOfChain) {
- this.lengthOfChain = lengthOfChain;
- }
-
- public void incrementLength(){
- this.lengthOfChain++;
+ return chainVertexId.getKmerLength();
}
@Override
public void write(DataOutput out) throws IOException {
- // TODO Auto-generated method stub
- out.writeInt(lengthOfChain);
- if(lengthOfChain != 0)
- out.write(chainVertexId);
-
+ sourceVertexId.write(out);
+ chainVertexId.write(out);
+ out.write(adjMap);
out.writeInt(message);
out.writeInt(sourceVertexState);
-
- out.write(sourceVertexId);
- out.write(neighberInfo);
}
@Override
public void readFields(DataInput in) throws IOException {
- // TODO Auto-generated method stub
- lengthOfChain = in.readInt();
- if(lengthOfChain > 0){
- chainVertexId = new byte[(lengthOfChain-1)/4 + 1];
- in.readFully(chainVertexId);
- }
- else
- chainVertexId = new byte[0];
-
+ sourceVertexId.readFields(in);
+ chainVertexId.readFields(in);
+ adjMap = in.readByte();
message = in.readInt();
sourceVertexState = in.readInt();
-
- sourceVertexId = new byte[(LogAlgorithmForMergeGraphVertex.kmerSize-1)/4 + 1];
- in.readFully(sourceVertexId);
- neighberInfo = in.readByte();
}
- @Override
+ @Override
public int hashCode() {
- int hashCode = 0;
- for(int i = 0; i < chainVertexId.length; i++)
- hashCode = (int)chainVertexId[i];
- return hashCode;
+ return chainVertexId.hashCode();
}
+
@Override
public boolean equals(Object o) {
- if (o instanceof LogAlgorithmMessageWritable) {
+ if (o instanceof NaiveAlgorithmMessageWritable) {
LogAlgorithmMessageWritable tp = (LogAlgorithmMessageWritable) o;
- return chainVertexId == tp.chainVertexId && file == tp.file;
+ return chainVertexId.equals(tp.chainVertexId);
}
return false;
}
+
@Override
public String toString() {
- return chainVertexId.toString() + "\t" + file.getAbsolutePath();
+ return chainVertexId.toString();
}
- @Override
+
+ @Override
public int compareTo(LogAlgorithmMessageWritable tp) {
- // TODO Auto-generated method stub
- int cmp;
- if (chainVertexId == tp.chainVertexId)
- cmp = 0;
- else
- cmp = 1;
- if (cmp != 0)
- return cmp;
- if (file == tp.file)
- return 0;
- else
- return 1;
+ return chainVertexId.compareTo(tp.chainVertexId);
}
-
-
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MessageWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MessageWritable.java
deleted file mode 100644
index 3872514..0000000
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MessageWritable.java
+++ /dev/null
@@ -1,167 +0,0 @@
-package edu.uci.ics.genomix.pregelix.io;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.File;
-import java.io.IOException;
-
-import org.apache.hadoop.io.WritableComparable;
-import edu.uci.ics.genomix.pregelix.MergeGraphVertex;
-
-public class MessageWritable implements WritableComparable<MessageWritable>{
- /**
- * sourceVertexId stores source vertexId when headVertex sends the message
- * stores neighber vertexValue when pathVertex sends the message
- * chainVertexId stores the chains of connected DNA
- * file stores the point to the file that stores the chains of connected DNA
- */
- private byte[] sourceVertexId;
- private byte neighberInfo;
- private byte[] chainVertexId;
- private File file;
- private boolean isRear;
- private int lengthOfChain;
- private byte[] head;
-
- public MessageWritable(){
- }
-
- public void set(byte[] sourceVertexId, byte neighberInfo, byte[] chainVertexId, File file, byte[] head){
- this.sourceVertexId = sourceVertexId;
- this.neighberInfo = neighberInfo;
- this.chainVertexId = chainVertexId;
- this.file = file;
- this.isRear = false;
- this.lengthOfChain = 0;
- this.head = head;
- }
-
- public byte[] getSourceVertexId() {
- return sourceVertexId;
- }
-
- public void setSourceVertexId(byte[] sourceVertexId) {
- this.sourceVertexId = sourceVertexId;
- }
-
- public byte getNeighberInfo() {
- return neighberInfo;
- }
-
- public void setNeighberInfo(byte neighberInfo) {
- this.neighberInfo = neighberInfo;
- }
-
- public byte[] getChainVertexId() {
- return chainVertexId;
- }
-
- public void setChainVertexId(byte[] chainVertexId) {
- this.chainVertexId = chainVertexId;
- }
-
- public File getFile() {
- return file;
- }
-
- public void setFile(File file) {
- this.file = file;
- }
-
- public boolean isRear() {
- return isRear;
- }
-
- public void setRear(boolean isRear) {
- this.isRear = isRear;
- }
-
- public int getLengthOfChain() {
- return lengthOfChain;
- }
-
- public void setLengthOfChain(int lengthOfChain) {
- this.lengthOfChain = lengthOfChain;
- }
-
-
- public byte[] getHead() {
- return head;
- }
-
- public void setHead(byte[] head) {
- this.head = head;
- }
-
- public void incrementLength(){
- this.lengthOfChain++;
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- // TODO Auto-generated method stub
- out.writeInt(lengthOfChain);
- if(lengthOfChain != 0)
- out.write(chainVertexId);
- out.write(sourceVertexId);
- out.write(head);
- out.write(neighberInfo);
- out.writeBoolean(isRear);
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- // TODO Auto-generated method stub
- lengthOfChain = in.readInt();
- if(lengthOfChain > 0){
- chainVertexId = new byte[(lengthOfChain-1)/4 + 1];
- in.readFully(chainVertexId);
- }
- else
- chainVertexId = new byte[0];
- sourceVertexId = new byte[(MergeGraphVertex.kmerSize-1)/4 + 1];
- in.readFully(sourceVertexId);
- head = new byte[(MergeGraphVertex.kmerSize-1)/4 + 1];
- in.readFully(head);
- neighberInfo = in.readByte();
- isRear = in.readBoolean();
-
- }
-
- @Override
- public int hashCode() {
- int hashCode = 0;
- for(int i = 0; i < chainVertexId.length; i++)
- hashCode = (int)chainVertexId[i];
- return hashCode;
- }
- @Override
- public boolean equals(Object o) {
- if (o instanceof MessageWritable) {
- MessageWritable tp = (MessageWritable) o;
- return chainVertexId == tp.chainVertexId && file == tp.file;
- }
- return false;
- }
- @Override
- public String toString() {
- return chainVertexId.toString() + "\t" + file.getAbsolutePath();
- }
-
- @Override
- public int compareTo(MessageWritable tp) {
- // TODO Auto-generated method stub
- int cmp;
- if (chainVertexId == tp.chainVertexId)
- cmp = 0;
- else
- cmp = 1;
- if (cmp != 0)
- return cmp;
- if (file == tp.file)
- return 0;
- else
- return 1;
- }
-
-}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/ValueStateWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/ValueStateWritable.java
index ffc1d38..769277c 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/ValueStateWritable.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/ValueStateWritable.java
@@ -4,35 +4,42 @@
import org.apache.hadoop.io.WritableComparable;
+import edu.uci.ics.genomix.pregelix.operator.NaiveAlgorithmForPathMergeVertex;
import edu.uci.ics.genomix.pregelix.type.State;
-import edu.uci.ics.genomix.type.old.Kmer;
+import edu.uci.ics.genomix.type.GeneCode;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.VKmerBytesWritable;
public class ValueStateWritable implements WritableComparable<ValueStateWritable> {
- private byte value;
+ private byte adjMap;
private int state;
- private int lengthOfMergeChain;
- private byte[] mergeChain;
+ private VKmerBytesWritable mergeChain;
public ValueStateWritable() {
state = State.NON_VERTEX;
- lengthOfMergeChain = 0;
+ mergeChain = new VKmerBytesWritable(NaiveAlgorithmForPathMergeVertex.kmerSize);
}
- public ValueStateWritable(byte value, int state, int lengthOfMergeChain, byte[] mergeChain) {
- this.value = value;
+ public ValueStateWritable(byte adjMap, int state, VKmerBytesWritable mergeChain) {
+ this.adjMap = adjMap;
this.state = state;
- this.lengthOfMergeChain = lengthOfMergeChain;
- this.mergeChain = mergeChain;
+ this.mergeChain.set(mergeChain);
+ }
+
+ public void set(byte adjMap, int state, VKmerBytesWritable mergeChain){
+ this.adjMap = adjMap;
+ this.state = state;
+ this.mergeChain.set(mergeChain);
}
- public byte getValue() {
- return value;
+ public byte getAdjMap() {
+ return adjMap;
}
- public void setValue(byte value) {
- this.value = value;
+ public void setAdjMap(byte adjMap) {
+ this.adjMap = adjMap;
}
public int getState() {
@@ -44,43 +51,33 @@
}
public int getLengthOfMergeChain() {
- return lengthOfMergeChain;
+ return mergeChain.getKmerLength();
}
- public void setLengthOfMergeChain(int lengthOfMergeChain) {
- this.lengthOfMergeChain = lengthOfMergeChain;
- }
-
- public byte[] getMergeChain() {
+ public VKmerBytesWritable getMergeChain() {
return mergeChain;
}
- public void setMergeChain(byte[] mergeChain) {
- this.mergeChain = mergeChain;
+ public void setMergeChain(KmerBytesWritable mergeChain) {
+ this.mergeChain.set(mergeChain);
+ }
+
+ public void setMergeChain(VKmerBytesWritable mergeChain) {
+ this.mergeChain.set(mergeChain);
}
@Override
public void readFields(DataInput in) throws IOException {
- value = in.readByte();
+ adjMap = in.readByte();
state = in.readInt();
- lengthOfMergeChain = in.readInt();
- if(lengthOfMergeChain < 0)
- System.out.println();
- if(lengthOfMergeChain != 0){
- mergeChain = new byte[(lengthOfMergeChain-1)/4 + 1];
- in.readFully(mergeChain);
- }
- else
- mergeChain = new byte[0];
+ mergeChain.readFields(in);
}
@Override
public void write(DataOutput out) throws IOException {
- out.writeByte(value);
+ out.writeByte(adjMap);
out.writeInt(state);
- out.writeInt(lengthOfMergeChain);
- if(lengthOfMergeChain != 0)
- out.write(mergeChain);
+ mergeChain.write(out);
}
@Override
@@ -91,11 +88,11 @@
@Override
public String toString() {
- if(lengthOfMergeChain == 0)
- return Kmer.GENE_CODE.getSymbolFromBitMap(value);
- return Kmer.GENE_CODE.getSymbolFromBitMap(value) + "\t" +
- lengthOfMergeChain + "\t" +
- Kmer.recoverKmerFrom(lengthOfMergeChain, mergeChain, 0, mergeChain.length) + "\t" +
+ if(mergeChain.getKmerLength() == 0)
+ return GeneCode.getSymbolFromBitMap(adjMap);
+ return GeneCode.getSymbolFromBitMap(adjMap) + "\t" +
+ getLengthOfMergeChain() + "\t" +
+ mergeChain.toString() + "\t" +
state;
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/ValueWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/ValueWritable.java
deleted file mode 100644
index a3f0b9f..0000000
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/ValueWritable.java
+++ /dev/null
@@ -1,73 +0,0 @@
-package edu.uci.ics.genomix.pregelix.io;
-
-import java.io.*;
-
-import org.apache.hadoop.io.WritableComparable;
-
-public class ValueWritable implements WritableComparable<ValueWritable> {
-
- private byte value;
- private int lengthOfMergeChain;
- private byte[] mergeChain;
-
- public ValueWritable() {
- lengthOfMergeChain = 0;
- }
-
- public ValueWritable(byte value, int lengthOfMergeChain, byte[] mergeChain) {
- this.value = value;
- this.lengthOfMergeChain = lengthOfMergeChain;
- this.mergeChain = mergeChain;
- }
-
- public byte getValue() {
- return value;
- }
-
- public void setValue(byte value) {
- this.value = value;
- }
-
- public int getLengthOfMergeChain() {
- return lengthOfMergeChain;
- }
-
- public void setLengthOfMergeChain(int lengthOfMergeChain) {
- this.lengthOfMergeChain = lengthOfMergeChain;
- }
-
- public byte[] getMergeChain() {
- return mergeChain;
- }
-
- public void setMergeChain(byte[] mergeChain) {
- this.mergeChain = mergeChain;
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- value = in.readByte();
- lengthOfMergeChain = in.readInt();
- if(lengthOfMergeChain != 0){
- mergeChain = new byte[(lengthOfMergeChain-1)/4 + 1];
- in.readFully(mergeChain);
- }
- else
- mergeChain = new byte[0];
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- out.writeByte(value);
- out.writeInt(lengthOfMergeChain);
- if(lengthOfMergeChain != 0)
- out.write(mergeChain);
- }
-
- @Override
- public int compareTo(ValueWritable o) {
- // TODO Auto-generated method stub
- return 0;
- }
-
-}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/log/DataLoadLogFormatter.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/log/DataLoadLogFormatter.java
index 7e56a66..6105f18 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/log/DataLoadLogFormatter.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/log/DataLoadLogFormatter.java
@@ -4,27 +4,22 @@
import java.util.logging.Handler;
import java.util.logging.LogRecord;
-import org.apache.hadoop.io.BytesWritable;
-
-import edu.uci.ics.genomix.type.old.Kmer;
import edu.uci.ics.genomix.type.KmerCountValue;
+import edu.uci.ics.genomix.type.VKmerBytesWritable;
public class DataLoadLogFormatter extends Formatter{
- private BytesWritable key;
+ private VKmerBytesWritable key;
private KmerCountValue value;
- private int k;
- public void set(BytesWritable key,
- KmerCountValue value, int k){
- this.key = key;
+ public void set(VKmerBytesWritable key,
+ KmerCountValue value){
+ this.key.set(key);
this.value = value;
- this.k = k;
}
public String format(LogRecord record) {
- StringBuilder builder = new StringBuilder(1000);
+ StringBuilder builder = new StringBuilder(1000);
- builder.append(Kmer.recoverKmerFrom(k, key.getBytes(), 0,
- key.getLength())
+ builder.append(key.toString()
+ "\t" + value.toString() + "\r\n");
if(!formatMessage(record).equals(""))
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/log/LogAlgorithmLogFormatter.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/log/LogAlgorithmLogFormatter.java
index 3af9b6f..d4f03ee 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/log/LogAlgorithmLogFormatter.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/log/LogAlgorithmLogFormatter.java
@@ -5,7 +5,7 @@
import edu.uci.ics.genomix.pregelix.io.LogAlgorithmMessageWritable;
import edu.uci.ics.genomix.pregelix.type.Message;
import edu.uci.ics.genomix.pregelix.type.State;
-import edu.uci.ics.genomix.type.old.Kmer;
+import edu.uci.ics.genomix.type.VKmerBytesWritable;
public class LogAlgorithmLogFormatter extends Formatter {
//
@@ -13,13 +13,11 @@
//
//private static final DateFormat df = new SimpleDateFormat("dd/MM/yyyy hh:mm:ss.SSS");
private long step;
- private byte[] sourceVertexId;
- private byte[] destVertexId;
- private LogAlgorithmMessageWritable msg;
+ private VKmerBytesWritable sourceVertexId = new VKmerBytesWritable(1);
+ private VKmerBytesWritable destVertexId = new VKmerBytesWritable(1);
+ private LogAlgorithmMessageWritable msg = new LogAlgorithmMessageWritable();
private int state;
- private int k;
- private byte[] mergeChain;
- private int lengthOfMergeChain;
+ private VKmerBytesWritable mergeChain = new VKmerBytesWritable(1);;
//private boolean testDelete = false;
/** 0: general operation
* 1: testDelete
@@ -27,60 +25,57 @@
* 3: testVoteToHalt
*/
private int operation;
+
+ public LogAlgorithmLogFormatter(){
+ }
- public void set(long step, byte[] sourceVertexId,
- byte[] destVertexId, LogAlgorithmMessageWritable msg, int state, int k){
+ public void set(long step, VKmerBytesWritable sourceVertexId,
+ VKmerBytesWritable destVertexId, LogAlgorithmMessageWritable msg, int state){
this.step = step;
- this.sourceVertexId = sourceVertexId;
- this.destVertexId = destVertexId;
+ this.sourceVertexId.set(sourceVertexId);
+ this.destVertexId.set(destVertexId);
this.msg = msg;
this.state = state;
- this.k = k;
this.operation = 0;
}
- public void setMergeChain(long step, byte[] sourceVertexId,
- int lengthOfMergeChain, byte[] mergeChain, int k){
+ public void setMergeChain(long step, VKmerBytesWritable sourceVertexId,
+ VKmerBytesWritable mergeChain){
this.reset();
this.step = step;
- this.sourceVertexId = sourceVertexId;
- this.lengthOfMergeChain = lengthOfMergeChain;
- this.mergeChain = mergeChain;
- this.k = k;
+ this.sourceVertexId.set(sourceVertexId);
+ this.mergeChain.set(mergeChain);
this.operation = 2;
}
- public void setVotoToHalt(long step, byte[] sourceVertexId, int k){
+ public void setVotoToHalt(long step, VKmerBytesWritable sourceVertexId){
this.reset();
this.step = step;
- this.sourceVertexId = sourceVertexId;
- this.k = k;
+ this.sourceVertexId.set(sourceVertexId);
this.operation = 3;
}
public void reset(){
- this.sourceVertexId = null;
- this.destVertexId = null;
- this.msg = null;
+ this.sourceVertexId = new VKmerBytesWritable(1);
+ this.destVertexId = new VKmerBytesWritable(1);
+ this.msg = new LogAlgorithmMessageWritable();
this.state = 0;
- this.k = 0;
- this.mergeChain = null;
- this.lengthOfMergeChain = 0;
+ this.mergeChain = new VKmerBytesWritable(1);
}
public String format(LogRecord record) {
StringBuilder builder = new StringBuilder(1000);
- String source = Kmer.recoverKmerFrom(k, sourceVertexId, 0, sourceVertexId.length);
+ String source = sourceVertexId.toString();
String chain = "";
builder.append("Step: " + step + "\r\n");
builder.append("Source Code: " + source + "\r\n");
if(operation == 0){
- if(destVertexId != null){
- String dest = Kmer.recoverKmerFrom(k, destVertexId, 0, destVertexId.length);
+ if(destVertexId.getKmerLength() != -1){
+ String dest = destVertexId.toString();
builder.append("Send message to " + "\r\n");
builder.append("Destination Code: " + dest + "\r\n");
}
builder.append("Message is: " + Message.MESSAGE_CONTENT.getContentFromCode(msg.getMessage()) + "\r\n");
- if(msg.getLengthOfChain() != 0){
- chain = Kmer.recoverKmerFrom(msg.getLengthOfChain(), msg.getChainVertexId(), 0, msg.getChainVertexId().length);
+ if(msg.getLengthOfChain() != -1){
+ chain = msg.getChainVertexId().toString();
builder.append("Chain Message: " + chain + "\r\n");
builder.append("Chain Length: " + msg.getLengthOfChain() + "\r\n");
}
@@ -88,9 +83,9 @@
builder.append("State is: " + State.STATE_CONTENT.getContentFromCode(state) + "\r\n");
}
if(operation == 2){
- chain = Kmer.recoverKmerFrom(lengthOfMergeChain, mergeChain, 0, mergeChain.length);
+ chain = mergeChain.toString();
builder.append("Merge Chain: " + chain + "\r\n");
- builder.append("Merge Chain Length: " + lengthOfMergeChain + "\r\n");
+ builder.append("Merge Chain Length: " + mergeChain.getKmerLength() + "\r\n");
}
if(operation == 3)
builder.append("Vote to halt!");
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/log/NaiveAlgorithmLogFormatter.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/log/NaiveAlgorithmLogFormatter.java
index c337a16..332d6d0 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/log/NaiveAlgorithmLogFormatter.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/log/NaiveAlgorithmLogFormatter.java
@@ -2,8 +2,8 @@
import java.util.logging.*;
-import edu.uci.ics.genomix.pregelix.io.MessageWritable;
-import edu.uci.ics.genomix.type.old.Kmer;
+import edu.uci.ics.genomix.pregelix.io.NaiveAlgorithmMessageWritable;
+import edu.uci.ics.genomix.type.VKmerBytesWritable;
public class NaiveAlgorithmLogFormatter extends Formatter {
//
@@ -11,22 +11,20 @@
//
//private static final DateFormat df = new SimpleDateFormat("dd/MM/yyyy hh:mm:ss.SSS");
private long step;
- private byte[] sourceVertexId;
- private byte[] destVertexId;
- private MessageWritable msg;
- private int k;
+ private VKmerBytesWritable sourceVertexId;
+ private VKmerBytesWritable destVertexId;
+ private NaiveAlgorithmMessageWritable msg;
- public void set(long step, byte[] sourceVertexId,
- byte[] destVertexId, MessageWritable msg, int k){
+ public void set(long step, VKmerBytesWritable sourceVertexId,
+ VKmerBytesWritable destVertexId, NaiveAlgorithmMessageWritable msg){
this.step = step;
- this.sourceVertexId = sourceVertexId;
- this.destVertexId = destVertexId;
+ this.sourceVertexId.set(sourceVertexId);
+ this.destVertexId.set(destVertexId);
this.msg = msg;
- this.k = k;
}
public String format(LogRecord record) {
StringBuilder builder = new StringBuilder(1000);
- String source = Kmer.recoverKmerFrom(k, sourceVertexId, 0, sourceVertexId.length);
+ String source = sourceVertexId.toString();
String chain = "";
@@ -35,11 +33,11 @@
if(destVertexId != null){
builder.append("Send message to " + "\r\n");
- String dest = Kmer.recoverKmerFrom(k, destVertexId, 0, destVertexId.length);
+ String dest = destVertexId.toString();
builder.append("Destination Code: " + dest + "\r\n");
}
if(msg.getLengthOfChain() != 0){
- chain = Kmer.recoverKmerFrom(msg.getLengthOfChain(), msg.getChainVertexId(), 0, msg.getChainVertexId().length);
+ chain = msg.getChainVertexId().toString();
builder.append("Chain Message: " + chain + "\r\n");
builder.append("Chain Length: " + msg.getLengthOfChain() + "\r\n");
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/CombineSequenceFile.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/CombineSequenceFile.java
index d9f0efe..c7349dd 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/CombineSequenceFile.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/CombineSequenceFile.java
@@ -5,14 +5,12 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.CompressionType;
-import edu.uci.ics.genomix.type.old.Kmer;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
import edu.uci.ics.genomix.type.KmerCountValue;
-
public class CombineSequenceFile {
/**
@@ -25,24 +23,23 @@
Configuration conf = new Configuration();
FileSystem fileSys = FileSystem.get(conf);
- Path p = new Path("data/SinglePath_55");
- Path p2 = new Path("data/result");
- Path outFile = new Path(p2, "output");
+ Path p = new Path("output");
+ //Path p2 = new Path("data/result");
+ Path outFile = new Path("output");
SequenceFile.Reader reader;
SequenceFile.Writer writer = SequenceFile.createWriter(fileSys, conf,
- outFile, BytesWritable.class, KmerCountValue.class,
+ outFile, KmerBytesWritable.class, KmerCountValue.class,
CompressionType.NONE);
- BytesWritable key = new BytesWritable();
+ KmerBytesWritable key = new KmerBytesWritable(kmerSize);
KmerCountValue value = new KmerCountValue();
- File dir = new File("data/SinglePath_55");
+ File dir = new File("output");
for(File child : dir.listFiles()){
String name = child.getAbsolutePath();
Path inFile = new Path(p, name);
reader = new SequenceFile.Reader(fileSys, inFile, conf);
while (reader.next(key, value)) {
- System.out.println(Kmer.recoverKmerFrom(kmerSize, key.getBytes(), 0,
- key.getLength())
+ System.out.println(key.toString()
+ "\t" + value.toString());
writer.append(key, value);
}
@@ -53,8 +50,7 @@
reader = new SequenceFile.Reader(fileSys, outFile, conf);
while (reader.next(key, value)) {
- System.err.println(Kmer.recoverKmerFrom(kmerSize, key.getBytes(), 0,
- key.getLength())
+ System.err.println(key.toString()
+ "\t" + value.toString());
}
reader.close();
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/GenerateTextFile.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/GenerateTextFile.java
index 18214a8..c759261 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/GenerateTextFile.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/GenerateTextFile.java
@@ -7,30 +7,28 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.SequenceFile;
import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
-import edu.uci.ics.genomix.type.old.Kmer;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
public class GenerateTextFile {
public static void generate() throws IOException{
- BufferedWriter bw = new BufferedWriter(new FileWriter("text/new_SimplePath"));
+ BufferedWriter bw = new BufferedWriter(new FileWriter("text/log_TreePath"));
Configuration conf = new Configuration();
FileSystem fileSys = FileSystem.get(conf);
for(int i = 0; i < 2; i++){
Path path = new Path("output/part-" + i);
SequenceFile.Reader reader = new SequenceFile.Reader(fileSys, path, conf);
- BytesWritable key = new BytesWritable();
+ KmerBytesWritable key = new KmerBytesWritable(5);
ValueStateWritable value = new ValueStateWritable();
while(reader.next(key, value)){
if (key == null || value == null){
break;
}
- bw.write(Kmer.recoverKmerFrom(5, key.getBytes(), 0,
- key.getLength())
+ bw.write(key.toString()
+ "\t" + value.toString());
bw.newLine();
}
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
index 763d46e..f03a4ab 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
@@ -5,19 +5,18 @@
import java.io.IOException;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.ByteWritable;
-import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import edu.uci.ics.genomix.pregelix.LoadGraphVertex;
-import edu.uci.ics.genomix.pregelix.format.BinaryLoadGraphInputFormat;
-import edu.uci.ics.genomix.pregelix.format.BinaryLoadGraphOutputFormat;
-import edu.uci.ics.genomix.pregelix.format.LogAlgorithmForMergeGraphInputFormat;
-import edu.uci.ics.genomix.pregelix.format.LogAlgorithmForMergeGraphOutputFormat;
+import edu.uci.ics.genomix.pregelix.format.LogAlgorithmForPathMergeOutputFormat;
+import edu.uci.ics.genomix.pregelix.format.NaiveAlgorithmForPathMergeInputFormat;
+import edu.uci.ics.genomix.pregelix.format.NaiveAlgorithmForPathMergeOutputFormat;
+import edu.uci.ics.genomix.pregelix.format.LogAlgorithmForPathMergeInputFormat;
import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
-import edu.uci.ics.genomix.pregelix.LogAlgorithmForMergeGraphVertex;
-import edu.uci.ics.genomix.pregelix.MergeGraphVertex;
+import edu.uci.ics.genomix.pregelix.operator.LoadGraphVertex;
+import edu.uci.ics.genomix.pregelix.operator.LogAlgorithmForPathMergeVertex;
+import edu.uci.ics.genomix.pregelix.operator.NaiveAlgorithmForPathMergeVertex;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
import edu.uci.ics.pregelix.api.job.PregelixJob;
@@ -30,10 +29,11 @@
private static void generateLoadGraphJob(String jobName, String outputPath) throws IOException {
PregelixJob job = new PregelixJob(jobName);
job.setVertexClass(LoadGraphVertex.class);
- job.setVertexInputFormatClass(BinaryLoadGraphInputFormat.class);
- job.setVertexOutputFormatClass(BinaryLoadGraphOutputFormat.class);
- job.setOutputKeyClass(BytesWritable.class);
- job.setOutputValueClass(ByteWritable.class);
+ job.setVertexInputFormatClass(NaiveAlgorithmForPathMergeInputFormat.class);
+ job.setVertexOutputFormatClass(NaiveAlgorithmForPathMergeOutputFormat.class);
+ job.setDynamicVertexValueSize(true);
+ job.setOutputKeyClass(KmerBytesWritable.class);
+ job.setOutputValueClass(ValueStateWritable.class);
FileInputFormat.setInputPaths(job, HDFS_INPUTPATH);
FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH));
job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
@@ -45,15 +45,15 @@
private static void generateMergeGraphJob(String jobName, String outputPath) throws IOException {
PregelixJob job = new PregelixJob(jobName);
- job.setVertexClass(MergeGraphVertex.class);
- job.setVertexInputFormatClass(BinaryLoadGraphInputFormat.class);
- job.setVertexOutputFormatClass(BinaryLoadGraphOutputFormat.class);
+ job.setVertexClass(NaiveAlgorithmForPathMergeVertex.class);
+ job.setVertexInputFormatClass(NaiveAlgorithmForPathMergeInputFormat.class);
+ job.setVertexOutputFormatClass(NaiveAlgorithmForPathMergeOutputFormat.class);
job.setDynamicVertexValueSize(true);
- job.setOutputKeyClass(BytesWritable.class);
+ job.setOutputKeyClass(KmerBytesWritable.class);
job.setOutputValueClass(ValueStateWritable.class);
FileInputFormat.setInputPaths(job, HDFS_INPUTPATH);
FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH));
- job.getConfiguration().setInt(MergeGraphVertex.KMER_SIZE, 55);
+ job.getConfiguration().setInt(NaiveAlgorithmForPathMergeVertex.KMER_SIZE, 5);
job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
}
@@ -63,15 +63,15 @@
private static void generateLogAlgorithmForMergeGraphJob(String jobName, String outputPath) throws IOException {
PregelixJob job = new PregelixJob(jobName);
- job.setVertexClass(LogAlgorithmForMergeGraphVertex.class);
- job.setVertexInputFormatClass(LogAlgorithmForMergeGraphInputFormat.class);
- job.setVertexOutputFormatClass(LogAlgorithmForMergeGraphOutputFormat.class);
+ job.setVertexClass(LogAlgorithmForPathMergeVertex.class);
+ job.setVertexInputFormatClass(LogAlgorithmForPathMergeInputFormat.class);
+ job.setVertexOutputFormatClass(LogAlgorithmForPathMergeOutputFormat.class);
job.setDynamicVertexValueSize(true);
- job.setOutputKeyClass(BytesWritable.class);
+ job.setOutputKeyClass(KmerBytesWritable.class);
job.setOutputValueClass(ValueStateWritable.class);
FileInputFormat.setInputPaths(job, HDFS_INPUTPATH);
FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH));
- job.getConfiguration().setInt(LogAlgorithmForMergeGraphVertex.KMER_SIZE, 5);
+ job.getConfiguration().setInt(LogAlgorithmForPathMergeVertex.KMER_SIZE, 5);
job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
}
@@ -88,8 +88,6 @@
//genLoadGraph();
//genMergeGraph();
genLogAlgorithmForMergeGraph();
- //genSequenceLoadGraph();
- //genBasicBinaryLoadGraph();
}
}
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/RunJobTestSuite.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/RunJobTestSuite.java
index ffc1a25..45d8185 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/RunJobTestSuite.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/RunJobTestSuite.java
@@ -40,7 +40,7 @@
private static final String PATH_TO_ONLY = "src/test/resources/only.txt";
private static final String FILE_EXTENSION_OF_RESULTS = "result";
- private static final String DATA_PATH = "data/result/TreePath";// sequenceShortFileMergeTest
+ private static final String DATA_PATH = "data/sequencefile/TreePath";
private static final String HDFS_PATH = "/webmap/";
private static final String HYRACKS_APP_NAME = "pregelix";
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/example/util/TestUtils.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/example/util/TestUtils.java
index 4ea3c1d..8ac1b09 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/example/util/TestUtils.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/example/util/TestUtils.java
@@ -20,6 +20,13 @@
public class TestUtils {
+ public static void compareWithResultDir(File expectedFileDir, File actualFileDir) throws Exception {
+ String[] fileNames = expectedFileDir.list();
+ for (String fileName : fileNames) {
+ compareWithResult(new File(expectedFileDir, fileName), new File(actualFileDir, fileName));
+ }
+ }
+
public static void compareWithResult(File expectedFile, File actualFile) throws Exception {
BufferedReader readerExpected = new BufferedReader(new FileReader(expectedFile));
BufferedReader readerActual = new BufferedReader(new FileReader(actualFile));
@@ -28,7 +35,6 @@
try {
while ((lineExpected = readerExpected.readLine()) != null) {
lineActual = readerActual.readLine();
- // Assert.assertEquals(lineExpected, lineActual);
if (lineActual == null) {
throw new Exception("Actual result changed at line " + num + ":\n< " + lineExpected + "\n> ");
}
@@ -62,8 +68,10 @@
if (row1.equals(row2))
continue;
- String[] fields1 = row1.split(" ");
- String[] fields2 = row2.split(" ");
+ boolean spaceOrTab = false;
+ spaceOrTab = row1.contains(" ");
+ String[] fields1 = spaceOrTab ? row1.split(" ") : row1.split("\t");
+ String[] fields2 = spaceOrTab ? row2.split(" ") : row2.split("\t");
for (int j = 0; j < fields1.length; j++) {
if (fields1[j].equals(fields2[j])) {
@@ -76,7 +84,7 @@
float float1 = (float) double1.doubleValue();
float float2 = (float) double2.doubleValue();
- if (Math.abs(float1 - float2) == 0)
+ if (Math.abs(float1 - float2) < 1.0e-7)
continue;
else {
return false;