Finish verifying the correctness of Naive Algorithm and Log Algorithm
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_genomix@3373 123451ca-8445-de46-9d55-352943316053
diff --git a/genomix/genomix-pregelix/data/result/.output.crc b/genomix/genomix-pregelix/data/result/.output.crc
new file mode 100644
index 0000000..c4995af
--- /dev/null
+++ b/genomix/genomix-pregelix/data/result/.output.crc
Binary files differ
diff --git a/genomix/genomix-pregelix/data/result/BridgePath b/genomix/genomix-pregelix/data/result/BridgePath
new file mode 100755
index 0000000..90f0a8a
--- /dev/null
+++ b/genomix/genomix-pregelix/data/result/BridgePath
Binary files differ
diff --git a/genomix/genomix-pregelix/data/result/CyclePath b/genomix/genomix-pregelix/data/result/CyclePath
new file mode 100755
index 0000000..0d50d01
--- /dev/null
+++ b/genomix/genomix-pregelix/data/result/CyclePath
Binary files differ
diff --git a/genomix/genomix-pregelix/data/result/LongPath b/genomix/genomix-pregelix/data/result/LongPath
new file mode 100755
index 0000000..b1040ab
--- /dev/null
+++ b/genomix/genomix-pregelix/data/result/LongPath
Binary files differ
diff --git a/genomix/genomix-pregelix/data/result/Path b/genomix/genomix-pregelix/data/result/Path
new file mode 100755
index 0000000..76b1a0e
--- /dev/null
+++ b/genomix/genomix-pregelix/data/result/Path
Binary files differ
diff --git a/genomix/genomix-pregelix/data/result/SimplePath b/genomix/genomix-pregelix/data/result/SimplePath
new file mode 100755
index 0000000..dfabc43
--- /dev/null
+++ b/genomix/genomix-pregelix/data/result/SimplePath
Binary files differ
diff --git a/genomix/genomix-pregelix/data/result/SinglePath b/genomix/genomix-pregelix/data/result/SinglePath
new file mode 100755
index 0000000..6329aa6
--- /dev/null
+++ b/genomix/genomix-pregelix/data/result/SinglePath
Binary files differ
diff --git a/genomix/genomix-pregelix/data/result/ThreeKmer b/genomix/genomix-pregelix/data/result/ThreeKmer
new file mode 100755
index 0000000..f0435c7
--- /dev/null
+++ b/genomix/genomix-pregelix/data/result/ThreeKmer
Binary files differ
diff --git a/genomix/genomix-pregelix/data/result/TreePath b/genomix/genomix-pregelix/data/result/TreePath
new file mode 100755
index 0000000..dc8d16c
--- /dev/null
+++ b/genomix/genomix-pregelix/data/result/TreePath
Binary files differ
diff --git a/genomix/genomix-pregelix/data/result/TwoKmer b/genomix/genomix-pregelix/data/result/TwoKmer
new file mode 100755
index 0000000..73024db
--- /dev/null
+++ b/genomix/genomix-pregelix/data/result/TwoKmer
Binary files differ
diff --git a/genomix/genomix-pregelix/data/webmap/reduce-out b/genomix/genomix-pregelix/data/webmap/reduce-out
deleted file mode 100755
index 7266be5..0000000
--- a/genomix/genomix-pregelix/data/webmap/reduce-out
+++ /dev/null
Binary files differ
diff --git a/genomix/genomix-pregelix/data/webmap/sequenceShortFileMergeTest b/genomix/genomix-pregelix/data/webmap/sequenceShortFileMergeTest
index edd1e59..96f7fc9 100755
--- a/genomix/genomix-pregelix/data/webmap/sequenceShortFileMergeTest
+++ b/genomix/genomix-pregelix/data/webmap/sequenceShortFileMergeTest
Binary files differ
diff --git a/genomix/genomix-pregelix/data/webmap/test.dat b/genomix/genomix-pregelix/data/webmap/test.dat
deleted file mode 100644
index 266a4f4..0000000
--- a/genomix/genomix-pregelix/data/webmap/test.dat
+++ /dev/null
@@ -1,4 +0,0 @@
-
- "
- D
--
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/BinaryLoadGraphInputFormat.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/BinaryLoadGraphInputFormat.java
deleted file mode 100644
index 8ab6784..0000000
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/BinaryLoadGraphInputFormat.java
+++ /dev/null
@@ -1,81 +0,0 @@
-package edu.uci.ics.genomix.pregelix;
-
-import java.io.IOException;
-
-import org.apache.hadoop.io.ByteWritable;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
-import edu.uci.ics.genomix.type.KmerCountValue;
-import edu.uci.ics.pregelix.api.graph.Vertex;
-import edu.uci.ics.pregelix.api.io.VertexReader;
-import edu.uci.ics.pregelix.api.util.BspUtils;
-import edu.uci.ics.genomix.pregelix.example.io.MessageWritable;
-import edu.uci.ics.genomix.pregelix.api.io.binary.BinaryVertexInputFormat;
-import edu.uci.ics.genomix.pregelix.api.io.binary.BinaryVertexInputFormat.BinaryVertexReader;
-
-public class BinaryLoadGraphInputFormat extends
- BinaryVertexInputFormat<BytesWritable, ByteWritable, NullWritable, MessageWritable>{
- static int number = 0;
- /**
- * Format INPUT
- */
- @Override
- public VertexReader<BytesWritable, ByteWritable, NullWritable, MessageWritable> createVertexReader(
- InputSplit split, TaskAttemptContext context) throws IOException {
- try{
- System.out.println("split: " + number++ + " length:"+ split.getLength());
- }catch (Exception ex){
-
- }
- return new BinaryLoadGraphReader(binaryInputFormat.createRecordReader(split, context));
- }
-}
-
-@SuppressWarnings("rawtypes")
-class BinaryLoadGraphReader extends
- BinaryVertexReader<BytesWritable, ByteWritable, NullWritable, MessageWritable> {
- private Vertex vertex;
- private BytesWritable vertexId = new BytesWritable();
- private ByteWritable vertexValue = new ByteWritable();
-
- public BinaryLoadGraphReader(RecordReader<BytesWritable,KmerCountValue> recordReader) {
- super(recordReader);
- }
-
- @Override
- public boolean nextVertex() throws IOException, InterruptedException {
- return getRecordReader().nextKeyValue();
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public Vertex<BytesWritable, ByteWritable, NullWritable, MessageWritable> getCurrentVertex() throws IOException,
- InterruptedException {
- if (vertex == null)
- vertex = (Vertex) BspUtils.createVertex(getContext().getConfiguration());
-
- vertex.getMsgList().clear();
- vertex.getEdges().clear();
-
- vertex.reset();
- if(getRecordReader() != null){
- /**
- * set the src vertex id
- */
- vertexId.set(getRecordReader().getCurrentKey());
- vertex.setVertexId(vertexId);
- /**
- * set the vertex value
- */
- KmerCountValue kmerCountValue = getRecordReader().getCurrentValue();
- vertexValue.set(kmerCountValue.getAdjBitMap());
- vertex.setVertexValue(vertexValue);
- }
-
- return vertex;
- }
-}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/BinaryLoadGraphOutputFormat.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/BinaryLoadGraphOutputFormat.java
deleted file mode 100644
index 8bf13b8..0000000
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/BinaryLoadGraphOutputFormat.java
+++ /dev/null
@@ -1,40 +0,0 @@
-package edu.uci.ics.genomix.pregelix;
-
-import java.io.IOException;
-
-import org.apache.hadoop.io.ByteWritable;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
-import edu.uci.ics.genomix.pregelix.api.io.binary.BinaryVertexOutputFormat;
-import edu.uci.ics.pregelix.api.graph.Vertex;
-import edu.uci.ics.pregelix.api.io.VertexWriter;
-
-public class BinaryLoadGraphOutputFormat extends
- BinaryVertexOutputFormat<BytesWritable, ByteWritable, NullWritable> {
-
- @Override
- public VertexWriter<BytesWritable, ByteWritable, NullWritable> createVertexWriter(TaskAttemptContext context)
- throws IOException, InterruptedException {
- RecordWriter<BytesWritable, ByteWritable> recordWriter = binaryOutputFormat.getRecordWriter(context);
- return new BinaryLoadGraphVertexWriter(recordWriter);
- }
-
- /**
- * Simple VertexWriter that supports {@link BinaryLoadGraphVertex}
- */
- public static class BinaryLoadGraphVertexWriter extends
- BinaryVertexWriter<BytesWritable, ByteWritable, NullWritable> {
- public BinaryLoadGraphVertexWriter(RecordWriter<BytesWritable, ByteWritable> lineRecordWriter) {
- super(lineRecordWriter);
- }
-
- @Override
- public void writeVertex(Vertex<BytesWritable, ByteWritable, NullWritable, ?> vertex) throws IOException,
- InterruptedException {
- getRecordWriter().write(vertex.getVertexId(),vertex.getVertexValue());
- }
- }
-}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/GraphVertexOperation.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/GraphVertexOperation.java
index 0c94512..c8aea7c 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/GraphVertexOperation.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/GraphVertexOperation.java
@@ -1,7 +1,6 @@
package edu.uci.ics.genomix.pregelix;
import java.io.DataOutputStream;
-import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
@@ -17,33 +16,49 @@
import edu.uci.ics.genomix.type.Kmer;
import edu.uci.ics.genomix.type.KmerUtil;
-import edu.uci.ics.genomix.pregelix.SequenceFile.GenerateSequenceFile;
import edu.uci.ics.genomix.pregelix.bitwise.BitwiseOperation;
-import edu.uci.ics.genomix.pregelix.example.io.LogAlgorithmMessageWritable;
-import edu.uci.ics.genomix.pregelix.example.io.MessageWritable;
-import edu.uci.ics.genomix.pregelix.example.io.ValueStateWritable;
-import edu.uci.ics.genomix.pregelix.hdfs.HDFSOperation;
+import edu.uci.ics.genomix.pregelix.io.LogAlgorithmMessageWritable;
+import edu.uci.ics.genomix.pregelix.io.MessageWritable;
+import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
+import edu.uci.ics.genomix.pregelix.sequencefile.GenerateSequenceFile;
public class GraphVertexOperation {
- public static final int k = 3; //kmer, k: the length of kmer
+ public static final int k = 5; //kmer, k: the length of kmer
+ public static final int numBytes = (GraphVertexOperation.k-1)/4 + 1;
static private final Path TMP_DIR = new Path(
GenerateSequenceFile.class.getSimpleName() + "_INTERIM");
/**
* Single Vertex: in-degree = out-degree = 1
* @param vertexValue
*/
- public static boolean isPathVertex(ByteWritable vertexValue){
- byte value = vertexValue.get();
+ public static boolean isPathVertex(byte value){
if(KmerUtil.inDegree(value) == 1 && KmerUtil.outDegree(value) == 1)
return true;
return false;
}
/**
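+ * Head Vertex: out-degree > 0 and not a path vertex (a path vertex has in-degree = out-degree = 1)
+ * @param value vertex value byte whose in/out degrees are tested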
+ */
+ public static boolean isHeadVertex(byte value){
+ if(KmerUtil.outDegree(value) > 0 && !isPathVertex(value))
+ return true;
+ return false;
+ }
+ /**
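+ * Rear Vertex: in-degree > 0 and not a path vertex (a path vertex has in-degree = out-degree = 1)
+ * @param value vertex value byte whose in/out degrees are tested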
+ */
+ public static boolean isRearVertex(byte value){
+ if(KmerUtil.inDegree(value) > 0 && !isPathVertex(value))
+ return true;
+ return false;
+ }
+ /**
* Head Vertex: in-degree != 1, out-degree = 1,
* @param vertexValue
*/
- public static boolean isHead(ByteWritable vertexValue){
- byte value = vertexValue.get();
+ public static boolean isHead(byte value){
if(KmerUtil.inDegree(value) != 1 && KmerUtil.outDegree(value) == 1)
return true;
return false;
@@ -52,8 +67,7 @@
* Rear Vertex: in-degree = 1, out-degree != 1,
* @param vertexValue
*/
- public static boolean isRear(ByteWritable vertexValue){
- byte value = vertexValue.get();
+ public static boolean isRear(byte value){
if(KmerUtil.inDegree(value) == 1 && KmerUtil.outDegree(value) != 1)
return true;
return false;
@@ -257,13 +271,18 @@
* update right neighber
*/
public static byte updateRightNeighber(byte oldVertexValue, byte newVertexValue){
- return BitwiseOperation.replaceLastFourBits(oldVertexValue, newVertexValue);
+ return (byte) ((byte)(oldVertexValue & 0xF0) | (byte) (newVertexValue & 0x0F));
}
/**
* update right neighber based on next vertexId
*/
public static byte updateRightNeighberByVertexId(byte oldVertexValue, byte[] neighberVertexId){
- String oldVertex = BitwiseOperation.convertByteToBinaryString(oldVertexValue);
+
+ String neighberVertex = Kmer.recoverKmerFrom(GraphVertexOperation.k, neighberVertexId, 0, neighberVertexId.length);
+
+ byte newBit = Kmer.GENE_CODE.getAdjBit((byte)neighberVertex.charAt(neighberVertex.length() - 1));
+ return (byte) ((byte)(oldVertexValue & 0xF0) | (byte) (newBit & 0x0F));
+ /*String oldVertex = BitwiseOperation.convertByteToBinaryString(oldVertexValue);
String neighber = BitwiseOperation.convertBytesToBinaryStringKmer(neighberVertexId, k);
String lastTwoBits = neighber.substring(2*k-2,2*k);
if(lastTwoBits.compareTo("00") == 0)
@@ -275,7 +294,7 @@
else if(lastTwoBits.compareTo("11") == 0)
return BitwiseOperation.convertBinaryStringToByte(oldVertex.substring(0,4) + "1000");
- return (Byte) null;
+ return (Byte) null;*/
}
/**
* get precursor in vertexValue from gene code
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/LoadGraphVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/LoadGraphVertex.java
index d928555..3b2ef52 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/LoadGraphVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/LoadGraphVertex.java
@@ -1,22 +1,17 @@
package edu.uci.ics.genomix.pregelix;
-import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.ByteWritable;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import edu.uci.ics.genomix.pregelix.client.Client;
+import edu.uci.ics.genomix.pregelix.format.BinaryLoadGraphInputFormat;
+import edu.uci.ics.genomix.pregelix.format.BinaryLoadGraphOutputFormat;
+import edu.uci.ics.genomix.pregelix.io.MessageWritable;
import edu.uci.ics.pregelix.api.graph.Vertex;
-import edu.uci.ics.pregelix.api.io.VertexWriter;
-import edu.uci.ics.pregelix.api.io.text.TextVertexOutputFormat;
-import edu.uci.ics.pregelix.api.io.text.TextVertexOutputFormat.TextVertexWriter;
import edu.uci.ics.pregelix.api.job.PregelixJob;
-import edu.uci.ics.genomix.pregelix.example.client.Client;
-import edu.uci.ics.genomix.pregelix.example.io.MessageWritable;
/*
* vertexId: BytesWritable
@@ -47,75 +42,29 @@
* The details about message are in edu.uci.ics.pregelix.example.io.MessageWritable.
*/
public class LoadGraphVertex extends Vertex<BytesWritable, ByteWritable, NullWritable, MessageWritable>{
-
- private ByteWritable tmpVertexValue = new ByteWritable();
-
+
/**
- * For test, in compute method, make each vertexValue shift 1 to left.
- * It will be modified when going forward to next step.
+ * For testing only: every vertex votes to halt immediately, so the loaded graph is output unchanged
*/
@Override
public void compute(Iterator<MessageWritable> msgIterator) {
- if(getSuperstep() == 1){
- tmpVertexValue.set(getVertexValue().get());
- tmpVertexValue.set((byte) (tmpVertexValue.get() << 1));
- setVertexValue(tmpVertexValue);
- }
- else
- voteToHalt();
- }
-
- /**
- * Simple VertexWriter that supports {@link SimpleLoadGraphVertex}
- */
- public static class SimpleLoadGraphVertexWriter extends
- TextVertexWriter<BytesWritable, ByteWritable, NullWritable> {
- public SimpleLoadGraphVertexWriter(RecordWriter<Text, Text> lineRecordWriter) {
- super(lineRecordWriter);
- }
+ voteToHalt();
+ }
- @Override
- public void writeVertex(Vertex<BytesWritable, ByteWritable, NullWritable, ?> vertex) throws IOException,
- InterruptedException {
- getRecordWriter().write(new Text(vertex.getVertexId().toString()),
- new Text(vertex.getVertexValue().toString()));
- }
- }
-
- /**
- * Simple VertexOutputFormat that supports {@link SimpleLoadGraphVertex}
- */
- public static class SimpleLoadGraphVertexOutputFormat extends
- TextVertexOutputFormat<BytesWritable, ByteWritable, NullWritable> {
-
- @Override
- public VertexWriter<BytesWritable, ByteWritable, NullWritable> createVertexWriter(TaskAttemptContext context)
- throws IOException, InterruptedException {
- RecordWriter<Text, Text> recordWriter = textOutputFormat.getRecordWriter(context);
- return new SimpleLoadGraphVertexWriter(recordWriter);
- }
- }
-
/**
* @param args
*/
public static void main(String[] args) throws Exception {
+ //final int k = Integer.parseInt(args[0]);
PregelixJob job = new PregelixJob(LoadGraphVertex.class.getSimpleName());
job.setVertexClass(LoadGraphVertex.class);
/**
- * TextInput and TextOutput
- */
- job.setVertexInputFormatClass(TextLoadGraphInputFormat.class);
- job.setVertexOutputFormatClass(SimpleLoadGraphVertexOutputFormat.class);
-
-
- /**
* BinaryInput and BinaryOutput
*/
- /* job.setVertexInputFormatClass(BinaryLoadGraphInputFormat.class);
+ job.setVertexInputFormatClass(BinaryLoadGraphInputFormat.class);
job.setVertexOutputFormatClass(BinaryLoadGraphOutputFormat.class);
job.setOutputKeyClass(BytesWritable.class);
- job.setOutputValueClass(ByteWritable.class);*/
+ job.setOutputValueClass(ByteWritable.class);
Client.run(args, job);
}
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/LogAlgorithmForMergeGraphVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/LogAlgorithmForMergeGraphVertex.java
index c521074..d84020a 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/LogAlgorithmForMergeGraphVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/LogAlgorithmForMergeGraphVertex.java
@@ -1,29 +1,24 @@
package edu.uci.ics.genomix.pregelix;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
import java.util.Iterator;
+import java.util.logging.FileHandler;
+import java.util.logging.Logger;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.ByteWritable;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import edu.uci.ics.pregelix.api.graph.Vertex;
import edu.uci.ics.pregelix.api.job.PregelixJob;
-import edu.uci.ics.pregelix.api.util.BspUtils;
-import edu.uci.ics.genomix.pregelix.bitwise.BitwiseOperation;
-import edu.uci.ics.pregelix.dataflow.util.IterationUtils;
-import edu.uci.ics.genomix.pregelix.example.client.Client;
-import edu.uci.ics.genomix.pregelix.example.io.LogAlgorithmMessageWritable;
-import edu.uci.ics.genomix.pregelix.example.io.ValueStateWritable;
+import edu.uci.ics.genomix.pregelix.client.Client;
+import edu.uci.ics.genomix.pregelix.format.LogAlgorithmForMergeGraphInputFormat;
+import edu.uci.ics.genomix.pregelix.format.LogAlgorithmForMergeGraphOutputFormat;
+import edu.uci.ics.genomix.pregelix.io.LogAlgorithmMessageWritable;
+import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
+import edu.uci.ics.genomix.pregelix.log.LogAlgorithmLogFormatter;
import edu.uci.ics.genomix.pregelix.type.Message;
import edu.uci.ics.genomix.pregelix.type.State;
+import edu.uci.ics.genomix.type.Kmer;
+import edu.uci.ics.genomix.type.KmerUtil;
/*
* vertexId: BytesWritable
@@ -54,283 +49,347 @@
* The details about message are in edu.uci.ics.pregelix.example.io.MessageWritable.
*/
public class LogAlgorithmForMergeGraphVertex extends Vertex<BytesWritable, ValueStateWritable, NullWritable, LogAlgorithmMessageWritable>{
+ public static Logger logger = Logger.getLogger(LogAlgorithmForMergeGraphVertex.class.getName());
+ LogAlgorithmLogFormatter formatter = new LogAlgorithmLogFormatter();
+ public static FileHandler handler;
- private byte[] tmpVertextId;
- private byte[] tmpSourceVertextId;
+ private byte[] tmpVertexId;
private byte[] tmpDestVertexId;
+ private BytesWritable destVertexId = new BytesWritable();
private byte[] mergeChainVertexId;
private int lengthOfMergeChainVertex;
- private byte[] tmpMergeChainVertexId;
- private int tmpLengthOfMergeChainVertex;
private byte tmpVertexValue;
- private int tmpVertexState;
- private int tmpMessage;
private ValueStateWritable tmpVal = new ValueStateWritable();
private LogAlgorithmMessageWritable tmpMsg = new LogAlgorithmMessageWritable();
- OutputStreamWriter writer;
/**
* Log Algorithm for path merge graph
*/
+ public LogAlgorithmForMergeGraphVertex(){
+ if(handler == null){
+ try {
+ handler = new FileHandler("log/" + LogAlgorithmForMergeGraphVertex.class.getName() + ".log");
+ } catch (Exception e) { e.printStackTrace();}
+ }
+ }
+
@Override
public void compute(Iterator<LogAlgorithmMessageWritable> msgIterator) {
- try {
- writer = new OutputStreamWriter(new FileOutputStream("test/check_Log",true));
- } catch (FileNotFoundException e1) { e1.printStackTrace();}
- tmpVertextId = GraphVertexOperation.generateValidDataFromBytesWritable(getVertexId());
+
+ tmpVertexId = GraphVertexOperation.generateValidDataFromBytesWritable(getVertexId());
+ tmpVal = getVertexValue();
if (getSuperstep() == 1) {
- tmpVal = getVertexValue();
- tmpVertexValue = tmpVal.getValue();
tmpMsg.setChainVertexId(new byte[0]);
- if(GraphVertexOperation.isHead(new ByteWritable(tmpVertexValue))){
+ if(GraphVertexOperation.isHeadVertex(tmpVal.getValue())){
tmpMsg.setMessage(Message.START);
- tmpDestVertexId = GraphVertexOperation.getDestVertexId(tmpVertextId, tmpVertexValue);
- sendMsg(new BytesWritable(tmpDestVertexId),tmpMsg);
- //test
- GraphVertexOperation.testLogMessageCommunication(writer, getSuperstep(),
- tmpVertextId, tmpDestVertexId, tmpMsg);
- //voteToHalt();
- }
- else if(GraphVertexOperation.isRear(new ByteWritable(tmpVertexValue))){
- tmpMsg.setMessage(Message.END);
- tmpDestVertexId = GraphVertexOperation.getLeftDestVertexId(tmpVertextId, tmpVertexValue);
- sendMsg(new BytesWritable(tmpDestVertexId),tmpMsg);
- //test
- GraphVertexOperation.testSetVertexState(writer, getSuperstep(), getVertexId().getBytes(),
- tmpDestVertexId, tmpMsg, tmpVal);
- //voteToHalt();
- }
- else if(GraphVertexOperation.isPathVertex(new ByteWritable(tmpVertexValue))){
- tmpVal = getVertexValue();
- tmpVal.setState(State.MID_VERTEX);
- setVertexValue(tmpVal);
- //test
- GraphVertexOperation.testSetVertexState(writer, getSuperstep(),tmpVertextId ,
- null, null, tmpVal);
- }
- voteToHalt();
- }
- else if(getSuperstep() == 2){
- if(msgIterator.hasNext()){
- tmpMsg = msgIterator.next();
- tmpMessage = tmpMsg.getMessage();
- tmpVal = getVertexValue();
- tmpVertexState = tmpVal.getState();
- if(tmpMessage == Message.START && tmpVertexState == State.MID_VERTEX){
- tmpVal.setState(State.START_VERTEX);
- setVertexValue(tmpVal);
- //test
- GraphVertexOperation.testSetVertexState(writer, getSuperstep(), tmpVertextId,
- null, null, tmpVal);
- }
- else if(tmpMessage == Message.END && tmpVertexState == State.MID_VERTEX){
- tmpVal.setState(State.END_VERTEX);
- setVertexValue(tmpVal);
- //test
- GraphVertexOperation.testSetVertexState(writer, getSuperstep(), tmpVertextId,
- null, null, tmpVal);
+ for(byte x = Kmer.GENE_CODE.A; x<= Kmer.GENE_CODE.T ; x++){
+ if((tmpVal.getValue() & (1 << x)) != 0){
+ tmpDestVertexId = KmerUtil.shiftKmerWithNextCode(GraphVertexOperation.k, tmpVertexId, x);
+ destVertexId.set(tmpDestVertexId, 0, tmpDestVertexId.length);
+ sendMsg(destVertexId,tmpMsg);
+ //log
+ formatter.set(getSuperstep(), tmpVertexId, tmpDestVertexId, tmpMsg, tmpVal.getState(), GraphVertexOperation.k);
+ if(logger.getHandlers() != null)
+ logger.removeHandler(handler);
+ handler.setFormatter(formatter);
+ logger.addHandler(handler);
+ logger.info("##### It is the head! #####");
+ }
}
voteToHalt();
}
+ if(GraphVertexOperation.isRearVertex(tmpVal.getValue())){
+ tmpMsg.setMessage(Message.END);
+
+ for(byte x = Kmer.GENE_CODE.A; x<= Kmer.GENE_CODE.T ; x++){
+ if(((tmpVal.getValue()>> 4) & (1 << x)) != 0){
+ tmpDestVertexId = KmerUtil.shiftKmerWithPreCode(GraphVertexOperation.k, tmpVertexId, x);
+ destVertexId.set(tmpDestVertexId, 0, tmpDestVertexId.length);
+ sendMsg(destVertexId,tmpMsg);
+
+ //log
+ formatter.set(getSuperstep(), tmpVertexId, tmpDestVertexId, tmpMsg, tmpVal.getState(), GraphVertexOperation.k);
+ if(logger.getHandlers() != null)
+ logger.removeHandler(handler);
+ handler.setFormatter(formatter);
+ logger.addHandler(handler);
+ logger.info("##### It is the rear! #####");
+ }
+ }
+ voteToHalt();
+ }
+ if(GraphVertexOperation.isPathVertex(tmpVal.getValue())){
+ tmpVal.setState(State.MID_VERTEX);
+ setVertexValue(tmpVal);
+
+ //log
+ formatter.set(getSuperstep(), tmpVertexId, tmpDestVertexId, tmpMsg, tmpVal.getState(), GraphVertexOperation.k);
+ if(logger.getHandlers() != null)
+ logger.removeHandler(handler);
+ handler.setFormatter(formatter);
+ logger.addHandler(handler);
+ logger.info("##### It is the path! #####");
+ }
+ }
+ else if(getSuperstep() == 2){
+ while(msgIterator.hasNext()){
+ if(!GraphVertexOperation.isPathVertex(tmpVal.getValue())){
+ msgIterator.next();
+ voteToHalt();
+ }
+ else{
+ tmpMsg = msgIterator.next();
+
+ if(tmpMsg.getMessage() == Message.START && tmpVal.getState() == State.MID_VERTEX){
+ tmpVal.setState(State.START_VERTEX);
+ setVertexValue(tmpVal);
+ //log
+ formatter.set(getSuperstep(), tmpVertexId, tmpDestVertexId, tmpMsg, tmpVal.getState(), GraphVertexOperation.k);
+ if(logger.getHandlers() != null)
+ logger.removeHandler(handler);
+ handler.setFormatter(formatter);
+ logger.addHandler(handler);
+ logger.info("##### Set state! #####");
+ }
+ else if(tmpMsg.getMessage() == Message.END && tmpVal.getState() == State.MID_VERTEX){
+ tmpVal.setState(State.END_VERTEX);
+ setVertexValue(tmpVal);
+ voteToHalt();
+ //log
+ formatter.set(getSuperstep(), tmpVertexId, tmpDestVertexId, tmpMsg, tmpVal.getState(), GraphVertexOperation.k);
+ if(logger.getHandlers() != null)
+ logger.removeHandler(handler);
+ handler.setFormatter(formatter);
+ logger.addHandler(handler);
+ logger.info("##### Set state! #####");
+ }
+ else
+ voteToHalt();
+
+
+ }
+ }
}
//head node sends message to path node
else if(getSuperstep()%3 == 0){
- tmpVal = getVertexValue();
- tmpVertexValue = tmpVal.getValue();
- tmpVertexState = tmpVal.getState();
- tmpSourceVertextId = getVertexId().getBytes();
+ //tmpVal = getVertexValue();
if(getSuperstep() == 3){
tmpMsg = new LogAlgorithmMessageWritable();
- tmpDestVertexId = GraphVertexOperation.getDestVertexId(tmpSourceVertextId,
- getVertexValue().getValue());
- if(tmpVertexState == State.START_VERTEX){
+ tmpDestVertexId = KmerUtil.shiftKmerWithNextCode(GraphVertexOperation.k, tmpVertexId,
+ Kmer.GENE_CODE.getGeneCodeFromBitMap((byte)(tmpVal.getValue() & 0x0F)));
+ destVertexId.set(tmpDestVertexId, 0, tmpDestVertexId.length);
+ if(tmpVal.getState() == State.START_VERTEX){
tmpMsg.setMessage(Message.START);
- tmpMsg.setSourceVertexId(tmpSourceVertextId);
- sendMsg(new BytesWritable(tmpDestVertexId),tmpMsg);
- //test
- GraphVertexOperation.testMessageCommunication2(writer, getSuperstep(), tmpVertextId,
- tmpDestVertexId, tmpMsg, null);
+ tmpMsg.setSourceVertexId(getVertexId().getBytes());
+ sendMsg(destVertexId, tmpMsg);
+ voteToHalt();
}
- else if(tmpVertexState != State.END_VERTEX){
+ else if(tmpVal.getState() != State.END_VERTEX){
tmpMsg.setMessage(Message.NON);
- tmpMsg.setSourceVertexId(tmpSourceVertextId);
- sendMsg(new BytesWritable(tmpDestVertexId),tmpMsg);
- //test
- GraphVertexOperation.testMessageCommunication2(writer, getSuperstep(), tmpVertextId,
- tmpDestVertexId, tmpMsg, null);
+ tmpMsg.setSourceVertexId(getVertexId().getBytes());
+ sendMsg(destVertexId,tmpMsg);
+ voteToHalt();
}
+
+ //log
+ formatter.set(getSuperstep(), tmpVertexId, tmpDestVertexId, tmpMsg, tmpVal.getState(), GraphVertexOperation.k);
+ if(logger.getHandlers() != null)
+ logger.removeHandler(handler);
+ handler.setFormatter(formatter);
+ logger.addHandler(handler);
+ logger.info("##### head node sends message to path node! #####");
+
}
else{
if(msgIterator.hasNext()){
tmpMsg = msgIterator.next();
- tmpLengthOfMergeChainVertex = tmpVal.getLengthOfMergeChain();
- tmpMergeChainVertexId = tmpVal.getMergeChain();
- byte[] lastKmer = GraphVertexOperation.getLastKmer(tmpMergeChainVertexId,
- tmpLengthOfMergeChainVertex);
- tmpDestVertexId = GraphVertexOperation.getDestVertexId(lastKmer,
- tmpMsg.getNeighberInfo());
- if(tmpVertexState == State.START_VERTEX){
+ byte[] lastKmer = KmerUtil.getLastKmerFromChain(GraphVertexOperation.k,
+ tmpVal.getLengthOfMergeChain(),
+ tmpVal.getMergeChain());
+ tmpDestVertexId = KmerUtil.shiftKmerWithNextCode(GraphVertexOperation.k, lastKmer,
+ Kmer.GENE_CODE.getGeneCodeFromBitMap((byte)(tmpVal.getValue() & 0x0F))); //tmpMsg.getNeighberInfo()
+ destVertexId.set(tmpDestVertexId, 0, tmpDestVertexId.length);
+ if(tmpVal.getState() == State.START_VERTEX){
tmpMsg.setMessage(Message.START);
- tmpMsg.setSourceVertexId(tmpSourceVertextId);
- sendMsg(new BytesWritable(tmpDestVertexId),tmpMsg);
- //test
- GraphVertexOperation.testMessageCommunication2(writer, getSuperstep(), tmpVertextId,
- tmpDestVertexId, tmpMsg, null);
+ tmpMsg.setSourceVertexId(getVertexId().getBytes());
+ sendMsg(destVertexId, tmpMsg);
+ //log
+ formatter.set(getSuperstep(), tmpVertexId, tmpDestVertexId, tmpMsg, tmpVal.getState(), GraphVertexOperation.k);
+ if(logger.getHandlers() != null)
+ logger.removeHandler(handler);
+ handler.setFormatter(formatter);
+ logger.addHandler(handler);
+ logger.info("##### head node sends message to path node! #####");
+ voteToHalt();
}
- else if(tmpVertexState != State.END_VERTEX){
+ else if(tmpVal.getState() != State.END_VERTEX){
tmpMsg.setMessage(Message.NON);
- tmpMsg.setSourceVertexId(tmpSourceVertextId);
- sendMsg(new BytesWritable(tmpDestVertexId),tmpMsg);
- //test
- GraphVertexOperation.testMessageCommunication2(writer, getSuperstep(), tmpVertextId,
- tmpDestVertexId, tmpMsg, null);
+ tmpMsg.setSourceVertexId(getVertexId().getBytes());
+ sendMsg(destVertexId,tmpMsg);
+
+ //log
+ formatter.set(getSuperstep(), tmpVertexId, tmpDestVertexId, tmpMsg, tmpVal.getState(), GraphVertexOperation.k);
+ if(logger.getHandlers() != null)
+ logger.removeHandler(handler);
+ handler.setFormatter(formatter);
+ logger.addHandler(handler);
+ logger.info("##### head node sends message to path node! #####");
}
+
}
- else
- voteToHalt();
}
- voteToHalt();
}
+
//path node sends message back to head node
else if(getSuperstep()%3 == 1){
if(msgIterator.hasNext()){
- tmpVal = getVertexValue();
tmpMsg = msgIterator.next();
- tmpMessage = tmpMsg.getMessage();
- tmpSourceVertextId = tmpMsg.getSourceVertexId();
+ int message = tmpMsg.getMessage();
if(tmpVal.getLengthOfMergeChain() == 0){
tmpVal.setLengthOfMergeChain(GraphVertexOperation.k);
- tmpVal.setMergeChain(tmpVertextId);
+ tmpVal.setMergeChain(tmpVertexId);
setVertexValue(tmpVal);
}
+
tmpMsg.setLengthOfChain(tmpVal.getLengthOfMergeChain());
tmpMsg.setChainVertexId(tmpVal.getMergeChain());
tmpMsg.setNeighberInfo(tmpVal.getValue()); //set neighber
tmpMsg.setSourceVertexState(tmpVal.getState());
- if(tmpVal.getState() == State.END_VERTEX)
+
+ //tell the head whether this path node is the end vertex of the chain
+ if(tmpVal.getState() == State.END_VERTEX){
tmpMsg.setMessage(Message.END);
+ //tmpVal.setState(State.FINAL_DELETE);
+ //setVertexValue(tmpVal);
+ //deleteVertex(getVertexId());
+ }
else
tmpMsg.setMessage(Message.NON);
-
- sendMsg(new BytesWritable(tmpSourceVertextId),tmpMsg);
- //test
- GraphVertexOperation.testMessageCommunication2(writer, getSuperstep(), tmpVertextId,
- tmpSourceVertextId, tmpMsg, tmpSourceVertextId);
- //kill Message because it has been merged by the head
- if(tmpMessage == Message.START){
+
+ if(message == Message.START){
tmpVal.setState(State.TODELETE);
setVertexValue(tmpVal);
}
+ destVertexId.set(tmpMsg.getSourceVertexId(), 0, tmpMsg.getSourceVertexId().length);
+ sendMsg(destVertexId,tmpMsg);
+
+ //log
+ formatter.set(getSuperstep(), tmpVertexId, tmpMsg.getSourceVertexId(), tmpMsg, tmpVal.getState(), GraphVertexOperation.k);
+ if(logger.getHandlers() != null)
+ logger.removeHandler(handler);
+ handler.setFormatter(formatter);
+ logger.addHandler(handler);
+ logger.info("##### path node sends message back to head node! #####");
+
}
else{
+ //String source2 = Kmer.recoverKmerFrom(5, tmpVertexId, 0, tmpVertexId.length);
if(getVertexValue().getState() != State.START_VERTEX
- && getVertexValue().getState() != State.END_VERTEX
- && tmpMessage != Message.END && tmpMessage != Message.START){
+ && getVertexValue().getState() != State.END_VERTEX){
- GraphVertexOperation.testDeleteVertexInfo(writer, getSuperstep(), tmpVertextId, "not receive any message");
+ //log
+ formatter.set(getSuperstep(), tmpVertexId, null, tmpMsg, tmpVal.getState(), GraphVertexOperation.k);
+ formatter.setOperation(1);
+ if(logger.getHandlers() != null)
+ logger.removeHandler(handler);
+ handler.setFormatter(formatter);
+ logger.addHandler(handler);
+ logger.info("##### Delete! Not receive message! #####");
deleteVertex(getVertexId()); //killSelf because it doesn't receive any message
}
}
}
else if(getSuperstep()%3 == 2){
- if(msgIterator.hasNext()){
- tmpMsg = msgIterator.next();
- tmpVal = getVertexValue();
- tmpVertexState = tmpVal.getState();
- tmpSourceVertextId = tmpVertextId;
- if(tmpVertexState == State.TODELETE){
- GraphVertexOperation.testDeleteVertexInfo(writer, getSuperstep(),
- tmpSourceVertextId, "already merged by head");
- deleteVertex(new BytesWritable(tmpSourceVertextId)); //killSelf
- }
- else{
+ if(tmpVal.getState() == State.TODELETE){
+ //log
+ formatter.set(getSuperstep(), tmpVertexId, tmpDestVertexId, tmpMsg, tmpVal.getState(), GraphVertexOperation.k);
+ if(logger.getHandlers() != null)
+ logger.removeHandler(handler);
+ handler.setFormatter(formatter);
+ logger.addHandler(handler);
+ logger.info("##### Delete! Already merge by head! #####");
+ formatter.setOperation(0);
+
+ deleteVertex(getVertexId()); //killSelf
+ }
+ else{
+ if(msgIterator.hasNext()){
+ tmpMsg = msgIterator.next();
+
if(tmpMsg.getMessage() == Message.END){
- if(tmpVertexState != State.START_VERTEX)
- tmpVertexState = State.END_VERTEX;
+ if(tmpVal.getState() != State.START_VERTEX)
+ tmpVal.setState(State.END_VERTEX);
else
- tmpVertexState = State.FINAL_VERTEX;
+ tmpVal.setState(State.FINAL_VERTEX);
}
- tmpVal.setState(tmpVertexState);
if(getSuperstep() == 5){
lengthOfMergeChainVertex = GraphVertexOperation.k;
- mergeChainVertexId = tmpVertextId;
+ mergeChainVertexId = tmpVertexId;
}
else{
lengthOfMergeChainVertex = tmpVal.getLengthOfMergeChain();
mergeChainVertexId = tmpVal.getMergeChain();
}
- mergeChainVertexId = GraphVertexOperation.mergeTwoChainVertex(mergeChainVertexId,
- lengthOfMergeChainVertex, tmpMsg.getChainVertexId(), tmpMsg.getLengthOfChain()); //tmpMsg.getSourceVertexId()
+ mergeChainVertexId = KmerUtil.mergeTwoKmer(lengthOfMergeChainVertex,
+ mergeChainVertexId,
+ tmpMsg.getLengthOfChain() - GraphVertexOperation.k + 1,
+ KmerUtil.getLastKmerFromChain(tmpMsg.getLengthOfChain() - GraphVertexOperation.k + 1,
+ tmpMsg.getLengthOfChain(), tmpMsg.getChainVertexId()));
lengthOfMergeChainVertex = lengthOfMergeChainVertex + tmpMsg.getLengthOfChain()
- GraphVertexOperation.k + 1;
tmpVal.setLengthOfMergeChain(lengthOfMergeChainVertex);
tmpVal.setMergeChain(mergeChainVertexId);
- //test
- GraphVertexOperation.testMergeChainVertex(writer, getSuperstep(),
- mergeChainVertexId, lengthOfMergeChainVertex);
- byte tmpByte = tmpMsg.getNeighberInfo();
- tmpVertexValue = GraphVertexOperation.updateRightNeighber(getVertexValue().getValue(),tmpByte);
+ //log
+ formatter.setMergeChain(getSuperstep(), tmpVertexId, lengthOfMergeChainVertex, mergeChainVertexId, GraphVertexOperation.k);
+ if(logger.getHandlers() != null)
+ logger.removeHandler(handler);
+ handler.setFormatter(formatter);
+ logger.addHandler(handler);
+ logger.info("##### Merge Chain INFO #####");
+
+ tmpVertexValue = GraphVertexOperation.updateRightNeighber(getVertexValue().getValue(),tmpMsg.getNeighberInfo());
tmpVal.setValue(tmpVertexValue);
- if(tmpVertexState != State.FINAL_VERTEX){
+ if(tmpMsg.getMessage() != Message.END){
setVertexValue(tmpVal);
- tmpMsg = new LogAlgorithmMessageWritable();
+ tmpMsg = new LogAlgorithmMessageWritable(); //reset
tmpMsg.setNeighberInfo(tmpVertexValue);
sendMsg(getVertexId(),tmpMsg);
- //test
- GraphVertexOperation.testMessageCommunication2(writer, getSuperstep(), getVertexId().getBytes(),
- tmpVertextId, tmpMsg, null);
+
+ //log
+ formatter.set(getSuperstep(), tmpVertexId, tmpDestVertexId, tmpMsg, tmpVal.getState(), GraphVertexOperation.k);
+ if(logger.getHandlers() != null)
+ logger.removeHandler(handler);
+ handler.setFormatter(formatter);
+ logger.addHandler(handler);
+ logger.info("##### head node sends message to path node! #####");
}
}
- if(tmpVertexState == State.END_VERTEX){
+ if(tmpVal.getState() == State.END_VERTEX){
voteToHalt();
- //test
- GraphVertexOperation.testVoteVertexInfo(writer, getSuperstep(), tmpVertextId,
- " it is the rear!");
+
+ //log
+ formatter.setVotoToHalt(getSuperstep(), tmpVertexId, GraphVertexOperation.k);
+ if(logger.getHandlers() != null)
+ logger.removeHandler(handler);
+ handler.setFormatter(formatter);
+ logger.addHandler(handler);
+ logger.info("##### Because it's rear! #####");
+ formatter.setOperation(0);
}
- if(tmpVertexState == State.FINAL_VERTEX){
+ if(tmpVal.getState() == State.FINAL_VERTEX){
voteToHalt();
- try {
+ /*try {
GraphVertexOperation.flushChainToFile(tmpVal.getMergeChain(),
- tmpVal.getLengthOfMergeChain(),tmpVertextId);
- writer.write("Step: " + getSuperstep() + "\r\n");
- writer.write("Flush! " + "\r\n");
- } catch (IOException e) { e.printStackTrace(); }
+ tmpVal.getLengthOfMergeChain(),tmpVertexId);
+ } catch (IOException e) { e.printStackTrace(); }*/
}
}
+
}
- try {
- writer.close();
- } catch (IOException e) { e.printStackTrace(); }
}
-
- private void signalTerminate() {
- Configuration conf = getContext().getConfiguration();
- try {
- IterationUtils.writeForceTerminationState(conf, BspUtils.getJobId(conf));
- writeMergeGraphResult(conf, true);
- } catch (Exception e) {
- throw new IllegalStateException(e);
- }
- }
-
- private void writeMergeGraphResult(Configuration conf, boolean terminate) {
- try {
- FileSystem dfs = FileSystem.get(conf);
- String pathStr = IterationUtils.TMP_DIR + BspUtils.getJobId(conf) + "MergeGraph";
- Path path = new Path(pathStr);
- if (!dfs.exists(path)) {
- FSDataOutputStream output = dfs.create(path, true);
- output.writeBoolean(terminate);
- output.flush();
- output.close();
- }
- } catch (IOException e) {
- throw new IllegalStateException(e);
- }
- }
/**
* @param args
@@ -344,7 +403,7 @@
job.setVertexInputFormatClass(LogAlgorithmForMergeGraphInputFormat.class);
job.setVertexOutputFormatClass(LogAlgorithmForMergeGraphOutputFormat.class);
job.setOutputKeyClass(BytesWritable.class);
- job.setOutputValueClass(ByteWritable.class);
+ job.setOutputValueClass(ValueStateWritable.class);
Client.run(args, job);
}
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/MergeGraphVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/MergeGraphVertex.java
index 0a77a94..a5c40ad 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/MergeGraphVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/MergeGraphVertex.java
@@ -1,16 +1,10 @@
package edu.uci.ics.genomix.pregelix;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
import java.io.IOException;
-import java.io.OutputStreamWriter;
import java.util.Iterator;
+import java.util.logging.FileHandler;
+import java.util.logging.Logger;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.ByteWritable;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
@@ -18,12 +12,13 @@
import edu.uci.ics.genomix.type.KmerUtil;
import edu.uci.ics.pregelix.api.graph.Vertex;
import edu.uci.ics.pregelix.api.job.PregelixJob;
-import edu.uci.ics.pregelix.api.util.BspUtils;
-import edu.uci.ics.genomix.pregelix.bitwise.BitwiseOperation;
-import edu.uci.ics.pregelix.dataflow.util.IterationUtils;
-import edu.uci.ics.genomix.pregelix.example.client.Client;
-import edu.uci.ics.genomix.pregelix.example.io.MessageWritable;
-import edu.uci.ics.genomix.pregelix.hdfs.HDFSOperation;
+import edu.uci.ics.genomix.pregelix.client.Client;
+import edu.uci.ics.genomix.pregelix.format.BinaryLoadGraphInputFormat;
+import edu.uci.ics.genomix.pregelix.format.BinaryLoadGraphOutputFormat;
+import edu.uci.ics.genomix.pregelix.io.MessageWritable;
+import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
+import edu.uci.ics.genomix.pregelix.log.NaiveAlgorithmLogFormatter;
+import edu.uci.ics.genomix.pregelix.type.State;
/*
* vertexId: BytesWritable
@@ -53,140 +48,169 @@
* The succeed node and precursor node will be stored in vertexValue and we don't use edgeValue.
* The details about message are in edu.uci.ics.pregelix.example.io.MessageWritable.
*/
-public class MergeGraphVertex extends Vertex<BytesWritable, ByteWritable, NullWritable, MessageWritable>{
+public class MergeGraphVertex extends Vertex<BytesWritable, ValueStateWritable, NullWritable, MessageWritable>{
- /** The number of bytes for vertex id */
- public static final int numBytes = (GraphVertexOperation.k-1)/4 + 1;
- private BytesWritable tmpVertextId = new BytesWritable();
- private BytesWritable tmpDestVertexId = new BytesWritable();
+ public static Logger logger = Logger.getLogger(MergeGraphVertex.class.getName());
+ NaiveAlgorithmLogFormatter formatter = new NaiveAlgorithmLogFormatter();
+ public static FileHandler handler;
+
+ private byte[] tmpVertexId;
+ private byte[] tmpDestVertexId;
+ private BytesWritable destVertexId = new BytesWritable();
private BytesWritable tmpChainVertexId = new BytesWritable();
- private ByteWritable tmpVertexValue = new ByteWritable();
+ private ValueStateWritable tmpVertexValue = new ValueStateWritable();
private MessageWritable tmpMsg = new MessageWritable();
- OutputStreamWriter writer;
/**
* Naive Algorithm for path merge graph
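+ * The constructor opens the static log FileHandler the first time a vertex is
+ * created; if that fails the exception is only printed, never rethrown.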
*/
+ public MergeGraphVertex(){
+ if(handler == null){
+ try {
+ handler = new FileHandler("log/" + MergeGraphVertex.class.getName() + ".log");
+ } catch (Exception e) { e.printStackTrace();}
+ }
+ }
+
@Override
public void compute(Iterator<MessageWritable> msgIterator) {
- try {
- writer = new OutputStreamWriter(new FileOutputStream("test/check_Naive",true));
- } catch (FileNotFoundException e1) { e1.printStackTrace();}
- tmpVertextId.set(GraphVertexOperation.generateValidDataFromBytesWritable(getVertexId()),0,numBytes);
+
+ tmpVertexId = GraphVertexOperation.generateValidDataFromBytesWritable(getVertexId());
if (getSuperstep() == 1) {
- if(GraphVertexOperation.isHead(getVertexValue())){
- tmpDestVertexId.set(tmpVertextId);
- //change
- Kmer.moveKmer(GraphVertexOperation.k, tmpDestVertexId.getBytes(),
- (byte)GraphVertexOperation.findSucceedNode(getVertexValue().get()));
- //tmpDestVertexId.set(GraphVertexOperation.getDestVertexId(tmpDestVertexId.getBytes(),
- // getVertexValue().get()), 0, numBytes);
- tmpMsg.setSourceVertexId(tmpVertextId.getBytes());
+ if(GraphVertexOperation.isHeadVertex(getVertexValue().getValue())){
+ tmpMsg.setSourceVertexId(tmpVertexId);
+ tmpMsg.setHead(tmpVertexId);
tmpMsg.setLengthOfChain(0);
tmpMsg.setChainVertexId(tmpChainVertexId.getBytes());
- sendMsg(tmpDestVertexId,tmpMsg);
- //test
- GraphVertexOperation.testMessageCommunication(writer,getSuperstep(),tmpVertextId.getBytes(),
- tmpDestVertexId.getBytes(),tmpMsg);
+ for(byte x = Kmer.GENE_CODE.A; x<= Kmer.GENE_CODE.T ; x++){
+ if((getVertexValue().getValue() & (1 << x)) != 0){
+ tmpDestVertexId = KmerUtil.shiftKmerWithNextCode(GraphVertexOperation.k, tmpVertexId, x);
+ destVertexId.set(tmpDestVertexId, 0, tmpDestVertexId.length);
+ sendMsg(destVertexId,tmpMsg);
+
+ //log
+ formatter.set(getSuperstep(), tmpVertexId, tmpDestVertexId, tmpMsg, GraphVertexOperation.k);
+ if(logger.getHandlers() != null)
+ logger.removeHandler(handler);
+ handler.setFormatter(formatter);
+ logger.addHandler(handler);
+ logger.info("##### It is the head! #####");
+ }
}
+ }
}
+
//path node sends message back to head node
else if(getSuperstep()%2 == 0){
+
if(msgIterator.hasNext()){
tmpMsg = msgIterator.next();
+
if(!tmpMsg.isRear()){
- if(GraphVertexOperation.isPathVertex(getVertexValue())){
- tmpDestVertexId.set(tmpMsg.getSourceVertexId(), 0, numBytes);
- tmpMsg.setNeighberInfo(getVertexValue().get()); //set neighber
+ if(getSuperstep() == 2)
+ tmpMsg.setHead(tmpVertexId);
+ if(GraphVertexOperation.isPathVertex(getVertexValue().getValue())){
+ tmpDestVertexId = tmpMsg.getSourceVertexId();
+ tmpMsg.setNeighberInfo(getVertexValue().getValue()); //set neighber
if(tmpMsg.getLengthOfChain() == 0){
tmpMsg.setLengthOfChain(GraphVertexOperation.k);
- tmpMsg.setChainVertexId(tmpVertextId.getBytes());
+ tmpMsg.setChainVertexId(tmpVertexId);
}
else{
- /*
- tmpMsg.incrementLength();
- tmpMsg.setChainVertexId(GraphVertexOperation.updateChainVertexId(
- tmpChainVertexId,
- tmpMsg.getLengthOfChain()-1,
- tmpVertextId));
- */
- //change
+ String source = Kmer.recoverKmerFrom(GraphVertexOperation.k, tmpVertexId, 0, tmpVertexId.length);
tmpMsg.setChainVertexId(KmerUtil.mergeKmerWithNextCode(
tmpMsg.getLengthOfChain(),
tmpMsg.getChainVertexId(),
- (byte)GraphVertexOperation.findSucceedNode(getVertexValue().get())));
+ Kmer.GENE_CODE.getCodeFromSymbol((byte)source.charAt(source.length() - 1))));
tmpMsg.incrementLength();
deleteVertex(getVertexId());
}
- sendMsg(tmpDestVertexId,tmpMsg);
- //test
- GraphVertexOperation.testMessageCommunication(writer,getSuperstep(),tmpVertextId.getBytes(),
- tmpDestVertexId.getBytes(),tmpMsg);
+ destVertexId.set(tmpDestVertexId, 0, tmpDestVertexId.length);
+ sendMsg(destVertexId,tmpMsg);
+
+ //log
+ formatter.set(getSuperstep(), tmpVertexId, tmpDestVertexId, tmpMsg, GraphVertexOperation.k);
+ if(logger.getHandlers() != null)
+ logger.removeHandler(handler);
+ handler.setFormatter(formatter);
+ logger.addHandler(handler);
+ logger.info("##### It is the path! #####");
}
- else if(GraphVertexOperation.isRear(getVertexValue())){
- tmpDestVertexId.set(tmpMsg.getSourceVertexId(), 0, numBytes);
- tmpMsg.setSourceVertexId(tmpVertextId.getBytes());
- tmpMsg.setRear(true);
- sendMsg(tmpDestVertexId,tmpMsg);
- //test
- try {
- writer.write("It is Rear!\r\n");
- } catch (IOException e) { e.printStackTrace(); }
- GraphVertexOperation.testMessageCommunication(writer,getSuperstep(),tmpVertextId.getBytes(),
- tmpDestVertexId.getBytes(),tmpMsg);
+ else if(GraphVertexOperation.isRearVertex(getVertexValue().getValue())){
+ if(getSuperstep() == 2)
+ voteToHalt();
+ else{
+ tmpDestVertexId = tmpMsg.getSourceVertexId();
+ tmpMsg.setSourceVertexId(tmpVertexId);
+ tmpMsg.setRear(true);
+ destVertexId.set(tmpDestVertexId, 0, tmpDestVertexId.length);
+ sendMsg(destVertexId,tmpMsg);
+
+ //log
+ formatter.set(getSuperstep(), tmpVertexId, tmpDestVertexId, tmpMsg, GraphVertexOperation.k);
+ if(logger.getHandlers() != null)
+ logger.removeHandler(handler);
+ handler.setFormatter(formatter);
+ logger.addHandler(handler);
+ logger.info("##### It is the rear! #####!");
+ }
}
}
else{
- tmpVertexValue.set(GraphVertexOperation.updateRightNeighberByVertexId(getVertexValue().get(),
+ tmpVertexValue.setState(State.START_VERTEX);
+ tmpVertexValue.setValue(GraphVertexOperation.updateRightNeighberByVertexId(getVertexValue().getValue(),
tmpMsg.getSourceVertexId()));
+ tmpVertexValue.setLengthOfMergeChain(tmpMsg.getLengthOfChain());
+ tmpVertexValue.setMergeChain(tmpMsg.getChainVertexId());
+ setVertexValue(tmpVertexValue);
try {
+ String source = Kmer.recoverKmerFrom(tmpMsg.getLengthOfChain(), tmpMsg.getChainVertexId(), 0, tmpMsg.getChainVertexId().length);
GraphVertexOperation.flushChainToFile(tmpMsg.getChainVertexId(),
- tmpMsg.getLengthOfChain(),tmpVertextId.getBytes());
+ tmpMsg.getLengthOfChain(),tmpVertexId);
} catch (IOException e) { e.printStackTrace(); }
- //test
- GraphVertexOperation.testLastMessageCommunication(writer,getSuperstep(),tmpVertextId.getBytes(),
- tmpDestVertexId.getBytes(),tmpMsg);
}
}
}
//head node sends message to path node
else if(getSuperstep()%2 == 1){
- if (msgIterator.hasNext()){
+ while (msgIterator.hasNext()){
tmpMsg = msgIterator.next();
if(!tmpMsg.isRear()){
byte[] lastKmer = KmerUtil.getLastKmerFromChain(GraphVertexOperation.k,
tmpMsg.getLengthOfChain(),
tmpMsg.getChainVertexId());
- //byte[] lastKmer = GraphVertexOperation.getLastKmer(tmpMsg.getChainVertexId(),
- // tmpMsg.getLengthOfChain());
- tmpDestVertexId.set(lastKmer, 0, numBytes);
- //change
- Kmer.moveKmer(GraphVertexOperation.k, tmpDestVertexId.getBytes(),
- (byte)GraphVertexOperation.findSucceedNode(getVertexValue().get()));
- //tmpDestVertexId.set(GraphVertexOperation.getDestVertexId(lastKmer,
- // tmpMsg.getNeighberInfo()), 0, numBytes);
- tmpMsg.setSourceVertexId(tmpVertextId.getBytes());
- sendMsg(tmpDestVertexId,tmpMsg);
- //test
- GraphVertexOperation.testMessageCommunication(writer,getSuperstep(),tmpVertextId.getBytes(),
- tmpDestVertexId.getBytes(),tmpMsg);
+ tmpDestVertexId = KmerUtil.shiftKmerWithNextCode(GraphVertexOperation.k, lastKmer,
+ Kmer.GENE_CODE.getGeneCodeFromBitMap((byte)(tmpMsg.getNeighberInfo() & 0x0F)));
+
+ tmpMsg.setSourceVertexId(tmpVertexId);
+ destVertexId.set(tmpDestVertexId, 0, tmpDestVertexId.length);
+ sendMsg(destVertexId,tmpMsg);
+
+ //log
+ formatter.set(getSuperstep(), tmpVertexId, tmpDestVertexId, tmpMsg, GraphVertexOperation.k);
+ if(logger.getHandlers() != null)
+ logger.removeHandler(handler);
+ handler.setFormatter(formatter);
+ logger.addHandler(handler);
+ logger.info("");
}
else{
- tmpDestVertexId.set(tmpVertextId);
- //change
- Kmer.moveKmer(GraphVertexOperation.k, tmpDestVertexId.getBytes(),
- (byte)GraphVertexOperation.findSucceedNode(getVertexValue().get()));
- //tmpDestVertexId.set(GraphVertexOperation.getDestVertexId(tmpVertextId.getBytes(),
- // getVertexValue().get()), 0, numBytes);
- sendMsg(tmpDestVertexId,tmpMsg);
- //test
- GraphVertexOperation.testMessageCommunication(writer,getSuperstep(),tmpVertextId.getBytes(),
- tmpDestVertexId.getBytes(),tmpMsg);
+ tmpDestVertexId = tmpMsg.getHead();
+ destVertexId.set(tmpDestVertexId, 0, tmpDestVertexId.length);
+ sendMsg(destVertexId,tmpMsg);
+
+ //log
+ formatter.set(getSuperstep(), tmpVertexId, tmpDestVertexId, tmpMsg, GraphVertexOperation.k);
+ if(logger.getHandlers() != null)
+ logger.removeHandler(handler);
+ handler.setFormatter(formatter);
+ logger.addHandler(handler);
+ logger.info("##### Rear is sent back! #####");
}
}
}
- try {
- writer.close();
- } catch (IOException e) { e.printStackTrace(); }
+
voteToHalt();
}
@@ -202,7 +226,7 @@
job.setVertexInputFormatClass(BinaryLoadGraphInputFormat.class);
job.setVertexOutputFormatClass(BinaryLoadGraphOutputFormat.class);
job.setOutputKeyClass(BytesWritable.class);
- job.setOutputValueClass(ByteWritable.class);
+ job.setOutputValueClass(ValueStateWritable.class);
Client.run(args, job);
}
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/TestLoadGraphVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/TestLoadGraphVertex.java
deleted file mode 100644
index 3449df6..0000000
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/TestLoadGraphVertex.java
+++ /dev/null
@@ -1,70 +0,0 @@
-package edu.uci.ics.genomix.pregelix;
-
-import java.util.Arrays;
-import java.util.Iterator;
-
-import org.apache.hadoop.io.ByteWritable;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.NullWritable;
-
-import edu.uci.ics.pregelix.api.graph.Vertex;
-import edu.uci.ics.pregelix.api.job.PregelixJob;
-import edu.uci.ics.genomix.pregelix.bitwise.BitwiseOperation;
-import edu.uci.ics.genomix.pregelix.example.client.Client;
-import edu.uci.ics.genomix.pregelix.example.io.MessageWritable;
-
-/*
- * vertexId: BytesWritable
- * vertexValue: ByteWritable
- * edgeValue: NullWritable
- * message: MessageWritable
- *
- * DNA:
- * A: 00
- * C: 01
- * G: 10
- * T: 11
- *
- * succeed node
- * A 00000001 1
- * G 00000010 2
- * C 00000100 4
- * T 00001000 8
- * precursor node
- * A 00010000 16
- * G 00100000 32
- * C 01000000 64
- * T 10000000 128
- *
- * For example, ONE LINE in input file: 00,01,10 0001,0010,
- * That means that vertexId is ACG, its succeed node is A and its precursor node is C.
- * The succeed node and precursor node will be stored in vertexValue and we don't use edgeValue.
- * The details about message are in edu.uci.ics.pregelix.example.io.MessageWritable.
- */
-public class TestLoadGraphVertex extends Vertex<BytesWritable, ByteWritable, NullWritable, MessageWritable>{
-
- /**
- * For test, just output original file
- */
- @Override
- public void compute(Iterator<MessageWritable> msgIterator) {
- voteToHalt();
- }
-
- /**
- * @param args
- */
- public static void main(String[] args) throws Exception {
- //final int k = Integer.parseInt(args[0]);
- PregelixJob job = new PregelixJob(TestLoadGraphVertex.class.getSimpleName());
- job.setVertexClass(TestLoadGraphVertex.class);
- /**
- * BinaryInput and BinaryOutput
- */
- job.setVertexInputFormatClass(BinaryLoadGraphInputFormat.class);
- job.setVertexOutputFormatClass(BinaryLoadGraphOutputFormat.class);
- job.setOutputKeyClass(BytesWritable.class);
- job.setOutputValueClass(ByteWritable.class);
- Client.run(args, job);
- }
-}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/TextLoadGraphInputFormat.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/TextLoadGraphInputFormat.java
deleted file mode 100644
index e3c660e..0000000
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/TextLoadGraphInputFormat.java
+++ /dev/null
@@ -1,82 +0,0 @@
-package edu.uci.ics.genomix.pregelix;
-
-import java.io.IOException;
-
-import org.apache.hadoop.io.ByteWritable;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
-import edu.uci.ics.pregelix.api.graph.Vertex;
-import edu.uci.ics.pregelix.api.io.VertexReader;
-import edu.uci.ics.pregelix.api.io.text.TextVertexInputFormat;
-import edu.uci.ics.pregelix.api.util.BspUtils;
-import edu.uci.ics.genomix.pregelix.example.io.MessageWritable;
-
-public class TextLoadGraphInputFormat extends
- TextVertexInputFormat<BytesWritable, ByteWritable, NullWritable, MessageWritable>{
-
- /**
- * Format INPUT
- */
- @Override
- public VertexReader<BytesWritable, ByteWritable, NullWritable, MessageWritable> createVertexReader(
- InputSplit split, TaskAttemptContext context) throws IOException {
- return new TextLoadGraphReader(textInputFormat.createRecordReader(split, context));
- }
-
- @SuppressWarnings("rawtypes")
- class TextLoadGraphReader extends
- TextVertexReader<BytesWritable, ByteWritable, NullWritable, MessageWritable> {
- private final static String separator = " ";
- private Vertex vertex;
- private BytesWritable vertexId = new BytesWritable();
- private ByteWritable vertexValue = new ByteWritable();
-
- public TextLoadGraphReader(RecordReader<LongWritable, Text> lineRecordReader) {
- super(lineRecordReader);
- }
-
- @Override
- public boolean nextVertex() throws IOException, InterruptedException {
- return getRecordReader().nextKeyValue();
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public Vertex<BytesWritable, ByteWritable, NullWritable, MessageWritable> getCurrentVertex() throws IOException,
- InterruptedException {
- if (vertex == null)
- vertex = (Vertex) BspUtils.createVertex(getContext().getConfiguration());
-
- vertex.getMsgList().clear();
- vertex.getEdges().clear();
- Text line = getRecordReader().getCurrentValue();
- String[] fields = line.toString().split(separator);
-
- if (fields.length > 0) {
- /**
- * set the src vertex id
- */
- BytesWritable src = new BytesWritable(fields[0].getBytes());
- vertexId.set(src);
- vertex.setVertexId(vertexId);
-
-
- /**
- * set the vertex value
- */
- byte[] temp = fields[1].getBytes();
- vertexValue.set(temp[0]);
- vertex.setVertexValue(vertexValue);
-
- }
- return vertex;
- }
- }
-
-}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/api/io/binary/BinaryVertexInputFormat.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/api/io/binary/BinaryVertexInputFormat.java
index 51eceee..823a984 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/api/io/binary/BinaryVertexInputFormat.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/api/io/binary/BinaryVertexInputFormat.java
@@ -16,7 +16,7 @@
import edu.uci.ics.pregelix.api.io.VertexReader;
import edu.uci.ics.genomix.type.KmerCountValue;
-public class BinaryVertexInputFormat <I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable>
+public class BinaryVertexInputFormat <I extends WritableComparable<?>, V extends Writable, E extends Writable, M extends Writable>
extends VertexInputFormat<I, V, E, M>{
/** Uses the SequenceFileInputFormat to do everything */
@@ -34,7 +34,7 @@
* @param <E>
* Edge value
*/
- public static abstract class BinaryVertexReader<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable>
+ public static abstract class BinaryVertexReader<I extends WritableComparable<?>, V extends Writable, E extends Writable, M extends Writable>
implements VertexReader<I, V, E, M> {
/** Internal line record reader */
private final RecordReader<BytesWritable,KmerCountValue> lineRecordReader;
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/api/io/binary/BinaryVertexOutputFormat.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/api/io/binary/BinaryVertexOutputFormat.java
index 2bfaf90..f497f21 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/api/io/binary/BinaryVertexOutputFormat.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/api/io/binary/BinaryVertexOutputFormat.java
@@ -2,7 +2,6 @@
import java.io.IOException;
-import org.apache.hadoop.io.ByteWritable;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
@@ -12,6 +11,7 @@
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
import edu.uci.ics.pregelix.api.io.VertexOutputFormat;
import edu.uci.ics.pregelix.api.io.VertexWriter;
@@ -49,7 +49,7 @@
/** Context passed to initialize */
private TaskAttemptContext context;
/** Internal line record writer */
- private final RecordWriter<BytesWritable, ByteWritable> lineRecordWriter;
+ private final RecordWriter<BytesWritable, ValueStateWritable> lineRecordWriter;
/**
* Initialize with the LineRecordWriter.
@@ -57,7 +57,7 @@
* @param lineRecordWriter
* Line record writer from SequenceFileOutputFormat
*/
- public BinaryVertexWriter(RecordWriter<BytesWritable, ByteWritable> lineRecordWriter) {
+ public BinaryVertexWriter(RecordWriter<BytesWritable, ValueStateWritable> lineRecordWriter) {
this.lineRecordWriter = lineRecordWriter;
}
@@ -76,7 +76,7 @@
*
* @return Record writer to be used for writing.
*/
- public RecordWriter<BytesWritable, ByteWritable> getRecordWriter() {
+ public RecordWriter<BytesWritable, ValueStateWritable> getRecordWriter() {
return lineRecordWriter;
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/example/client/Client.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/example/client/Client.java
deleted file mode 100644
index 5507713..0000000
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/example/client/Client.java
+++ /dev/null
@@ -1,58 +0,0 @@
-
-package edu.uci.ics.genomix.pregelix.example.client;
-
-import java.io.IOException;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.kohsuke.args4j.CmdLineException;
-import org.kohsuke.args4j.CmdLineParser;
-import org.kohsuke.args4j.Option;
-
-import edu.uci.ics.pregelix.api.job.PregelixJob;
-import edu.uci.ics.pregelix.core.base.IDriver.Plan;
-import edu.uci.ics.pregelix.core.driver.Driver;
-
-public class Client {
-
- private static class Options {
- @Option(name = "-inputpaths", usage = "comma seprated input paths", required = true)
- public String inputPaths;
-
- @Option(name = "-outputpath", usage = "output path", required = true)
- public String outputPath;
-
- @Option(name = "-ip", usage = "ip address of cluster controller", required = true)
- public String ipAddress;
-
- @Option(name = "-port", usage = "port of cluster controller", required = false)
- public int port;
-
- @Option(name = "-plan", usage = "query plan choice", required = false)
- public Plan planChoice = Plan.OUTER_JOIN;
-
- @Option(name = "-runtime-profiling", usage = "whether to do runtime profifling", required = false)
- public String profiling = "false";
- }
-
- public static void run(String[] args, PregelixJob job) throws Exception {
- Options options = prepareJob(args, job);
- Driver driver = new Driver(Client.class);
- driver.runJob(job, options.planChoice, options.ipAddress, options.port, Boolean.parseBoolean(options.profiling));
- }
-
- private static Options prepareJob(String[] args, PregelixJob job) throws CmdLineException, IOException {
- Options options = new Options();
- CmdLineParser parser = new CmdLineParser(options);
- parser.parseArgument(args);
-
- String[] inputs = options.inputPaths.split(";");
- FileInputFormat.setInputPaths(job, inputs[0]);
- for (int i = 1; i < inputs.length; i++)
- FileInputFormat.addInputPaths(job, inputs[0]);
- FileOutputFormat.setOutputPath(job, new Path(options.outputPath));
- return options;
- }
-
-}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/BinaryLoadGraphInputFormat.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/BinaryLoadGraphInputFormat.java
new file mode 100644
index 0000000..09d1b6e
--- /dev/null
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/BinaryLoadGraphInputFormat.java
@@ -0,0 +1,96 @@
+package edu.uci.ics.genomix.pregelix.format;
+
+import java.io.IOException;
+import java.util.logging.FileHandler;
+import java.util.logging.Logger;
+
+import org.apache.hadoop.io.ByteWritable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import edu.uci.ics.genomix.type.KmerCountValue;
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.io.VertexReader;
+import edu.uci.ics.pregelix.api.util.BspUtils;
+import edu.uci.ics.genomix.pregelix.GraphVertexOperation;
+import edu.uci.ics.genomix.pregelix.io.MessageWritable;
+import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
+import edu.uci.ics.genomix.pregelix.io.ValueWritable;
+import edu.uci.ics.genomix.pregelix.log.DataLoadLogFormatter;
+import edu.uci.ics.genomix.pregelix.api.io.binary.BinaryVertexInputFormat;
+import edu.uci.ics.genomix.pregelix.api.io.binary.BinaryVertexInputFormat.BinaryVertexReader;
+
+public class BinaryLoadGraphInputFormat
+        extends BinaryVertexInputFormat<BytesWritable, ValueStateWritable, NullWritable, MessageWritable> {
+    /** Creates the split's record reader via binaryInputFormat and wraps it in a BinaryLoadGraphReader. */
+    @Override
+    public VertexReader<BytesWritable, ValueStateWritable, NullWritable, MessageWritable> createVertexReader(
+            InputSplit split, TaskAttemptContext context) throws IOException {
+        RecordReader<BytesWritable, KmerCountValue> splitReader =
+                binaryInputFormat.createRecordReader(split, context);
+        return new BinaryLoadGraphReader(splitReader);
+    }
+}
+
+/**
+ * Reads (BytesWritable key, KmerCountValue) records and exposes each one as a vertex: the record key
+ * becomes the vertex id and the value's adjacency bitmap becomes the vertex value. Records are logged.
+ */
+@SuppressWarnings("rawtypes")
+class BinaryLoadGraphReader extends
+        BinaryVertexReader<BytesWritable, ValueStateWritable, NullWritable, MessageWritable> {
+    public static Logger logger = Logger.getLogger(BinaryLoadGraphReader.class.getName());
+    DataLoadLogFormatter formatter = new DataLoadLogFormatter();
+    /** Stays null when the log file cannot be opened; per-record logging is then skipped. */
+    FileHandler handler;
+    private Vertex vertex;
+    private BytesWritable vertexId = new BytesWritable();
+    private ValueStateWritable vertexValue = new ValueStateWritable();
+
+    public BinaryLoadGraphReader(RecordReader<BytesWritable,KmerCountValue> recordReader) {
+        super(recordReader);
+        try {
+            handler = new FileHandler("log/" + BinaryLoadGraphReader.class.getName() + ".log");
+            handler.setFormatter(formatter);
+            logger.addHandler(handler); // attach once instead of removing/re-adding for every vertex
+        } catch (SecurityException e1) { e1.printStackTrace();}
+        catch (IOException e1) { e1.printStackTrace();}
+    }
+
+    @Override
+    public boolean nextVertex() throws IOException, InterruptedException {
+        return getRecordReader().nextKeyValue();
+    }
+
+    /** Refills the single reused vertex object from the record reader's current key and value. */
+    @SuppressWarnings("unchecked")
+    @Override
+    public Vertex<BytesWritable, ValueStateWritable, NullWritable, MessageWritable> getCurrentVertex() throws IOException,
+            InterruptedException {
+        if (vertex == null)
+            vertex = (Vertex) BspUtils.createVertex(getContext().getConfiguration());
+
+        vertex.getMsgList().clear();
+        vertex.getEdges().clear();
+        vertex.reset();
+        if(getRecordReader() != null){
+            // the record key is the source vertex id
+            vertexId.set(getRecordReader().getCurrentKey());
+            vertex.setVertexId(vertexId);
+            // the vertex value carries the record's adjacency bitmap
+            KmerCountValue kmerCountValue = getRecordReader().getCurrentValue();
+            vertexValue.setValue(kmerCountValue.getAdjBitMap());
+            vertex.setVertexValue(vertexValue);
+            // log the loaded record; a null handler means the log file could not be opened
+            if(handler != null){
+                formatter.set(vertexId, kmerCountValue, GraphVertexOperation.k);
+                logger.info("");
+            }
+        }
+
+        return vertex;
+    }
+}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/LogAlgorithmForMergeGraphOutputFormat.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/BinaryLoadGraphOutputFormat.java
similarity index 74%
rename from genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/LogAlgorithmForMergeGraphOutputFormat.java
rename to genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/BinaryLoadGraphOutputFormat.java
index dbaa528..f02d426 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/LogAlgorithmForMergeGraphOutputFormat.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/BinaryLoadGraphOutputFormat.java
@@ -1,4 +1,4 @@
-package edu.uci.ics.genomix.pregelix;
+package edu.uci.ics.genomix.pregelix.format;
import java.io.IOException;
@@ -9,17 +9,18 @@
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import edu.uci.ics.genomix.pregelix.api.io.binary.BinaryVertexOutputFormat;
+import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
+import edu.uci.ics.genomix.pregelix.io.ValueWritable;
import edu.uci.ics.pregelix.api.graph.Vertex;
import edu.uci.ics.pregelix.api.io.VertexWriter;
-import edu.uci.ics.genomix.pregelix.example.io.ValueStateWritable;
-public class LogAlgorithmForMergeGraphOutputFormat extends
+public class BinaryLoadGraphOutputFormat extends
BinaryVertexOutputFormat<BytesWritable, ValueStateWritable, NullWritable> {
@Override
public VertexWriter<BytesWritable, ValueStateWritable, NullWritable> createVertexWriter(TaskAttemptContext context)
throws IOException, InterruptedException {
- RecordWriter<BytesWritable, ByteWritable> recordWriter = binaryOutputFormat.getRecordWriter(context);
+ RecordWriter<BytesWritable, ValueStateWritable> recordWriter = binaryOutputFormat.getRecordWriter(context);
return new BinaryLoadGraphVertexWriter(recordWriter);
}
@@ -28,15 +29,14 @@
*/
public static class BinaryLoadGraphVertexWriter extends
BinaryVertexWriter<BytesWritable, ValueStateWritable, NullWritable> {
- public BinaryLoadGraphVertexWriter(RecordWriter<BytesWritable, ByteWritable> lineRecordWriter) {
+ public BinaryLoadGraphVertexWriter(RecordWriter<BytesWritable, ValueStateWritable> lineRecordWriter) {
super(lineRecordWriter);
}
@Override
public void writeVertex(Vertex<BytesWritable, ValueStateWritable, NullWritable, ?> vertex) throws IOException,
InterruptedException {
- getRecordWriter().write(vertex.getVertexId(),
- new ByteWritable(vertex.getVertexValue().getValue()));
+ getRecordWriter().write(vertex.getVertexId(),vertex.getVertexValue());
}
}
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/LogAlgorithmForMergeGraphInputFormat.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/LogAlgorithmForMergeGraphInputFormat.java
similarity index 88%
rename from genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/LogAlgorithmForMergeGraphInputFormat.java
rename to genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/LogAlgorithmForMergeGraphInputFormat.java
index 625d588..3fd3345 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/LogAlgorithmForMergeGraphInputFormat.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/LogAlgorithmForMergeGraphInputFormat.java
@@ -1,6 +1,7 @@
-package edu.uci.ics.genomix.pregelix;
+package edu.uci.ics.genomix.pregelix.format;
import java.io.IOException;
+import java.util.logging.FileHandler;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
@@ -8,15 +9,16 @@
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import edu.uci.ics.genomix.pregelix.GraphVertexOperation;
+import edu.uci.ics.genomix.pregelix.api.io.binary.BinaryVertexInputFormat;
+import edu.uci.ics.genomix.pregelix.bitwise.BitwiseOperation;
+import edu.uci.ics.genomix.pregelix.io.LogAlgorithmMessageWritable;
+import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
+import edu.uci.ics.genomix.pregelix.type.State;
+import edu.uci.ics.genomix.type.KmerCountValue;
import edu.uci.ics.pregelix.api.graph.Vertex;
import edu.uci.ics.pregelix.api.io.VertexReader;
import edu.uci.ics.pregelix.api.util.BspUtils;
-import edu.uci.ics.genomix.pregelix.bitwise.BitwiseOperation;
-import edu.uci.ics.genomix.pregelix.example.io.LogAlgorithmMessageWritable;
-import edu.uci.ics.genomix.pregelix.example.io.ValueStateWritable;
-import edu.uci.ics.genomix.pregelix.type.State;
-import edu.uci.ics.genomix.pregelix.api.io.binary.BinaryVertexInputFormat;
-import edu.uci.ics.genomix.type.KmerCountValue;
public class LogAlgorithmForMergeGraphInputFormat extends
BinaryVertexInputFormat<BytesWritable, ValueStateWritable, NullWritable, LogAlgorithmMessageWritable>{
@@ -33,7 +35,6 @@
@SuppressWarnings("rawtypes")
class BinaryLoadGraphReader extends
BinaryVertexReader<BytesWritable, ValueStateWritable, NullWritable, LogAlgorithmMessageWritable> {
- private final static String separator = " ";
private Vertex vertex;
private BytesWritable vertexId = new BytesWritable();
private ValueStateWritable vertexValue = new ValueStateWritable();
@@ -73,11 +74,11 @@
vertexValue.setState(State.NON_VERTEX);
vertex.setVertexValue(vertexValue);
- String kmer = BitwiseOperation.convertBytesToBinaryStringKmer(vertexId.getBytes(),GraphVertexOperation.k);
+ /*String kmer = BitwiseOperation.convertBytesToBinaryStringKmer(vertexId.getBytes(),GraphVertexOperation.k);
System.out.println("key: " + kmer);
System.out.println("code: " + GraphVertexOperation.convertBinaryStringToGenecode(kmer));
System.out.println("value: " + BitwiseOperation.convertByteToBinaryString(kmerCountValue.getAdjBitMap()));
- System.out.println();
+ System.out.println();*/
}
return vertex;
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/LogAlgorithmForMergeGraphOutputFormat.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/LogAlgorithmForMergeGraphOutputFormat.java
similarity index 73%
copy from genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/LogAlgorithmForMergeGraphOutputFormat.java
copy to genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/LogAlgorithmForMergeGraphOutputFormat.java
index dbaa528..aa81066 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/LogAlgorithmForMergeGraphOutputFormat.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/LogAlgorithmForMergeGraphOutputFormat.java
@@ -1,8 +1,7 @@
-package edu.uci.ics.genomix.pregelix;
+package edu.uci.ics.genomix.pregelix.format;
import java.io.IOException;
-import org.apache.hadoop.io.ByteWritable;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.RecordWriter;
@@ -11,15 +10,17 @@
import edu.uci.ics.genomix.pregelix.api.io.binary.BinaryVertexOutputFormat;
import edu.uci.ics.pregelix.api.graph.Vertex;
import edu.uci.ics.pregelix.api.io.VertexWriter;
-import edu.uci.ics.genomix.pregelix.example.io.ValueStateWritable;
+import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
+import edu.uci.ics.genomix.pregelix.type.State;
public class LogAlgorithmForMergeGraphOutputFormat extends
BinaryVertexOutputFormat<BytesWritable, ValueStateWritable, NullWritable> {
+
@Override
public VertexWriter<BytesWritable, ValueStateWritable, NullWritable> createVertexWriter(TaskAttemptContext context)
throws IOException, InterruptedException {
- RecordWriter<BytesWritable, ByteWritable> recordWriter = binaryOutputFormat.getRecordWriter(context);
+ RecordWriter<BytesWritable, ValueStateWritable> recordWriter = binaryOutputFormat.getRecordWriter(context);
return new BinaryLoadGraphVertexWriter(recordWriter);
}
@@ -28,15 +29,16 @@
*/
public static class BinaryLoadGraphVertexWriter extends
BinaryVertexWriter<BytesWritable, ValueStateWritable, NullWritable> {
- public BinaryLoadGraphVertexWriter(RecordWriter<BytesWritable, ByteWritable> lineRecordWriter) {
+
+ public BinaryLoadGraphVertexWriter(RecordWriter<BytesWritable, ValueStateWritable> lineRecordWriter) {
super(lineRecordWriter);
}
@Override
public void writeVertex(Vertex<BytesWritable, ValueStateWritable, NullWritable, ?> vertex) throws IOException,
InterruptedException {
- getRecordWriter().write(vertex.getVertexId(),
- new ByteWritable(vertex.getVertexValue().getValue()));
+ if(vertex.getVertexValue().getState() != State.FINAL_DELETE)
+ getRecordWriter().write(vertex.getVertexId(),vertex.getVertexValue());
}
}
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/example/io/LogAlgorithmMessageWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/LogAlgorithmMessageWritable.java
similarity index 95%
rename from genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/example/io/LogAlgorithmMessageWritable.java
rename to genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/LogAlgorithmMessageWritable.java
index d77a26f..4be98bc 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/example/io/LogAlgorithmMessageWritable.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/LogAlgorithmMessageWritable.java
@@ -1,4 +1,4 @@
-package edu.uci.ics.genomix.pregelix.example.io;
+package edu.uci.ics.genomix.pregelix.io;
import java.io.DataInput;
import java.io.DataOutput;
@@ -37,8 +37,10 @@
}
public void reset(){
- sourceVertexId = null;
+ sourceVertexId = new byte[(GraphVertexOperation.k-1)/4 + 1];
neighberInfo = (Byte) null;
+ lengthOfChain = 0;
+ chainVertexId = null;
message = 0;
sourceVertexState = 0;
}
@@ -109,10 +111,12 @@
out.writeInt(lengthOfChain);
if(lengthOfChain != 0)
out.write(chainVertexId);
- out.write(sourceVertexId);
- out.write(neighberInfo);
+
out.writeInt(message);
out.writeInt(sourceVertexState);
+
+ out.write(sourceVertexId);
+ out.write(neighberInfo);
}
@Override
@@ -125,11 +129,13 @@
}
else
chainVertexId = new byte[0];
+
+ message = in.readInt();
+ sourceVertexState = in.readInt();
+
sourceVertexId = new byte[(GraphVertexOperation.k-1)/4 + 1];
in.readFully(sourceVertexId);
neighberInfo = in.readByte();
- message = in.readInt();
- sourceVertexState = in.readInt();
}
@Override
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/example/io/MessageWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MessageWritable.java
similarity index 91%
rename from genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/example/io/MessageWritable.java
rename to genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MessageWritable.java
index b477eb5..3146088 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/example/io/MessageWritable.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MessageWritable.java
@@ -1,4 +1,4 @@
-package edu.uci.ics.genomix.pregelix.example.io;
+package edu.uci.ics.genomix.pregelix.io;
import java.io.DataInput;
import java.io.DataOutput;
@@ -22,17 +22,19 @@
private File file;
private boolean isRear;
private int lengthOfChain;
+ private byte[] head;
public MessageWritable(){
}
- public void set(byte[] sourceVertexId, byte neighberInfo, byte[] chainVertexId, File file){
+ public void set(byte[] sourceVertexId, byte neighberInfo, byte[] chainVertexId, File file, byte[] head){
this.sourceVertexId = sourceVertexId;
this.neighberInfo = neighberInfo;
this.chainVertexId = chainVertexId;
this.file = file;
this.isRear = false;
this.lengthOfChain = 0;
+ this.head = head;
}
public byte[] getSourceVertexId() {
@@ -82,6 +84,15 @@
public void setLengthOfChain(int lengthOfChain) {
this.lengthOfChain = lengthOfChain;
}
+
+
+ public byte[] getHead() {
+ return head;
+ }
+
+ public void setHead(byte[] head) {
+ this.head = head;
+ }
public void incrementLength(){
this.lengthOfChain++;
@@ -94,6 +105,7 @@
if(lengthOfChain != 0)
out.write(chainVertexId);
out.write(sourceVertexId);
+ out.write(head);
out.write(neighberInfo);
out.writeBoolean(isRear);
}
@@ -110,8 +122,11 @@
chainVertexId = new byte[0];
sourceVertexId = new byte[(GraphVertexOperation.k-1)/4 + 1];
in.readFully(sourceVertexId);
+ head = new byte[(GraphVertexOperation.k-1)/4 + 1];
+ in.readFully(head);
neighberInfo = in.readByte();
isRear = in.readBoolean();
+
}
@Override
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/example/io/ValueStateWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/ValueStateWritable.java
similarity index 97%
rename from genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/example/io/ValueStateWritable.java
rename to genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/ValueStateWritable.java
index 97275be..9acda4e 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/example/io/ValueStateWritable.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/ValueStateWritable.java
@@ -1,4 +1,4 @@
-package edu.uci.ics.genomix.pregelix.example.io;
+package edu.uci.ics.genomix.pregelix.io;
import java.io.*;
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/example/io/ValueStateWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/ValueWritable.java
similarity index 68%
copy from genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/example/io/ValueStateWritable.java
copy to genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/ValueWritable.java
index 97275be..a3f0b9f 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/example/io/ValueStateWritable.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/ValueWritable.java
@@ -1,27 +1,21 @@
-package edu.uci.ics.genomix.pregelix.example.io;
+package edu.uci.ics.genomix.pregelix.io;
import java.io.*;
import org.apache.hadoop.io.WritableComparable;
-import edu.uci.ics.genomix.pregelix.type.State;
-
-
-public class ValueStateWritable implements WritableComparable<ValueStateWritable> {
+public class ValueWritable implements WritableComparable<ValueWritable> {
private byte value;
- private int state;
private int lengthOfMergeChain;
private byte[] mergeChain;
- public ValueStateWritable() {
- state = State.NON_VERTEX;
+ public ValueWritable() {
lengthOfMergeChain = 0;
}
- public ValueStateWritable(byte value, int state, int lengthOfMergeChain, byte[] mergeChain) {
+ public ValueWritable(byte value, int lengthOfMergeChain, byte[] mergeChain) {
this.value = value;
- this.state = state;
this.lengthOfMergeChain = lengthOfMergeChain;
this.mergeChain = mergeChain;
}
@@ -34,14 +28,6 @@
this.value = value;
}
- public int getState() {
- return state;
- }
-
- public void setState(int state) {
- this.state = state;
- }
-
public int getLengthOfMergeChain() {
return lengthOfMergeChain;
}
@@ -61,7 +47,6 @@
@Override
public void readFields(DataInput in) throws IOException {
value = in.readByte();
- state = in.readInt();
lengthOfMergeChain = in.readInt();
if(lengthOfMergeChain != 0){
mergeChain = new byte[(lengthOfMergeChain-1)/4 + 1];
@@ -74,14 +59,13 @@
@Override
public void write(DataOutput out) throws IOException {
out.writeByte(value);
- out.writeInt(state);
out.writeInt(lengthOfMergeChain);
if(lengthOfMergeChain != 0)
out.write(mergeChain);
}
@Override
- public int compareTo(ValueStateWritable o) {
+ public int compareTo(ValueWritable o) {
// TODO Auto-generated method stub
return 0;
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/log/DataLoadLogFormatter.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/log/DataLoadLogFormatter.java
new file mode 100644
index 0000000..f5ddfc6
--- /dev/null
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/log/DataLoadLogFormatter.java
@@ -0,0 +1,42 @@
+package edu.uci.ics.genomix.pregelix.log;
+
+import java.util.logging.Formatter;
+import java.util.logging.Handler;
+import java.util.logging.LogRecord;
+
+import org.apache.hadoop.io.BytesWritable;
+
+import edu.uci.ics.genomix.type.Kmer;
+import edu.uci.ics.genomix.type.KmerCountValue;
+
+/** Log formatter that prints the most recently set key/value pair ahead of the record's message. */
+public class DataLoadLogFormatter extends Formatter{
+    private BytesWritable key;
+    private KmerCountValue value;
+    private int k;
+
+    /** Stores the key, value and k that the next format() call will print. */
+    public void set(BytesWritable key, KmerCountValue value, int k){
+        this.k = k;
+        this.key = key;
+        this.value = value;
+    }
+    /** Emits the text recovered from the key, a tab and the value; a non-empty message follows on its own line. */
+    public String format(LogRecord record) {
+        StringBuilder out = new StringBuilder(1000);
+        String kmerText = Kmer.recoverKmerFrom(k, key.getBytes(), 0, key.getLength());
+        out.append(kmerText).append("\t").append(value.toString()).append("\r\n");
+        String text = formatMessage(record);
+        if(!text.equals(""))
+            out.append(text).append("\r\n");
+        return out.toString();
+    }
+
+    public String getHead(Handler h) {
+        return super.getHead(h);
+    }
+
+    public String getTail(Handler h) {
+        return super.getTail(h);
+    }
+}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/log/LogAlgorithmLogFormatter.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/log/LogAlgorithmLogFormatter.java
new file mode 100644
index 0000000..2e4fd68
--- /dev/null
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/log/LogAlgorithmLogFormatter.java
@@ -0,0 +1,119 @@
+package edu.uci.ics.genomix.pregelix.log;
+
+import java.util.logging.*;
+
+import org.apache.hadoop.io.ByteWritable;
+import org.apache.hadoop.io.BytesWritable;
+
+import edu.uci.ics.genomix.pregelix.io.LogAlgorithmMessageWritable;
+import edu.uci.ics.genomix.pregelix.type.Message;
+import edu.uci.ics.genomix.pregelix.type.State;
+import edu.uci.ics.genomix.type.Kmer;
+
+/** Log formatter that prints the step, the source vertex id and the details recorded by the latest set* call. */
+public class LogAlgorithmLogFormatter extends Formatter {
+    private long step;
+    private byte[] sourceVertexId;
+    private byte[] destVertexId;
+    private LogAlgorithmMessageWritable msg;
+    private int state;
+    private int k;
+    private byte[] mergeChain;
+    private int lengthOfMergeChain;
+    /**
+     * Selects what format() prints after the step and source lines:
+     * 0: general operation
+     * 1: testDelete (format() prints nothing extra for it)
+     * 2: testMergeChain
+     * 3: testVoteToHalt
+     */
+    private int operation;
+
+    /** Records a general entry (operation 0); the merge-chain fields keep their previous values. */
+    public void set(long step, byte[] sourceVertexId, byte[] destVertexId, LogAlgorithmMessageWritable msg, int state, int k){
+        this.operation = 0;
+        this.k = k;
+        this.state = state;
+        this.msg = msg;
+        this.destVertexId = destVertexId;
+        this.sourceVertexId = sourceVertexId;
+        this.step = step;
+    }
+    /** Clears the previous entry, then records a merge-chain entry (operation 2). */
+    public void setMergeChain(long step, byte[] sourceVertexId, int lengthOfMergeChain, byte[] mergeChain, int k){
+        this.reset();
+        this.operation = 2;
+        this.k = k;
+        this.mergeChain = mergeChain;
+        this.lengthOfMergeChain = lengthOfMergeChain;
+        this.sourceVertexId = sourceVertexId;
+        this.step = step;
+    }
+    /** Clears the previous entry, then records a vote-to-halt entry (operation 3). */
+    public void setVotoToHalt(long step, byte[] sourceVertexId, int k){
+        this.reset();
+        this.operation = 3;
+        this.k = k;
+        this.sourceVertexId = sourceVertexId;
+        this.step = step;
+    }
+    /** Clears every recorded field except step and operation. */
+    public void reset(){
+        this.lengthOfMergeChain = 0;
+        this.mergeChain = null;
+        this.k = 0;
+        this.state = 0;
+        this.msg = null;
+        this.destVertexId = null;
+        this.sourceVertexId = null;
+    }
+    /** Builds the entry text: step and source first, then the operation-specific lines, then the record message. */
+    public String format(LogRecord record) {
+        StringBuilder out = new StringBuilder(1000);
+        String source = Kmer.recoverKmerFrom(k, sourceVertexId, 0, sourceVertexId.length);
+        out.append("Step: ").append(step).append("\r\n");
+        out.append("Source Code: ").append(source).append("\r\n");
+        switch (operation) {
+            case 0:
+                if(destVertexId != null){
+                    String dest = Kmer.recoverKmerFrom(k, destVertexId, 0, destVertexId.length);
+                    out.append("Send message to " + "\r\n");
+                    out.append("Destination Code: ").append(dest).append("\r\n");
+                }
+                out.append("Message is: " + Message.MESSAGE_CONTENT.getContentFromCode(msg.getMessage()) + "\r\n");
+                if(msg.getLengthOfChain() != 0){
+                    String chain = Kmer.recoverKmerFrom(msg.getLengthOfChain(), msg.getChainVertexId(), 0, msg.getChainVertexId().length);
+                    out.append("Chain Message: ").append(chain).append("\r\n");
+                    out.append("Chain Length: " + msg.getLengthOfChain() + "\r\n");
+                }
+                out.append("State is: " + State.STATE_CONTENT.getContentFromCode(state) + "\r\n");
+                break;
+            case 2:
+                out.append("Merge Chain: ").append(Kmer.recoverKmerFrom(lengthOfMergeChain, mergeChain, 0, mergeChain.length)).append("\r\n");
+                out.append("Merge Chain Length: ").append(lengthOfMergeChain).append("\r\n");
+                break;
+            case 3:
+                out.append("Vote to halt!");
+                break;
+        }
+        String text = formatMessage(record);
+        if(!text.equals(""))
+            out.append(text).append("\r\n");
+        out.append("\n");
+        return out.toString();
+    }
+
+    public String getHead(Handler h) {
+        return super.getHead(h);
+    }
+
+    public String getTail(Handler h) {
+        return super.getTail(h);
+    }
+    public int getOperation() {
+        return operation;
+    }
+    public void setOperation(int operation) {
+        this.operation = operation;
+    }
+}
\ No newline at end of file
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/log/NaiveAlgorithmLogFormatter.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/log/NaiveAlgorithmLogFormatter.java
new file mode 100644
index 0000000..c9abd6e
--- /dev/null
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/log/NaiveAlgorithmLogFormatter.java
@@ -0,0 +1,62 @@
+package edu.uci.ics.genomix.pregelix.log;
+
+import java.util.logging.*;
+
+import org.apache.hadoop.io.ByteWritable;
+import org.apache.hadoop.io.BytesWritable;
+
+import edu.uci.ics.genomix.pregelix.io.MessageWritable;
+import edu.uci.ics.genomix.type.Kmer;
+
+/** Log formatter that prints the step, source vertex id and message details recorded by the latest set() call. */
+public class NaiveAlgorithmLogFormatter extends Formatter {
+    private long step;
+    private byte[] sourceVertexId;
+    private byte[] destVertexId;
+    private MessageWritable msg;
+    private int k;
+
+    /** Records the values that the next format() call will print. */
+    public void set(long step, byte[] sourceVertexId, byte[] destVertexId, MessageWritable msg, int k){
+        this.k = k;
+        this.msg = msg;
+        this.destVertexId = destVertexId;
+        this.sourceVertexId = sourceVertexId;
+        this.step = step;
+    }
+
+    /**
+     * Builds the entry text: step and source lines, the destination lines when a destination was
+     * recorded, the chain lines when the message's chain length is non-zero, then the record message.
+     */
+    public String format(LogRecord record) {
+        StringBuilder out = new StringBuilder(1000);
+        String source = Kmer.recoverKmerFrom(k, sourceVertexId, 0, sourceVertexId.length);
+        out.append("Step: ").append(step).append("\r\n");
+        out.append("Source Code: ").append(source).append("\r\n");
+
+        if(destVertexId != null){
+            out.append("Send message to " + "\r\n");
+            String dest = Kmer.recoverKmerFrom(k, destVertexId, 0, destVertexId.length);
+            out.append("Destination Code: ").append(dest).append("\r\n");
+        }
+        if(msg.getLengthOfChain() != 0){
+            String chain = Kmer.recoverKmerFrom(msg.getLengthOfChain(), msg.getChainVertexId(), 0, msg.getChainVertexId().length);
+            out.append("Chain Message: ").append(chain).append("\r\n");
+            out.append("Chain Length: " + msg.getLengthOfChain() + "\r\n");
+        }
+        String text = formatMessage(record);
+        if(!text.equals(""))
+            out.append(text).append("\r\n");
+        out.append("\n");
+        return out.toString();
+    }
+
+    public String getHead(Handler h) {
+        return super.getHead(h);
+    }
+
+    public String getTail(Handler h) {
+        return super.getTail(h);
+    }
+}
\ No newline at end of file
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/CombineSequenceFile.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/CombineSequenceFile.java
new file mode 100644
index 0000000..bbbb84f
--- /dev/null
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/CombineSequenceFile.java
@@ -0,0 +1,63 @@
+package edu.uci.ics.genomix.pregelix.sequencefile;
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+
+import edu.uci.ics.genomix.pregelix.GraphVertexOperation;
+import edu.uci.ics.genomix.type.Kmer;
+import edu.uci.ics.genomix.type.KmerCountValue;
+
+
+/** Concatenates the sequence files under data/ThreeKmer into data/result/output and echoes every record. */
+public class CombineSequenceFile {
+    /**
+     * @param args unused
+     * @throws Exception if the input directory cannot be listed or a sequence file cannot be read or written
+     */
+    public static void main(String[] args) throws Exception {
+        Configuration conf = new Configuration();
+        FileSystem fileSys = FileSystem.get(conf);
+        Path p = new Path("data/ThreeKmer");
+        Path outFile = new Path(new Path("data/result"), "output");
+        BytesWritable key = new BytesWritable();
+        KmerCountValue value = new KmerCountValue();
+
+        // listFiles() returns null when the directory is missing or unreadable
+        File[] children = new File("data/ThreeKmer").listFiles();
+        if (children == null)
+            throw new IOException("cannot list input directory data/ThreeKmer");
+        SequenceFile.Writer writer = SequenceFile.createWriter(fileSys, conf,
+                outFile, BytesWritable.class, KmerCountValue.class, CompressionType.NONE);
+        try {
+            for(File child : children){
+                // skip sub-directories and hidden files such as ".crc" checksum side files
+                if (!child.isFile() || child.getName().startsWith("."))
+                    continue;
+                SequenceFile.Reader part = new SequenceFile.Reader(fileSys, new Path(p, child.getAbsolutePath()), conf);
+                try {
+                    while (part.next(key, value)) {
+                        System.out.println(Kmer.recoverKmerFrom(GraphVertexOperation.k, key.getBytes(), 0,
+                                key.getLength()) + "\t" + value.toString());
+                        writer.append(key, value);
+                    }
+                } finally { part.close(); } // release the input even when a record fails to parse
+            }
+        } finally { writer.close(); } // always close the output, also on failure
+        System.out.println();
+
+        // read the combined file back and echo it on stderr
+        SequenceFile.Reader combined = new SequenceFile.Reader(fileSys, outFile, conf);
+        try {
+            while (combined.next(key, value)) {
+                System.err.println(Kmer.recoverKmerFrom(GraphVertexOperation.k, key.getBytes(), 0,
+                        key.getLength()) + "\t" + value.toString());
+            }
+        } finally { combined.close(); }
+    }
+}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/SequenceFile/ConvertToSequenceFile.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/ConvertToSequenceFile.java
similarity index 96%
rename from genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/SequenceFile/ConvertToSequenceFile.java
rename to genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/ConvertToSequenceFile.java
index 1aff95d..d64b279 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/SequenceFile/ConvertToSequenceFile.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/ConvertToSequenceFile.java
@@ -1,4 +1,4 @@
-package edu.uci.ics.genomix.pregelix.SequenceFile;
+package edu.uci.ics.genomix.pregelix.sequencefile;
import java.io.IOException;
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/SequenceFile/GenerateSequenceFile.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/GenerateSequenceFile.java
similarity index 99%
rename from genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/SequenceFile/GenerateSequenceFile.java
rename to genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/GenerateSequenceFile.java
index b7fed48..5c740e4 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/SequenceFile/GenerateSequenceFile.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/GenerateSequenceFile.java
@@ -1,4 +1,4 @@
-package edu.uci.ics.genomix.pregelix.SequenceFile;
+package edu.uci.ics.genomix.pregelix.sequencefile;
import java.io.IOException;
import java.util.ArrayList;
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/SequenceFile/generateSmallFile.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/generateSmallFile.java
similarity index 83%
rename from genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/SequenceFile/generateSmallFile.java
rename to genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/generateSmallFile.java
index eec2559..f44781f 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/SequenceFile/generateSmallFile.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/generateSmallFile.java
@@ -1,4 +1,4 @@
-package edu.uci.ics.genomix.pregelix.SequenceFile;
+package edu.uci.ics.genomix.pregelix.sequencefile;
import java.io.IOException;
@@ -16,9 +16,7 @@
public static void generateNumOfLinesFromBigFile(Path inFile, Path outFile, int numOfLines) throws IOException{
Configuration conf = new Configuration();
FileSystem fileSys = FileSystem.get(conf);
-
- //ClassLoader ctxLoader = Thread.currentThread().getContextClassLoader();
- //Thread.currentThread().setContextClassLoader(GenerateSequenceFile.class.getClassLoader());
+
SequenceFile.Reader reader = new SequenceFile.Reader(fileSys, inFile, conf);
SequenceFile.Writer writer = SequenceFile.createWriter(fileSys, conf,
outFile, BytesWritable.class, KmerCountValue.class,
@@ -34,7 +32,6 @@
}
writer.close();
reader.close();
- //Thread.currentThread().setContextClassLoader(ctxLoader);
}
/**
* @param args
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/testDeleteVertexId.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/testDeleteVertexId.java
deleted file mode 100644
index fd6f0c5..0000000
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/testDeleteVertexId.java
+++ /dev/null
@@ -1,74 +0,0 @@
-package edu.uci.ics.genomix.pregelix;
-
-import java.util.Arrays;
-import java.util.Iterator;
-
-import org.apache.hadoop.io.ByteWritable;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.NullWritable;
-
-import edu.uci.ics.pregelix.api.graph.Vertex;
-import edu.uci.ics.pregelix.api.job.PregelixJob;
-import edu.uci.ics.genomix.pregelix.bitwise.BitwiseOperation;
-import edu.uci.ics.genomix.pregelix.example.client.Client;
-import edu.uci.ics.genomix.pregelix.example.io.MessageWritable;
-
-/*
- * vertexId: BytesWritable
- * vertexValue: ByteWritable
- * edgeValue: NullWritable
- * message: MessageWritable
- *
- * DNA:
- * A: 00
- * C: 01
- * G: 10
- * T: 11
- *
- * succeed node
- * A 00000001 1
- * G 00000010 2
- * C 00000100 4
- * T 00001000 8
- * precursor node
- * A 00010000 16
- * G 00100000 32
- * C 01000000 64
- * T 10000000 128
- *
- * For example, ONE LINE in input file: 00,01,10 0001,0010,
- * That means that vertexId is ACG, its succeed node is A and its precursor node is C.
- * The succeed node and precursor node will be stored in vertexValue and we don't use edgeValue.
- * The details about message are in edu.uci.ics.pregelix.example.io.MessageWritable.
- */
-public class testDeleteVertexId extends Vertex<BytesWritable, ByteWritable, NullWritable, MessageWritable>{
-
- /**
- * For test
- */
- @Override
- public void compute(Iterator<MessageWritable> msgIterator) {
- String s = "100100";
- byte[] b = BitwiseOperation.convertBinaryStringToBytes(s);
- if(getSuperstep() == 1 && Arrays.equals(b,getVertexId().getBytes())){
- //deleteVertex(getVertexId());
- }
- voteToHalt();
- }
-
- /**
- * @param args
- */
- public static void main(String[] args) throws Exception {
- PregelixJob job = new PregelixJob(MergeGraphVertex.class.getSimpleName());
- job.setVertexClass(TestLoadGraphVertex.class);
- /**
- * BinaryInput and BinaryOutput
- */
- job.setVertexInputFormatClass(BinaryLoadGraphInputFormat.class);
- job.setVertexOutputFormatClass(BinaryLoadGraphOutputFormat.class);
- job.setOutputKeyClass(BytesWritable.class);
- job.setOutputValueClass(ByteWritable.class);
- Client.run(args, job);
- }
-}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/Message.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/Message.java
index f965d64..f556a73 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/Message.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/Message.java
@@ -6,4 +6,22 @@
public static final int START = 1;
public static final int END = 2;
+    public final static class MESSAGE_CONTENT{
+
+        /**
+         * Gives the constant name for a message code; "" if the code matches no case.
+         */
+        public static String getContentFromCode(int code){
+            switch(code){
+                case NON:
+                    return "NON";
+                case START:
+                    return "START";
+                case END:
+                    return "END";
+                default:
+                    return "";
+            }
+        }
+    }
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/State.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/State.java
index 2a2a3f3..5447d26 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/State.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/State.java
@@ -7,4 +7,36 @@
public static final int MID_VERTEX = 3;
public static final int TODELETE = 4;
public static final int FINAL_VERTEX = 5;
+ public static final int FINAL_DELETE = 6;
+
+    public final static class STATE_CONTENT{
+
+        /**
+         * Translates a state code into the name of its constant.
+         *
+         * @param code state code, e.g. MID_VERTEX or FINAL_DELETE
+         * @return the matching constant's name, or "" when no case matches
+         */
+        public static String getContentFromCode(int code){
+            switch(code){
+                case NON_VERTEX:
+                    return "NON_VERTEX";
+                case START_VERTEX:
+                    return "START_VERTEX";
+                case END_VERTEX:
+                    return "END_VERTEX";
+                case MID_VERTEX:
+                    return "MID_VERTEX";
+                case TODELETE:
+                    return "TODELETE";
+                case FINAL_VERTEX:
+                    return "FINAL_VERTEX";
+                case FINAL_DELETE:
+                    return "FINAL_DELETE";
+                default:
+                    // unrecognised codes map to the empty string
+                    return "";
+            }
+        }
+    }
}
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
index 7e740cd..3cfe4c8 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
@@ -10,16 +10,14 @@
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import edu.uci.ics.genomix.pregelix.BinaryLoadGraphInputFormat;
-import edu.uci.ics.genomix.pregelix.BinaryLoadGraphOutputFormat;
import edu.uci.ics.genomix.pregelix.LoadGraphVertex;
-import edu.uci.ics.genomix.pregelix.LogAlgorithmForMergeGraphInputFormat;
-import edu.uci.ics.genomix.pregelix.LogAlgorithmForMergeGraphOutputFormat;
+import edu.uci.ics.genomix.pregelix.format.BinaryLoadGraphInputFormat;
+import edu.uci.ics.genomix.pregelix.format.BinaryLoadGraphOutputFormat;
+import edu.uci.ics.genomix.pregelix.format.LogAlgorithmForMergeGraphInputFormat;
+import edu.uci.ics.genomix.pregelix.format.LogAlgorithmForMergeGraphOutputFormat;
+import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
import edu.uci.ics.genomix.pregelix.LogAlgorithmForMergeGraphVertex;
import edu.uci.ics.genomix.pregelix.MergeGraphVertex;
-import edu.uci.ics.genomix.pregelix.TestLoadGraphVertex;
-import edu.uci.ics.genomix.pregelix.TextLoadGraphInputFormat;
-import edu.uci.ics.genomix.pregelix.LoadGraphVertex.SimpleLoadGraphVertexOutputFormat;
import edu.uci.ics.pregelix.api.job.PregelixJob;
@@ -32,21 +30,6 @@
private static void generateLoadGraphJob(String jobName, String outputPath) throws IOException {
PregelixJob job = new PregelixJob(jobName);
job.setVertexClass(LoadGraphVertex.class);
- job.setVertexInputFormatClass(TextLoadGraphInputFormat.class);
- job.setVertexOutputFormatClass(SimpleLoadGraphVertexOutputFormat.class);
- FileInputFormat.setInputPaths(job, HDFS_INPUTPATH);
- FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH));
- job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 20);
- job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
- }
-
- private static void genLoadGraph() throws IOException {
- generateLoadGraphJob("LoadGraph", outputBase + "LoadGraph.xml");
- }
-
- private static void generateBinaryLoadGraphJob(String jobName, String outputPath) throws IOException {
- PregelixJob job = new PregelixJob(jobName);
- job.setVertexClass(TestLoadGraphVertex.class);
job.setVertexInputFormatClass(BinaryLoadGraphInputFormat.class);
job.setVertexOutputFormatClass(BinaryLoadGraphOutputFormat.class);
job.setOutputKeyClass(BytesWritable.class);
@@ -56,8 +39,8 @@
job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
}
- private static void genBinaryLoadGraph() throws IOException {
- generateBinaryLoadGraphJob("BinaryLoadGraph", outputBase + "BinaryLoadGraph.xml");
+ private static void genLoadGraph() throws IOException {
+ generateLoadGraphJob("LoadGraph", outputBase + "LoadGraph.xml");
}
private static void generateMergeGraphJob(String jobName, String outputPath) throws IOException {
@@ -66,7 +49,7 @@
job.setVertexInputFormatClass(BinaryLoadGraphInputFormat.class);
job.setVertexOutputFormatClass(BinaryLoadGraphOutputFormat.class);
job.setOutputKeyClass(BytesWritable.class);
- job.setOutputValueClass(ByteWritable.class);
+ job.setOutputValueClass(ValueStateWritable.class);
FileInputFormat.setInputPaths(job, HDFS_INPUTPATH);
FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH));
job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
@@ -83,7 +66,7 @@
job.setVertexOutputFormatClass(LogAlgorithmForMergeGraphOutputFormat.class);
job.setDynamicVertexValueSize(true);
job.setOutputKeyClass(BytesWritable.class);
- job.setOutputValueClass(ByteWritable.class);
+ job.setOutputValueClass(ValueStateWritable.class);
FileInputFormat.setInputPaths(job, HDFS_INPUTPATH);
FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH));
job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
@@ -99,7 +82,7 @@
*/
public static void main(String[] args) throws IOException {
// TODO Auto-generated method stub
- //genBinaryLoadGraph();
+ //genLoadGraph();
genMergeGraph();
//genLogAlgorithmForMergeGraph();
//genSequenceLoadGraph();
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/RunJobTestCase.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/RunJobTestCase.java
index 98a7eea..84b342e 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/RunJobTestCase.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/RunJobTestCase.java
@@ -1,9 +1,6 @@
package edu.uci.ics.genomix.pregelix.JobRun;
import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
-import java.io.OutputStreamWriter;
import junit.framework.TestCase;
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/RunJobTestSuite.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/RunJobTestSuite.java
index e657b94..fb698e5 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/RunJobTestSuite.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/RunJobTestSuite.java
@@ -41,7 +41,7 @@
private static final String PATH_TO_ONLY = "src/test/resources/only.txt";
private static final String FILE_EXTENSION_OF_RESULTS = "result";
- private static final String DATA_PATH = "data/input/SinglePath.seq";//sequenceFileMergeTest
+ private static final String DATA_PATH = "data/result/TreePath";//sequenceShortFileMergeTest
private static final String HDFS_PATH = "/webmap/";
private static final String HYRACKS_APP_NAME = "pregelix";
@@ -144,31 +144,17 @@
* Runs the tests and collects their result in a TestResult.
*/
@Override
- public void run(TestResult result) {
- OutputStreamWriter writer = null;
- try {
- writer = new OutputStreamWriter(new FileOutputStream("test/time",true));
- } catch (FileNotFoundException e1) { e1.printStackTrace();}
-
-
+ public void run(TestResult result) {
try {
int testCount = countTestCases();
for (int i = 0; i < testCount; i++) {
- long startTime = System.currentTimeMillis();
// cleanupStores();
Test each = this.testAt(i);
if (result.shouldStop())
break;
runTest(each, result);
- long endTime = System.currentTimeMillis();
- long totalTime = endTime - startTime;
- System.out.println(totalTime);
- try {
- writer.write("Time: " + totalTime + "\r\n");
- } catch (IOException e) { // TODO Auto-generated catch block
- e.printStackTrace();}
}
- writer.close();
+
tearDown();
} catch (Exception e) {
throw new IllegalStateException(e);
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/LoadGraphVertexTest.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/LoadGraphVertexTest.java
deleted file mode 100644
index 3087377..0000000
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/LoadGraphVertexTest.java
+++ /dev/null
@@ -1,108 +0,0 @@
-package edu.uci.ics.genomix.pregelix;
-
-import static org.junit.Assert.*;
-
-import java.io.BufferedReader;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
-import java.io.FileReader;
-import java.io.IOException;
-import java.util.StringTokenizer;
-
-import org.junit.Test;
-
-import edu.uci.ics.genomix.pregelix.LoadGraphVertex.SimpleLoadGraphVertexOutputFormat;
-import edu.uci.ics.genomix.pregelix.example.client.Client;
-import edu.uci.ics.pregelix.api.job.PregelixJob;
-
-public class LoadGraphVertexTest {
-
- /**
- * I can't debug in JUnits test so that I can't find my error here. So I leave comments here.
- * I will figure out as soon as possible.
- */
- private static final String EXPECT_RESULT_FILE = "expected_result";
- private static final String INPUT_PATHS = "folder";
- private static final String OUTPUT_PATH = "result";
- private static final String IP = "169.234.134.212";
- private static final String PORT = "3099";
-
- @SuppressWarnings("deprecation")
- @Test
- public void test() throws Exception {
- //initiate args
- /* String[] args = new String[8];
- args[0] = "-inputpaths";
- args[1] = INPUT_PATHS;
- args[2] = "-outputpath";
- args[3] = OUTPUT_PATH;
- args[4] = "-ip";
- args[5] = IP;
- args[6] = "-port";
- args[7] = PORT;
- PregelixJob job = new PregelixJob(LoadGraphVertex.class.getSimpleName());
- job.setVertexClass(LoadGraphVertex.class);
- job.setVertexInputFormatClass(TextLoadGraphInputFormat.class);
- job.setVertexOutputFormatClass(SimpleLoadGraphVertexOutputFormat.class);
- Client.run(args, job);
-
- generateExpectBinaryFile();
-
- //test if the actual file is the same as the expected file
- DataInputStream actual_dis = new DataInputStream(new FileInputStream(OUTPUT_PATH + "/*"));
- DataInputStream expected_dis = new DataInputStream(new FileInputStream(EXPECT_RESULT_FILE));
- String actualLine, expectedLine = null;
- StringTokenizer actualSt, expectedSt;
- byte[] actualVertexId, expectedVertexId = null;
- byte actualVertexValue, expectedVertexValue;
- byte[] tmp = null;
- while(((actualLine = actual_dis.readLine()) != null) &&
- ((expectedLine = expected_dis.readLine()) != null)){
- actualSt = new StringTokenizer(actualLine, " ");
- actualVertexId = actualSt.nextToken().getBytes();
- tmp = actualSt.nextToken().getBytes();
- actualVertexValue = tmp[0];
-
- expectedSt = new StringTokenizer(expectedLine," ");
- expectedVertexId = expectedSt.nextToken().getBytes();
- tmp = expectedSt.nextToken().getBytes();
- expectedVertexValue = tmp[0];
-
- assertEquals("actualVextexId == expectedVertexId", actualVertexId, expectedVertexId);
- assertEquals("actualVertexValue == expectedVertexValue", actualVertexValue, expectedVertexValue);
- }
-
- assertEquals("actualLine should be the end and be equal to null", actualLine, null);
- assertEquals("expectedLine should be the end and be equal to null", expectedLine, null);*/
- }
-
- @SuppressWarnings("deprecation")
- public void generateExpectBinaryFile() throws Exception{
- DataInputStream dis = new DataInputStream(new FileInputStream(INPUT_PATHS + "/*"));
- DataOutputStream dos = new DataOutputStream(new FileOutputStream(EXPECT_RESULT_FILE));
- String line;
- byte[] vertexId = null;
- byte vertexValue;
- byte[] tmp = null;
- while((line = dis.readLine()) != null){
- StringTokenizer st = new StringTokenizer(line, " ");
- vertexId = st.nextToken().getBytes();
- tmp = st.nextToken().getBytes();
- vertexValue = tmp[0];
-
- vertexValue = (byte) (vertexValue << 1);
- for(int i = 0; i < vertexId.length; i++)
- dos.writeByte(vertexId[i]);
- dos.writeByte((byte)32); //space
- dos.writeByte(vertexValue);
- dos.writeByte((byte)10); //line feed
- }
-
- dis.close();
- dos.close();
- }
-
-}