Merge branch 'genomix/fullstack_genomix' of https://code.google.com/p/hyracks into genomix/fullstack_genomix
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerBytesWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerBytesWritable.java
index 4883176..78d8d85 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerBytesWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerBytesWritable.java
@@ -317,7 +317,7 @@
}
bytes[offset] = (byte) (nextCode & 0x3);
} else {
- bytes[offset] = (byte) (bytes[offset] | ((nextCode & 0x3) << (((kmerlength-1) % 4) << 1)));
+ bytes[offset] = (byte) (bytes[offset] | ((nextCode & 0x3) << (((kmerlength - 1) % 4) << 1)));
}
clearLeadBit();
}
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
index 0276c3a..73612e2 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
@@ -16,8 +16,8 @@
private PositionListWritable incomingList;
private PositionListWritable outgoingList;
private KmerBytesWritable kmer;
-
- public NodeWritable(){
+
+ public NodeWritable() {
nodeID = new PositionWritable();
incomingList = new PositionListWritable();
outgoingList = new PositionListWritable();
@@ -47,6 +47,10 @@
incomingList.set(incoming);
}
+ public void setKmer(KmerBytesWritable kmer) { // replaces this node's kmer
+ this.kmer = kmer; // NOTE(review): keeps the caller's object (no copy), unlike setOutgoingList, which copies via outgoingList.set()
+ }
+
public void setOutgoingList(PositionListWritable outgoing) {
outgoingList.set(outgoing);
}
@@ -111,9 +115,9 @@
public int hashCode() {
return nodeID.hashCode();
}
-
+
@Override
- public String toString(){
+ public String toString() {
StringBuilder sbuilder = new StringBuilder();
sbuilder.append('(');
sbuilder.append(nodeID.toString()).append('\t');
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NoteWritableFactory.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NoteWritableFactory.java
new file mode 100644
index 0000000..55c6ca7
--- /dev/null
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NoteWritableFactory.java
@@ -0,0 +1,92 @@
+package edu.uci.ics.genomix.type;
+
+/**
+ * Builds merged {@link NodeWritable}s by combining the k-mer chains of existing nodes.
+ * <p>
+ * NOTE(review): every method returns the same internal {@code node} instance and only replaces
+ * its kmer, so callers must copy the result before the next call. The returned node's ID and
+ * incoming/outgoing lists are not taken from the input nodes.
+ */
+public class NoteWritableFactory {
+    /** K-mer size used by the no-argument constructor. */
+    public static final int DEFAULT_KMER_SIZE = 55;
+
+    private final NodeWritable node;
+    private final KmerBytesWritableFactory kmerBytesWritableFactory;
+    private final int kmerSize;
+
+    /** Creates a factory for {@link #DEFAULT_KMER_SIZE}-mers. */
+    public NoteWritableFactory() {
+        this(DEFAULT_KMER_SIZE);
+    }
+
+    /**
+     * Creates a factory for the given k-mer size.
+     *
+     * @param kmerSize k used to size the k-mer factory and to trim chains when merging two nodes
+     */
+    public NoteWritableFactory(int kmerSize) {
+        this.kmerSize = kmerSize;
+        node = new NodeWritable();
+        kmerBytesWritableFactory = new KmerBytesWritableFactory(kmerSize);
+    }
+
+    /**
+     * Sets the shared node's kmer to {@code mergeTwoKmer(orignalNode.getKmer(), appendKmer)}.
+     *
+     * @param orignalNode node whose kmer forms the leading part
+     * @param appendKmer chain merged after it
+     * @return the factory's shared node instance
+     */
+    public NodeWritable append(final NodeWritable orignalNode, final KmerBytesWritable appendKmer) {
+        KmerBytesWritable preKmer = orignalNode.getKmer();
+        node.setKmer(kmerBytesWritableFactory.mergeTwoKmer(preKmer, appendKmer));
+        return node;
+    }
+
+    /**
+     * Appends {@code appendNode}'s chain without its first {@code kmerSize - 2} symbols, which are
+     * presumably the part it shares with {@code orignalNode}'s chain (TODO confirm; this reading
+     * also assumes getSubKmerFromChain takes start, length, chain).
+     *
+     * @param orignalNode node whose kmer forms the leading part
+     * @param appendNode node whose trimmed chain is merged after it
+     * @return the factory's shared node instance
+     */
+    public NodeWritable append(final NodeWritable orignalNode, final NodeWritable appendNode) {
+        KmerBytesWritable appendChain = appendNode.getKmer();
+        KmerBytesWritable nextKmer = kmerBytesWritableFactory.getSubKmerFromChain(kmerSize - 2,
+                appendChain.kmerlength - kmerSize + 2, appendChain);
+        return append(orignalNode, nextKmer);
+    }
+
+    /**
+     * Sets the shared node's kmer to {@code mergeTwoKmer(prependKmer, orignalNode.getKmer())}.
+     *
+     * @param orignalNode node whose kmer forms the trailing part
+     * @param prependKmer chain merged before it
+     * @return the factory's shared node instance
+     */
+    public NodeWritable prepend(final NodeWritable orignalNode, final KmerBytesWritable prependKmer) {
+        KmerBytesWritable nextKmer = orignalNode.getKmer();
+        node.setKmer(kmerBytesWritableFactory.mergeTwoKmer(prependKmer, nextKmer));
+        return node;
+    }
+
+    /**
+     * Prepends {@code prependNode}'s chain without its last {@code kmerSize - 2} symbols, mirroring
+     * the trimming in {@link #append(NodeWritable, NodeWritable)} (same getSubKmerFromChain reading).
+     *
+     * @param orignalNode node whose kmer forms the trailing part
+     * @param prependNode node whose trimmed chain is merged before it
+     * @return the factory's shared node instance
+     */
+    public NodeWritable prepend(final NodeWritable orignalNode, final NodeWritable prependNode) {
+        // Trim prependNode's chain, not orignalNode's: reading orignalNode here would ignore
+        // prependNode and merge orignalNode with a piece of its own chain.
+        KmerBytesWritable prependChain = prependNode.getKmer();
+        KmerBytesWritable prependKmer = kmerBytesWritableFactory.getSubKmerFromChain(0,
+                prependChain.kmerlength - kmerSize + 2, prependChain);
+        return prepend(orignalNode, prependKmer);
+    }
+}
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionWritable.java
index c19af12..42ff47a 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionWritable.java
@@ -10,7 +10,7 @@
import edu.uci.ics.genomix.data.Marshal;
-public class PositionWritable implements WritableComparable<PositionWritable> , Serializable{
+public class PositionWritable implements WritableComparable<PositionWritable>, Serializable {
/**
*
*/
@@ -33,11 +33,15 @@
public PositionWritable(byte[] storage, int offset) {
setNewReference(storage, offset);
}
-
+
public void setNewReference(byte[] storage, int offset) {
this.storage = storage;
this.offset = offset;
}
+
+ public void set(PositionWritable pos) { // copies pos's read ID and position into this object's storage (not pos's storage reference)
+ set(pos.getReadID(), pos.getPosInRead());
+ }
public void set(int readID, byte posInRead) {
Marshal.putInt(readID, storage, offset);
@@ -73,7 +77,7 @@
public void write(DataOutput out) throws IOException {
out.write(storage, offset, LENGTH);
}
-
+
@Override
public int hashCode() {
return this.getReadID();
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/api/io/binary/BinaryVertexInputFormat.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/api/io/binary/BinaryVertexInputFormat.java
index a4134af..636cf86 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/api/io/binary/BinaryVertexInputFormat.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/api/io/binary/BinaryVertexInputFormat.java
@@ -3,6 +3,7 @@
import java.io.IOException;
import java.util.List;
+import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.InputSplit;
@@ -13,8 +14,7 @@
import edu.uci.ics.pregelix.api.io.VertexInputFormat;
import edu.uci.ics.pregelix.api.io.VertexReader;
-import edu.uci.ics.genomix.type.KmerBytesWritable;
-import edu.uci.ics.genomix.type.KmerCountValue;
+import edu.uci.ics.genomix.type.NodeWritable;
public class BinaryVertexInputFormat<I extends WritableComparable<?>, V extends Writable, E extends Writable, M extends Writable>
extends VertexInputFormat<I, V, E, M> {
@@ -38,7 +38,7 @@
public static abstract class BinaryVertexReader<I extends WritableComparable<?>, V extends Writable, E extends Writable, M extends Writable>
implements VertexReader<I, V, E, M> {
/** Internal line record reader */
- private final RecordReader<KmerBytesWritable, KmerCountValue> lineRecordReader;
+ private final RecordReader<NodeWritable, NullWritable> lineRecordReader;
/** Context passed to initialize */
private TaskAttemptContext context;
@@ -48,7 +48,7 @@
* @param recordReader
* Line record reader from SequenceFileInputFormat
*/
- public BinaryVertexReader(RecordReader<KmerBytesWritable, KmerCountValue> recordReader) {
+ public BinaryVertexReader(RecordReader<NodeWritable, NullWritable> recordReader) {
this.lineRecordReader = recordReader;
}
@@ -74,7 +74,7 @@
*
* @return Record reader to be used for reading.
*/
- protected RecordReader<KmerBytesWritable, KmerCountValue> getRecordReader() {
+ protected RecordReader<NodeWritable, NullWritable> getRecordReader() {
return lineRecordReader;
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/api/io/binary/BinaryVertexOutputFormat.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/api/io/binary/BinaryVertexOutputFormat.java
index d921b5e..8fbd1ce 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/api/io/binary/BinaryVertexOutputFormat.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/api/io/binary/BinaryVertexOutputFormat.java
@@ -11,7 +11,7 @@
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
-import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.PositionWritable;
import edu.uci.ics.pregelix.api.io.VertexOutputFormat;
import edu.uci.ics.pregelix.api.io.VertexWriter;
@@ -49,7 +49,7 @@
/** Context passed to initialize */
private TaskAttemptContext context;
/** Internal line record writer */
- private final RecordWriter<KmerBytesWritable, ValueStateWritable> lineRecordWriter;
+ private final RecordWriter<PositionWritable, ValueStateWritable> lineRecordWriter;
/**
* Initialize with the LineRecordWriter.
@@ -57,7 +57,7 @@
* @param lineRecordWriter
* Line record writer from SequenceFileOutputFormat
*/
- public BinaryVertexWriter(RecordWriter<KmerBytesWritable, ValueStateWritable> lineRecordWriter) {
+ public BinaryVertexWriter(RecordWriter<PositionWritable, ValueStateWritable> lineRecordWriter) {
this.lineRecordWriter = lineRecordWriter;
}
@@ -76,7 +76,7 @@
*
* @return Record writer to be used for writing.
*/
- public RecordWriter<KmerBytesWritable, ValueStateWritable> getRecordWriter() {
+ public RecordWriter<PositionWritable, ValueStateWritable> getRecordWriter() {
return lineRecordWriter;
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/client/Client.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/client/Client.java
index 438c99d..94b0c51 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/client/Client.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/client/Client.java
@@ -76,7 +76,7 @@
}
if (options.pseudoRate > 0 && options.pseudoRate <= 1)
- job.getConfiguration().setFloat(P3ForPathMergeVertex.PSEUDORATE, options.pseudoRate);
+ job.getConfiguration().setFloat(P3ForPathMergeVertex.PSEUDORATE, options.pseudoRate);
if (options.maxRound > 0)
job.getConfiguration().setInt(P3ForPathMergeVertex.MAXROUND, options.maxRound);
return options;
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/LogAlgorithmForPathMergeInputFormat.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/LogAlgorithmForPathMergeInputFormat.java
index 4a76ff6..3fe04ff 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/LogAlgorithmForPathMergeInputFormat.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/LogAlgorithmForPathMergeInputFormat.java
@@ -8,35 +8,36 @@
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import edu.uci.ics.genomix.pregelix.api.io.binary.BinaryVertexInputFormat;
-import edu.uci.ics.genomix.pregelix.io.LogAlgorithmMessageWritable;
+import edu.uci.ics.genomix.pregelix.io.MessageWritable;
import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
import edu.uci.ics.genomix.pregelix.type.State;
-import edu.uci.ics.genomix.type.KmerBytesWritable;
-import edu.uci.ics.genomix.type.KmerCountValue;
+import edu.uci.ics.genomix.type.NodeWritable;
+import edu.uci.ics.genomix.type.PositionWritable;
import edu.uci.ics.pregelix.api.graph.Vertex;
import edu.uci.ics.pregelix.api.io.VertexReader;
import edu.uci.ics.pregelix.api.util.BspUtils;
public class LogAlgorithmForPathMergeInputFormat extends
- BinaryVertexInputFormat<KmerBytesWritable, ValueStateWritable, NullWritable, LogAlgorithmMessageWritable> {
+ BinaryVertexInputFormat<PositionWritable, ValueStateWritable, NullWritable, MessageWritable> {
/**
* Format INPUT
*/
@SuppressWarnings("unchecked")
@Override
- public VertexReader<KmerBytesWritable, ValueStateWritable, NullWritable, LogAlgorithmMessageWritable> createVertexReader(
+ public VertexReader<PositionWritable, ValueStateWritable, NullWritable, MessageWritable> createVertexReader(
InputSplit split, TaskAttemptContext context) throws IOException {
return new BinaryLoadGraphReader(binaryInputFormat.createRecordReader(split, context));
}
@SuppressWarnings("rawtypes")
class BinaryLoadGraphReader extends
- BinaryVertexReader<KmerBytesWritable, ValueStateWritable, NullWritable, LogAlgorithmMessageWritable> {
+ BinaryVertexReader<PositionWritable, ValueStateWritable, NullWritable, MessageWritable> {
private Vertex vertex = null;
- private KmerBytesWritable vertexId = null;
+ private NodeWritable node = new NodeWritable();
+ private PositionWritable vertexId = new PositionWritable();
private ValueStateWritable vertexValue = new ValueStateWritable();
- public BinaryLoadGraphReader(RecordReader<KmerBytesWritable, KmerCountValue> recordReader) {
+ public BinaryLoadGraphReader(RecordReader<NodeWritable, NullWritable> recordReader) {
super(recordReader);
}
@@ -47,7 +48,7 @@
@SuppressWarnings("unchecked")
@Override
- public Vertex<KmerBytesWritable, ValueStateWritable, NullWritable, LogAlgorithmMessageWritable> getCurrentVertex()
+ public Vertex<PositionWritable, ValueStateWritable, NullWritable, MessageWritable> getCurrentVertex()
throws IOException, InterruptedException {
if (vertex == null)
vertex = (Vertex) BspUtils.createVertex(getContext().getConfiguration());
@@ -59,15 +60,15 @@
/**
* set the src vertex id
*/
- if (vertexId == null)
- vertexId = new KmerBytesWritable(getRecordReader().getCurrentKey().getKmerLength());
- vertexId.set(getRecordReader().getCurrentKey());
+ node = getRecordReader().getCurrentKey();
+ vertexId.set(node.getNodeID());
vertex.setVertexId(vertexId);
/**
* set the vertex value
*/
- KmerCountValue kmerCountValue = getRecordReader().getCurrentValue();
- vertexValue.setAdjMap(kmerCountValue.getAdjBitMap());
+ vertexValue.setIncomingList(node.getIncomingList());
+ vertexValue.setOutgoingList(node.getOutgoingList());
+ vertexValue.setMergeChain(node.getKmer());
vertexValue.setState(State.NON_VERTEX);
vertex.setVertexValue(vertexValue);
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/LogAlgorithmForPathMergeOutputFormat.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/LogAlgorithmForPathMergeOutputFormat.java
index 110247e..8a04292 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/LogAlgorithmForPathMergeOutputFormat.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/LogAlgorithmForPathMergeOutputFormat.java
@@ -11,16 +11,16 @@
import edu.uci.ics.pregelix.api.io.VertexWriter;
import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
import edu.uci.ics.genomix.pregelix.type.State;
-import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.PositionWritable;
public class LogAlgorithmForPathMergeOutputFormat extends
- BinaryVertexOutputFormat<KmerBytesWritable, ValueStateWritable, NullWritable> {
+ BinaryVertexOutputFormat<PositionWritable, ValueStateWritable, NullWritable> {
@Override
- public VertexWriter<KmerBytesWritable, ValueStateWritable, NullWritable> createVertexWriter(
+ public VertexWriter<PositionWritable, ValueStateWritable, NullWritable> createVertexWriter(
TaskAttemptContext context) throws IOException, InterruptedException {
@SuppressWarnings("unchecked")
- RecordWriter<KmerBytesWritable, ValueStateWritable> recordWriter = binaryOutputFormat.getRecordWriter(context);
+ RecordWriter<PositionWritable, ValueStateWritable> recordWriter = binaryOutputFormat.getRecordWriter(context);
return new BinaryLoadGraphVertexWriter(recordWriter);
}
@@ -28,20 +28,18 @@
* Simple VertexWriter that supports {@link BinaryLoadGraphVertex}
*/
public static class BinaryLoadGraphVertexWriter extends
- BinaryVertexWriter<KmerBytesWritable, ValueStateWritable, NullWritable> {
+ BinaryVertexWriter<PositionWritable, ValueStateWritable, NullWritable> {
- public BinaryLoadGraphVertexWriter(RecordWriter<KmerBytesWritable, ValueStateWritable> lineRecordWriter) {
+ public BinaryLoadGraphVertexWriter(RecordWriter<PositionWritable, ValueStateWritable> lineRecordWriter) {
super(lineRecordWriter);
}
@Override
- public void writeVertex(Vertex<KmerBytesWritable, ValueStateWritable, NullWritable, ?> vertex)
+ public void writeVertex(Vertex<PositionWritable, ValueStateWritable, NullWritable, ?> vertex)
throws IOException, InterruptedException {
- //&& vertex.getVertexValue().getState() != State.MID_VERTEX
if (vertex.getVertexValue().getState() != State.END_VERTEX) {
getRecordWriter().write(vertex.getVertexId(), vertex.getVertexValue());
}
-
}
}
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/NaiveAlgorithmForPathMergeInputFormat.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/NaiveAlgorithmForPathMergeInputFormat.java
index 8abfcd0..4011838 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/NaiveAlgorithmForPathMergeInputFormat.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/NaiveAlgorithmForPathMergeInputFormat.java
@@ -7,24 +7,25 @@
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import edu.uci.ics.genomix.type.KmerBytesWritable;
-import edu.uci.ics.genomix.type.KmerCountValue;
+import edu.uci.ics.genomix.type.NodeWritable;
+import edu.uci.ics.genomix.type.PositionWritable;
import edu.uci.ics.pregelix.api.graph.Vertex;
import edu.uci.ics.pregelix.api.io.VertexReader;
import edu.uci.ics.pregelix.api.util.BspUtils;
-import edu.uci.ics.genomix.pregelix.io.NaiveAlgorithmMessageWritable;
+import edu.uci.ics.genomix.pregelix.io.MessageWritable;
import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
+import edu.uci.ics.genomix.pregelix.type.State;
import edu.uci.ics.genomix.pregelix.api.io.binary.BinaryVertexInputFormat;
import edu.uci.ics.genomix.pregelix.api.io.binary.BinaryVertexInputFormat.BinaryVertexReader;
public class NaiveAlgorithmForPathMergeInputFormat extends
- BinaryVertexInputFormat<KmerBytesWritable, ValueStateWritable, NullWritable, NaiveAlgorithmMessageWritable> {
+ BinaryVertexInputFormat<PositionWritable, ValueStateWritable, NullWritable, MessageWritable> {
/**
* Format INPUT
*/
@SuppressWarnings("unchecked")
@Override
- public VertexReader<KmerBytesWritable, ValueStateWritable, NullWritable, NaiveAlgorithmMessageWritable> createVertexReader(
+ public VertexReader<PositionWritable, ValueStateWritable, NullWritable, MessageWritable> createVertexReader(
InputSplit split, TaskAttemptContext context) throws IOException {
return new BinaryLoadGraphReader(binaryInputFormat.createRecordReader(split, context));
}
@@ -32,12 +33,13 @@
@SuppressWarnings("rawtypes")
class BinaryLoadGraphReader extends
- BinaryVertexReader<KmerBytesWritable, ValueStateWritable, NullWritable, NaiveAlgorithmMessageWritable> {
+ BinaryVertexReader<PositionWritable, ValueStateWritable, NullWritable, MessageWritable> {
private Vertex vertex;
- private KmerBytesWritable vertexId = null;
+ private NodeWritable node = new NodeWritable();
+ private PositionWritable vertexId = new PositionWritable();
private ValueStateWritable vertexValue = new ValueStateWritable();
- public BinaryLoadGraphReader(RecordReader<KmerBytesWritable, KmerCountValue> recordReader) {
+ public BinaryLoadGraphReader(RecordReader<NodeWritable, NullWritable> recordReader) {
super(recordReader);
}
@@ -48,7 +50,7 @@
@SuppressWarnings("unchecked")
@Override
- public Vertex<KmerBytesWritable, ValueStateWritable, NullWritable, NaiveAlgorithmMessageWritable> getCurrentVertex()
+ public Vertex<PositionWritable, ValueStateWritable, NullWritable, MessageWritable> getCurrentVertex()
throws IOException, InterruptedException {
if (vertex == null)
vertex = (Vertex) BspUtils.createVertex(getContext().getConfiguration());
@@ -61,15 +63,16 @@
/**
* set the src vertex id
*/
- if (vertexId == null)
- vertexId = new KmerBytesWritable(getRecordReader().getCurrentKey().getKmerLength());
- vertexId.set(getRecordReader().getCurrentKey());
+ node = getRecordReader().getCurrentKey();
+ vertexId.set(node.getNodeID());
vertex.setVertexId(vertexId);
/**
* set the vertex value
*/
- KmerCountValue kmerCountValue = getRecordReader().getCurrentValue();
- vertexValue.setAdjMap(kmerCountValue.getAdjBitMap());
+ vertexValue.setIncomingList(node.getIncomingList());
+ vertexValue.setOutgoingList(node.getOutgoingList());
+ vertexValue.setMergeChain(node.getKmer());
+ vertexValue.setState(State.NON_VERTEX);
vertex.setVertexValue(vertexValue);
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/NaiveAlgorithmForPathMergeOutputFormat.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/NaiveAlgorithmForPathMergeOutputFormat.java
index 311283d..fe3b12d 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/NaiveAlgorithmForPathMergeOutputFormat.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/NaiveAlgorithmForPathMergeOutputFormat.java
@@ -8,18 +8,18 @@
import edu.uci.ics.genomix.pregelix.api.io.binary.BinaryVertexOutputFormat;
import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
-import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.PositionWritable;
import edu.uci.ics.pregelix.api.graph.Vertex;
import edu.uci.ics.pregelix.api.io.VertexWriter;
public class NaiveAlgorithmForPathMergeOutputFormat extends
- BinaryVertexOutputFormat<KmerBytesWritable, ValueStateWritable, NullWritable> {
+ BinaryVertexOutputFormat<PositionWritable, ValueStateWritable, NullWritable> {
@Override
- public VertexWriter<KmerBytesWritable, ValueStateWritable, NullWritable> createVertexWriter(
+ public VertexWriter<PositionWritable, ValueStateWritable, NullWritable> createVertexWriter(
TaskAttemptContext context) throws IOException, InterruptedException {
@SuppressWarnings("unchecked")
- RecordWriter<KmerBytesWritable, ValueStateWritable> recordWriter = binaryOutputFormat.getRecordWriter(context);
+ RecordWriter<PositionWritable, ValueStateWritable> recordWriter = binaryOutputFormat.getRecordWriter(context);
return new BinaryLoadGraphVertexWriter(recordWriter);
}
@@ -27,16 +27,14 @@
* Simple VertexWriter that supports {@link BinaryLoadGraphVertex}
*/
public static class BinaryLoadGraphVertexWriter extends
- BinaryVertexWriter<KmerBytesWritable, ValueStateWritable, NullWritable> {
- public BinaryLoadGraphVertexWriter(RecordWriter<KmerBytesWritable, ValueStateWritable> lineRecordWriter) {
+ BinaryVertexWriter<PositionWritable, ValueStateWritable, NullWritable> {
+ public BinaryLoadGraphVertexWriter(RecordWriter<PositionWritable, ValueStateWritable> lineRecordWriter) {
super(lineRecordWriter);
}
@Override
- public void writeVertex(Vertex<KmerBytesWritable, ValueStateWritable, NullWritable, ?> vertex)
+ public void writeVertex(Vertex<PositionWritable, ValueStateWritable, NullWritable, ?> vertex)
throws IOException, InterruptedException {
- //if(vertex.getVertexValue().getState() == State.FILTER
- // || vertex.getVertexValue().getState() == State.FINAL_VERTEX)
getRecordWriter().write(vertex.getVertexId(), vertex.getVertexValue());
}
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/LogAlgorithmMessageWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/LogAlgorithmMessageWritable.java
deleted file mode 100644
index fc57b74..0000000
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/LogAlgorithmMessageWritable.java
+++ /dev/null
@@ -1,153 +0,0 @@
-package edu.uci.ics.genomix.pregelix.io;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.hadoop.io.WritableComparable;
-
-import edu.uci.ics.genomix.pregelix.operator.LogAlgorithmForPathMergeVertex;
-import edu.uci.ics.genomix.pregelix.type.CheckMessage;
-import edu.uci.ics.genomix.type.KmerBytesWritable;
-import edu.uci.ics.genomix.type.VKmerBytesWritable;
-
-public class LogAlgorithmMessageWritable implements WritableComparable<LogAlgorithmMessageWritable> {
- /**
- * sourceVertexId stores source vertexId when headVertex sends the message
- * stores neighber vertexValue when pathVertex sends the message
- * chainVertexId stores the chains of connected DNA
- * file stores the point to the file that stores the chains of connected DNA
- */
- private KmerBytesWritable sourceVertexId;
- private VKmerBytesWritable chainVertexId;
- private byte adjMap;
- private byte message;
-
- private byte checkMessage;
-
- public LogAlgorithmMessageWritable() {
- sourceVertexId = new VKmerBytesWritable(LogAlgorithmForPathMergeVertex.kmerSize);
- chainVertexId = new VKmerBytesWritable(LogAlgorithmForPathMergeVertex.kmerSize);
- adjMap = 0;
- message = 0;
- checkMessage = 0;
- }
-
- public void set(KmerBytesWritable sourceVertexId, VKmerBytesWritable chainVertexId, byte adjMap, byte message) {
- checkMessage = 0;
- if (sourceVertexId != null) {
- checkMessage |= CheckMessage.SOURCE;
- this.sourceVertexId.set(sourceVertexId);
- }
- if (chainVertexId != null) {
- checkMessage |= CheckMessage.CHAIN;
- this.chainVertexId.set(chainVertexId);
- }
- if (adjMap != 0) {
- checkMessage |= CheckMessage.ADJMAP;
- this.adjMap = adjMap;
- }
- this.message = message;
- }
-
- public void reset() {
- checkMessage = 0;
- chainVertexId.reset(LogAlgorithmForPathMergeVertex.kmerSize);
- adjMap = (byte) 0;
- message = 0;
- }
-
- public KmerBytesWritable getSourceVertexId() {
- return sourceVertexId;
- }
-
- public void setSourceVertexId(KmerBytesWritable sourceVertexId) {
- if (sourceVertexId != null) {
- checkMessage |= CheckMessage.SOURCE;
- this.sourceVertexId.set(sourceVertexId);
- }
- }
-
- public byte getAdjMap() {
- return adjMap;
- }
-
- public void setAdjMap(byte adjMap) {
- if (adjMap != 0) {
- checkMessage |= CheckMessage.ADJMAP;
- this.adjMap = adjMap;
- }
- }
-
- public VKmerBytesWritable getChainVertexId() {
- return chainVertexId;
- }
-
- public void setChainVertexId(VKmerBytesWritable chainVertexId) {
- if (chainVertexId != null) {
- checkMessage |= CheckMessage.CHAIN;
- this.chainVertexId.set(chainVertexId);
- }
- }
-
- public byte getMessage() {
- return message;
- }
-
- public void setMessage(byte message) {
- this.message = message;
- }
-
- public int getLengthOfChain() {
- return chainVertexId.getKmerLength();
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- out.writeByte(checkMessage);
- if ((checkMessage & CheckMessage.SOURCE) != 0)
- sourceVertexId.write(out);
- if ((checkMessage & CheckMessage.CHAIN) != 0)
- chainVertexId.write(out);
- if ((checkMessage & CheckMessage.ADJMAP) != 0)
- out.write(adjMap);
- out.writeByte(message);
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- this.reset();
- checkMessage = in.readByte();
- if ((checkMessage & CheckMessage.SOURCE) != 0)
- sourceVertexId.readFields(in);
- if ((checkMessage & CheckMessage.CHAIN) != 0)
- chainVertexId.readFields(in);
- if ((checkMessage & CheckMessage.ADJMAP) != 0)
- adjMap = in.readByte();
- message = in.readByte();
- }
-
- @Override
- public int hashCode() {
- return chainVertexId.hashCode();
- }
-
- @Override
- public boolean equals(Object o) {
- if (o instanceof LogAlgorithmMessageWritable) {
- LogAlgorithmMessageWritable tp = (LogAlgorithmMessageWritable) o;
- return chainVertexId.equals(tp.chainVertexId);
- }
- return false;
- }
-
- @Override
- public String toString() {
- return chainVertexId.toString();
- }
-
- @Override
- public int compareTo(LogAlgorithmMessageWritable tp) {
- return chainVertexId.compareTo(tp.chainVertexId);
- }
-}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MessageWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MessageWritable.java
new file mode 100644
index 0000000..31d313d
--- /dev/null
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MessageWritable.java
@@ -0,0 +1,190 @@
+package edu.uci.ics.genomix.pregelix.io;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.WritableComparable;
+
+import edu.uci.ics.genomix.pregelix.type.CheckMessage;
+import edu.uci.ics.genomix.pregelix.type.Message;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.PositionListWritable;
+import edu.uci.ics.genomix.type.PositionWritable;
+
+/**
+ * Message exchanged between path-merge vertices.
+ * <p>
+ * Only fields whose {@link CheckMessage} bit is set in {@code checkMessage} are serialized;
+ * {@link #readFields(DataInput)} calls {@link #reset()} before reading.
+ * <p>
+ * {@code equals}, {@code hashCode}, {@code compareTo} and {@code toString} use only sourceVertexId.
+ */
+public class MessageWritable implements WritableComparable<MessageWritable> {
+    /**
+     * sourceVertexId stores the source vertex ID when a head vertex sends the message, or the
+     * neighbor vertex value when a path vertex sends it.
+     * chainVertexId stores the chain of connected DNA.
+     * neighberNode stores the incoming or outgoing neighbor list.
+     */
+    private PositionWritable sourceVertexId;
+    private KmerBytesWritable chainVertexId;
+    private PositionListWritable neighberNode; // incoming or outgoing
+    private byte message;
+
+    /** Bit set of CheckMessage flags marking which optional fields are present. */
+    private byte checkMessage;
+
+    public MessageWritable() {
+        sourceVertexId = new PositionWritable();
+        // NOTE(review): the message classes this replaces used VKmerBytesWritable here; confirm
+        // KmerBytesWritable.set() adopts the length of a source chain longer than this buffer.
+        chainVertexId = new KmerBytesWritable(0);
+        neighberNode = new PositionListWritable();
+        message = Message.NON;
+        checkMessage = (byte) 0;
+    }
+
+    /**
+     * Sets every field at once; a {@code null} argument leaves that field absent (flag cleared).
+     *
+     * @param sourceVertexId source/neighbor ID to copy, or null
+     * @param chainVertexId chain to copy, or null
+     * @param neighberNode neighbor list to copy, or null
+     * @param message message code
+     */
+    public void set(PositionWritable sourceVertexId, KmerBytesWritable chainVertexId,
+            PositionListWritable neighberNode, byte message) {
+        checkMessage = 0;
+        if (sourceVertexId != null) {
+            checkMessage |= CheckMessage.SOURCE;
+            this.sourceVertexId.set(sourceVertexId.getReadID(), sourceVertexId.getPosInRead());
+        }
+        if (chainVertexId != null) {
+            checkMessage |= CheckMessage.CHAIN;
+            this.chainVertexId.set(chainVertexId);
+        }
+        if (neighberNode != null) {
+            checkMessage |= CheckMessage.NEIGHBER;
+            this.neighberNode.set(neighberNode);
+        }
+        this.message = message;
+    }
+
+    /** Clears all presence flags, resets each field and sets the message code to Message.NON. */
+    public void reset() {
+        checkMessage = 0;
+        // equals/hashCode/compareTo read sourceVertexId unconditionally, so clear it as well;
+        // otherwise a reused instance read without SOURCE keeps the previous message's ID.
+        sourceVertexId.set(0, (byte) 0);
+        chainVertexId.reset(1);
+        neighberNode.reset();
+        message = Message.NON;
+    }
+
+    public PositionWritable getSourceVertexId() {
+        return sourceVertexId;
+    }
+
+    /** Copies {@code sourceVertexId} and marks it present; a null argument is ignored. */
+    public void setSourceVertexId(PositionWritable sourceVertexId) {
+        if (sourceVertexId != null) {
+            checkMessage |= CheckMessage.SOURCE;
+            this.sourceVertexId.set(sourceVertexId.getReadID(), sourceVertexId.getPosInRead());
+        }
+    }
+
+    public KmerBytesWritable getChainVertexId() {
+        return chainVertexId;
+    }
+
+    /** Copies {@code chainVertexId} and marks it present; a null argument is ignored. */
+    public void setChainVertexId(KmerBytesWritable chainVertexId) {
+        if (chainVertexId != null) {
+            checkMessage |= CheckMessage.CHAIN;
+            this.chainVertexId.set(chainVertexId);
+        }
+    }
+
+    public PositionListWritable getNeighberNode() {
+        return neighberNode;
+    }
+
+    /** Copies {@code neighberNode} and marks it present; a null argument is ignored. */
+    public void setNeighberNode(PositionListWritable neighberNode) {
+        if (neighberNode != null) {
+            checkMessage |= CheckMessage.NEIGHBER;
+            this.neighberNode.set(neighberNode);
+        }
+    }
+
+    /** @return the k-mer length of the chain */
+    public int getLengthOfChain() {
+        return chainVertexId.getKmerLength();
+    }
+
+    public byte getMessage() {
+        return message;
+    }
+
+    public void setMessage(byte message) {
+        this.message = message;
+    }
+
+    /** Writes the flag byte, then each flagged field, then the message byte. */
+    @Override
+    public void write(DataOutput out) throws IOException {
+        out.writeByte(checkMessage);
+        if ((checkMessage & CheckMessage.SOURCE) != 0) {
+            sourceVertexId.write(out);
+        }
+        if ((checkMessage & CheckMessage.CHAIN) != 0) {
+            chainVertexId.write(out);
+        }
+        if ((checkMessage & CheckMessage.NEIGHBER) != 0) {
+            neighberNode.write(out);
+        }
+        out.writeByte(message); // mirrors in.readByte() in readFields
+    }
+
+    /** Reads the layout produced by {@link #write(DataOutput)} after resetting this instance. */
+    @Override
+    public void readFields(DataInput in) throws IOException {
+        this.reset();
+        checkMessage = in.readByte();
+        if ((checkMessage & CheckMessage.SOURCE) != 0) {
+            sourceVertexId.readFields(in);
+        }
+        if ((checkMessage & CheckMessage.CHAIN) != 0) {
+            chainVertexId.readFields(in);
+        }
+        if ((checkMessage & CheckMessage.NEIGHBER) != 0) {
+            neighberNode.readFields(in);
+        }
+        message = in.readByte();
+    }
+
+    @Override
+    public int hashCode() {
+        return sourceVertexId.hashCode();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (o instanceof MessageWritable) {
+            MessageWritable tp = (MessageWritable) o;
+            return sourceVertexId.equals(tp.sourceVertexId);
+        }
+        return false;
+    }
+
+    @Override
+    public String toString() {
+        return sourceVertexId.toString();
+    }
+
+    @Override
+    public int compareTo(MessageWritable tp) {
+        return sourceVertexId.compareTo(tp.sourceVertexId);
+    }
+}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/NaiveAlgorithmMessageWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/NaiveAlgorithmMessageWritable.java
deleted file mode 100644
index c5378a2..0000000
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/NaiveAlgorithmMessageWritable.java
+++ /dev/null
@@ -1,179 +0,0 @@
-package edu.uci.ics.genomix.pregelix.io;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.hadoop.io.WritableComparable;
-
-import edu.uci.ics.genomix.pregelix.operator.NaiveAlgorithmForPathMergeVertex;
-import edu.uci.ics.genomix.pregelix.type.CheckMessage;
-import edu.uci.ics.genomix.pregelix.type.Message;
-import edu.uci.ics.genomix.type.KmerBytesWritable;
-import edu.uci.ics.genomix.type.VKmerBytesWritable;
-
-public class NaiveAlgorithmMessageWritable implements WritableComparable<NaiveAlgorithmMessageWritable> {
- /**
- * sourceVertexId stores source vertexId when headVertex sends the message
- * stores neighber vertexValue when pathVertex sends the message
- * file stores the point to the file that stores the chains of connected DNA
- */
- private KmerBytesWritable sourceVertexId;
- private byte adjMap;
- private byte lastGeneCode;
- private VKmerBytesWritable chainVertexId;
- private byte message;
-
- private byte checkMessage;
-
- public NaiveAlgorithmMessageWritable() {
- sourceVertexId = new VKmerBytesWritable(NaiveAlgorithmForPathMergeVertex.kmerSize);
- chainVertexId = new VKmerBytesWritable(1);
- adjMap = (byte) 0;
- lastGeneCode = (byte) -1;
- message = Message.NON;
- checkMessage = (byte) 0;
- }
-
- public void set(KmerBytesWritable sourceVertex, byte adjMap, byte lastGeneCode, VKmerBytesWritable chainVertexId, byte message) {
- checkMessage = 0;
- if (sourceVertexId != null) {
- checkMessage |= CheckMessage.SOURCE;
- this.sourceVertexId.set(sourceVertexId);
- }
- if (adjMap != 0) {
- checkMessage |= CheckMessage.ADJMAP;
- this.adjMap = adjMap;
- }
- if (lastGeneCode != 0) {
- checkMessage |= CheckMessage.LASTGENECODE;
- this.lastGeneCode = lastGeneCode;
- }
- if (chainVertexId != null) {
- checkMessage |= CheckMessage.CHAIN;
- this.chainVertexId.set(chainVertexId);
- }
- this.message = message;
- }
-
- public void reset() {
- checkMessage = 0;
- adjMap = (byte) 0;
- lastGeneCode = (byte) -1;
- chainVertexId.reset(1);
- message = Message.NON;
- }
-
- public KmerBytesWritable getSourceVertexId() {
- return sourceVertexId;
- }
-
- public void setSourceVertexId(KmerBytesWritable sourceVertexId) {
- if (sourceVertexId != null) {
- checkMessage |= CheckMessage.SOURCE;
- this.sourceVertexId.set(sourceVertexId);
- }
- }
-
- public byte getAdjMap() {
- return adjMap;
- }
-
- public void setAdjMap(byte adjMap) {
- if (adjMap != 0) {
- checkMessage |= CheckMessage.ADJMAP;
- this.adjMap = adjMap;
- }
- }
-
- public byte getLastGeneCode() {
- return lastGeneCode;
- }
-
- public void setLastGeneCode(byte lastGeneCode) {
- if (lastGeneCode != -1) {
- checkMessage |= CheckMessage.LASTGENECODE;
- this.lastGeneCode = lastGeneCode;
- }
- }
-
- public VKmerBytesWritable getChainVertexId() {
- return chainVertexId;
- }
-
- public void setChainVertexId(VKmerBytesWritable chainVertexId) {
- if (chainVertexId != null) {
- checkMessage |= CheckMessage.CHAIN;
- this.chainVertexId.set(chainVertexId);
- }
- }
-
- public int getLengthOfChain() {
- return chainVertexId.getKmerLength();
- }
-
- public byte getMessage() {
- return message;
- }
-
- public void setMessage(byte message) {
- this.message = message;
- }
-
- public boolean isGeneCode(){
- return ((checkMessage & CheckMessage.LASTGENECODE) != 0);
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- out.writeByte(checkMessage);
- if ((checkMessage & CheckMessage.SOURCE) != 0)
- sourceVertexId.write(out);
- if ((checkMessage & CheckMessage.ADJMAP) != 0)
- out.write(adjMap);
- if ((checkMessage & CheckMessage.LASTGENECODE) != 0)
- out.write(lastGeneCode);
- if ((checkMessage & CheckMessage.CHAIN) != 0)
- chainVertexId.write(out);
- out.write(message);
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- this.reset();
- checkMessage = in.readByte();
- if ((checkMessage & CheckMessage.SOURCE) != 0)
- sourceVertexId.readFields(in);
- if ((checkMessage & CheckMessage.ADJMAP) != 0)
- adjMap = in.readByte();
- if ((checkMessage & CheckMessage.LASTGENECODE) != 0)
- lastGeneCode = in.readByte();
- if ((checkMessage & CheckMessage.CHAIN) != 0)
- chainVertexId.readFields(in);
- message = in.readByte();
- }
-
- @Override
- public int hashCode() {
- return sourceVertexId.hashCode();
- }
-
- @Override
- public boolean equals(Object o) {
- if (o instanceof NaiveAlgorithmMessageWritable) {
- NaiveAlgorithmMessageWritable tp = (NaiveAlgorithmMessageWritable) o;
- return sourceVertexId.equals(tp.sourceVertexId);
- }
- return false;
- }
-
- @Override
- public String toString() {
- return sourceVertexId.toString();
- }
-
- @Override
- public int compareTo(NaiveAlgorithmMessageWritable tp) {
- return sourceVertexId.compareTo(tp.sourceVertexId);
- }
-}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/ValueStateWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/ValueStateWritable.java
index 9a9e30f..2554c8e 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/ValueStateWritable.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/ValueStateWritable.java
@@ -5,40 +5,50 @@
import org.apache.hadoop.io.WritableComparable;
import edu.uci.ics.genomix.pregelix.type.State;
-import edu.uci.ics.genomix.type.GeneCode;
import edu.uci.ics.genomix.type.KmerBytesWritable;
-import edu.uci.ics.genomix.type.VKmerBytesWritable;
+import edu.uci.ics.genomix.type.PositionListWritable;
public class ValueStateWritable implements WritableComparable<ValueStateWritable> {
- private byte adjMap;
+ private PositionListWritable incomingList;
+ private PositionListWritable outgoingList;
private byte state;
- private VKmerBytesWritable mergeChain;
+ private KmerBytesWritable mergeChain;
public ValueStateWritable() {
+ incomingList = new PositionListWritable();
+ outgoingList = new PositionListWritable();
state = State.NON_VERTEX;
- mergeChain = new VKmerBytesWritable(0);
- //isOp = false;
+ mergeChain = new KmerBytesWritable(0);
}
- public ValueStateWritable(byte adjMap, byte state, VKmerBytesWritable mergeChain) {
- this.adjMap = adjMap;
+ public ValueStateWritable(PositionListWritable incomingList, PositionListWritable outgoingList,
+ byte state, KmerBytesWritable mergeChain) {
+ set(incomingList, outgoingList, state, mergeChain);
+ }
+
+ public void set(PositionListWritable incomingList, PositionListWritable outgoingList,
+ byte state, KmerBytesWritable mergeChain) {
+ this.incomingList.set(incomingList);
+ this.outgoingList.set(outgoingList);
this.state = state;
this.mergeChain.set(mergeChain);
}
-
- public void set(byte adjMap, byte state, VKmerBytesWritable mergeChain) {
- this.adjMap = adjMap;
- this.state = state;
- this.mergeChain.set(mergeChain);
+
+ public PositionListWritable getIncomingList() {
+ return incomingList;
}
- public byte getAdjMap() {
- return adjMap;
+ public void setIncomingList(PositionListWritable incomingList) {
+ this.incomingList = incomingList;
}
- public void setAdjMap(byte adjMap) {
- this.adjMap = adjMap;
+ public PositionListWritable getOutgoingList() {
+ return outgoingList;
+ }
+
+ public void setOutgoingList(PositionListWritable outgoingList) {
+ this.outgoingList = outgoingList;
}
public byte getState() {
@@ -53,7 +63,7 @@
return mergeChain.getKmerLength();
}
- public VKmerBytesWritable getMergeChain() {
+ public KmerBytesWritable getMergeChain() {
return mergeChain;
}
@@ -61,20 +71,18 @@
this.mergeChain.set(mergeChain);
}
- public void setMergeChain(VKmerBytesWritable mergeChain) {
- this.mergeChain.set(mergeChain);
- }
-
@Override
public void readFields(DataInput in) throws IOException {
- adjMap = in.readByte();
+ incomingList.readFields(in);
+ outgoingList.readFields(in);
state = in.readByte();
mergeChain.readFields(in);
}
@Override
public void write(DataOutput out) throws IOException {
- out.writeByte(adjMap);
+ incomingList.write(out);
+ outgoingList.write(out);
out.writeByte(state);
mergeChain.write(out);
}
@@ -86,7 +94,14 @@
@Override
public String toString() {
- return GeneCode.getSymbolFromBitMap(adjMap) + "\t" + getLengthOfMergeChain() + "\t" + mergeChain.toString();
+ return state + "\t" + getLengthOfMergeChain() + "\t" + mergeChain.toString();
+ }
+
+ public int inDegree() {
+ return incomingList.getCountOfPosition();
}
+ public int outDegree() {
+ return outgoingList.getCountOfPosition();
+ }
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/log/DataLoadLogFormatter.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/log/DataLoadLogFormatter.java
index ae950f4..95e070f 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/log/DataLoadLogFormatter.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/log/DataLoadLogFormatter.java
@@ -4,22 +4,19 @@
import java.util.logging.Handler;
import java.util.logging.LogRecord;
-import edu.uci.ics.genomix.type.KmerCountValue;
-import edu.uci.ics.genomix.type.VKmerBytesWritable;
+import edu.uci.ics.genomix.type.NodeWritable;
public class DataLoadLogFormatter extends Formatter {
- private VKmerBytesWritable key;
- private KmerCountValue value;
+ private NodeWritable key;
- public void set(VKmerBytesWritable key, KmerCountValue value) {
+ public void set(NodeWritable key) {
this.key.set(key);
- this.value = value;
}
public String format(LogRecord record) {
StringBuilder builder = new StringBuilder(1000);
- builder.append(key.toString() + "\t" + value.toString() + "\r\n");
+ builder.append(key.toString() + "\r\n");
if (!formatMessage(record).equals(""))
builder.append(formatMessage(record) + "\r\n");
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/log/LogAlgorithmLogFormatter.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/log/LogAlgorithmLogFormatter.java
index 9eba176..dca2cb8 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/log/LogAlgorithmLogFormatter.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/log/LogAlgorithmLogFormatter.java
@@ -2,22 +2,21 @@
import java.util.logging.*;
-import edu.uci.ics.genomix.pregelix.io.LogAlgorithmMessageWritable;
+import edu.uci.ics.genomix.pregelix.io.MessageWritable;
import edu.uci.ics.genomix.pregelix.type.Message;
import edu.uci.ics.genomix.pregelix.type.State;
-import edu.uci.ics.genomix.type.VKmerBytesWritable;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
public class LogAlgorithmLogFormatter extends Formatter {
//
// Create a DateFormat to format the logger timestamp.
//
- //private static final DateFormat df = new SimpleDateFormat("dd/MM/yyyy hh:mm:ss.SSS");
private long step;
- private VKmerBytesWritable sourceVertexId = new VKmerBytesWritable(1);
- private VKmerBytesWritable destVertexId = new VKmerBytesWritable(1);
- private LogAlgorithmMessageWritable msg = new LogAlgorithmMessageWritable();
+ private KmerBytesWritable sourceVertexId = new KmerBytesWritable(1);
+ private KmerBytesWritable destVertexId = new KmerBytesWritable(1);
+ private MessageWritable msg = new MessageWritable();
private byte state;
- private VKmerBytesWritable mergeChain = new VKmerBytesWritable(1);;
+ private KmerBytesWritable mergeChain = new KmerBytesWritable(1);;
//private boolean testDelete = false;
/**
* 0: general operation
@@ -30,8 +29,8 @@
public LogAlgorithmLogFormatter() {
}
- public void set(long step, VKmerBytesWritable sourceVertexId, VKmerBytesWritable destVertexId,
- LogAlgorithmMessageWritable msg, byte state) {
+ public void set(long step, KmerBytesWritable sourceVertexId, KmerBytesWritable destVertexId,
+ MessageWritable msg, byte state) {
this.step = step;
this.sourceVertexId.set(sourceVertexId);
this.destVertexId.set(destVertexId);
@@ -40,7 +39,7 @@
this.operation = 0;
}
- public void setMergeChain(long step, VKmerBytesWritable sourceVertexId, VKmerBytesWritable mergeChain) {
+ public void setMergeChain(long step, KmerBytesWritable sourceVertexId, KmerBytesWritable mergeChain) {
this.reset();
this.step = step;
this.sourceVertexId.set(sourceVertexId);
@@ -48,7 +47,7 @@
this.operation = 2;
}
- public void setVotoToHalt(long step, VKmerBytesWritable sourceVertexId) {
+ public void setVotoToHalt(long step, KmerBytesWritable sourceVertexId) {
this.reset();
this.step = step;
this.sourceVertexId.set(sourceVertexId);
@@ -56,11 +55,11 @@
}
public void reset() {
- this.sourceVertexId = new VKmerBytesWritable(1);
- this.destVertexId = new VKmerBytesWritable(1);
- this.msg = new LogAlgorithmMessageWritable();
+ this.sourceVertexId = new KmerBytesWritable(1);
+ this.destVertexId = new KmerBytesWritable(1);
+ this.msg = new MessageWritable();
this.state = 0;
- this.mergeChain = new VKmerBytesWritable(1);
+ this.mergeChain = new KmerBytesWritable(1);
}
public String format(LogRecord record) {
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/log/NaiveAlgorithmLogFormatter.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/log/NaiveAlgorithmLogFormatter.java
index 39b0bc1..4a5850a 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/log/NaiveAlgorithmLogFormatter.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/log/NaiveAlgorithmLogFormatter.java
@@ -2,7 +2,7 @@
import java.util.logging.*;
-import edu.uci.ics.genomix.type.VKmerBytesWritable;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
public class NaiveAlgorithmLogFormatter extends Formatter {
//
@@ -10,10 +10,10 @@
//
//private static final DateFormat df = new SimpleDateFormat("dd/MM/yyyy hh:mm:ss.SSS");
private long step;
- private VKmerBytesWritable sourceVertexId;
- private VKmerBytesWritable destVertexId;
+ private KmerBytesWritable sourceVertexId;
+ private KmerBytesWritable destVertexId;
- public void set(long step, VKmerBytesWritable sourceVertexId, VKmerBytesWritable destVertexId) {
+ public void set(long step, KmerBytesWritable sourceVertexId, KmerBytesWritable destVertexId) {
this.step = step;
this.sourceVertexId.set(sourceVertexId);
this.destVertexId.set(destVertexId);
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/LogAlgorithmForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/LogAlgorithmForPathMergeVertex.java
index b033c28..6be0eee 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/LogAlgorithmForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/LogAlgorithmForPathMergeVertex.java
@@ -9,21 +9,19 @@
import edu.uci.ics.genomix.pregelix.client.Client;
import edu.uci.ics.genomix.pregelix.format.LogAlgorithmForPathMergeInputFormat;
import edu.uci.ics.genomix.pregelix.format.LogAlgorithmForPathMergeOutputFormat;
-import edu.uci.ics.genomix.pregelix.io.LogAlgorithmMessageWritable;
+import edu.uci.ics.genomix.pregelix.io.MessageWritable;
import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
import edu.uci.ics.genomix.pregelix.type.Message;
import edu.uci.ics.genomix.pregelix.type.State;
import edu.uci.ics.genomix.pregelix.util.VertexUtil;
-import edu.uci.ics.genomix.type.GeneCode;
import edu.uci.ics.genomix.type.KmerBytesWritable;
-import edu.uci.ics.genomix.type.VKmerBytesWritable;
-import edu.uci.ics.genomix.type.VKmerBytesWritableFactory;
-
+import edu.uci.ics.genomix.type.KmerBytesWritableFactory;
+import edu.uci.ics.genomix.type.PositionWritable;
/*
* vertexId: BytesWritable
* vertexValue: ValueStateWritable
* edgeValue: NullWritable
- * message: LogAlgorithmMessageWritable
+ * message: MessageWritable
*
* DNA:
* A: 00
@@ -48,19 +46,20 @@
* The details about message are in edu.uci.ics.pregelix.example.io.MessageWritable.
*/
public class LogAlgorithmForPathMergeVertex extends
- Vertex<KmerBytesWritable, ValueStateWritable, NullWritable, LogAlgorithmMessageWritable> {
+ Vertex<PositionWritable, ValueStateWritable, NullWritable, MessageWritable> {
public static final String KMER_SIZE = "LogAlgorithmForPathMergeVertex.kmerSize";
public static final String ITERATIONS = "LogAlgorithmForPathMergeVertex.iteration";
public static int kmerSize = -1;
private int maxIteration = -1;
- private LogAlgorithmMessageWritable incomingMsg = new LogAlgorithmMessageWritable();
- private LogAlgorithmMessageWritable outgoingMsg = new LogAlgorithmMessageWritable();
+ private MessageWritable incomingMsg = new MessageWritable();
+ private MessageWritable outgoingMsg = new MessageWritable();
- private VKmerBytesWritableFactory kmerFactory = new VKmerBytesWritableFactory(1);
- private VKmerBytesWritable chainVertexId = new VKmerBytesWritable(1);
- private VKmerBytesWritable lastKmer = new VKmerBytesWritable(1);
+ private KmerBytesWritableFactory kmerFactory = new KmerBytesWritableFactory(1);
+ private KmerBytesWritable lastKmer = new KmerBytesWritable(1);
+ private PositionWritable destVertexId = new PositionWritable();
+ private Iterator<PositionWritable> posIterator;
/**
* initiate kmerSize, maxIteration
*/
@@ -75,42 +74,35 @@
/**
* get destination vertex
*/
- public VKmerBytesWritable getNextDestVertexId(KmerBytesWritable vertexId, byte geneCode) {
- return kmerFactory.shiftKmerWithNextCode(vertexId, geneCode);
+ public PositionWritable getNextDestVertexId(ValueStateWritable value) {
+ posIterator = value.getOutgoingList().iterator();
+ return posIterator.next();
}
- public VKmerBytesWritable getPreDestVertexId(KmerBytesWritable vertexId, byte geneCode) {
- return kmerFactory.shiftKmerWithPreCode(vertexId, geneCode);
- }
-
- public VKmerBytesWritable getNextDestVertexIdFromBitmap(KmerBytesWritable chainVertexId, byte adjMap) {
- return getDestVertexIdFromChain(chainVertexId, adjMap);
- }
-
- public VKmerBytesWritable getDestVertexIdFromChain(KmerBytesWritable chainVertexId, byte adjMap) {
- VKmerBytesWritable lastKmer = kmerFactory.getLastKmerFromChain(kmerSize, chainVertexId);
- return getNextDestVertexId(lastKmer, GeneCode.getGeneCodeFromBitMap((byte) (adjMap & 0x0F)));
+ public PositionWritable getPreDestVertexId(ValueStateWritable value) {
+ posIterator = value.getIncomingList().iterator();
+ return posIterator.next();
}
/**
* head send message to all next nodes
*/
- public void sendMsgToAllNextNodes(KmerBytesWritable vertexId, byte adjMap) {
- for (byte x = GeneCode.A; x <= GeneCode.T; x++) {
- if ((adjMap & (1 << x)) != 0) {
- sendMsg(getNextDestVertexId(vertexId, x), outgoingMsg);
- }
+ public void sendMsgToAllNextNodes(ValueStateWritable value) {
+ posIterator = value.getOutgoingList().iterator();
+ while(posIterator.hasNext()){
+ destVertexId.set(posIterator.next());
+ sendMsg(destVertexId, outgoingMsg);
}
}
/**
* head send message to all previous nodes
*/
- public void sendMsgToAllPreviousNodes(KmerBytesWritable vertexId, byte adjMap) {
- for (byte x = GeneCode.A; x <= GeneCode.T; x++) {
- if (((adjMap >> 4) & (1 << x)) != 0) {
- sendMsg(getPreDestVertexId(vertexId, x), outgoingMsg);
- }
+ public void sendMsgToAllPreviousNodes(ValueStateWritable value) {
+ posIterator = value.getIncomingList().iterator();
+ while(posIterator.hasNext()){
+ destVertexId.set(posIterator.next());
+ sendMsg(destVertexId, outgoingMsg);
}
}
@@ -118,14 +110,14 @@
* start sending message
*/
public void startSendMsg() {
- if (VertexUtil.isHeadVertex(getVertexValue().getAdjMap())) {
+ if (VertexUtil.isHeadVertex(getVertexValue())) {
outgoingMsg.setMessage(Message.START);
- sendMsgToAllNextNodes(getVertexId(), getVertexValue().getAdjMap());
+ sendMsgToAllNextNodes(getVertexValue());
voteToHalt();
}
- if (VertexUtil.isRearVertex(getVertexValue().getAdjMap())) {
+ if (VertexUtil.isRearVertex(getVertexValue())) {
outgoingMsg.setMessage(Message.END);
- sendMsgToAllPreviousNodes(getVertexId(), getVertexValue().getAdjMap());
+ sendMsgToAllPreviousNodes(getVertexValue());
voteToHalt();
}
}
@@ -133,9 +125,9 @@
/**
* initiate head, rear and path node
*/
- public void initState(Iterator<LogAlgorithmMessageWritable> msgIterator) {
+ public void initState(Iterator<MessageWritable> msgIterator) {
while (msgIterator.hasNext()) {
- if (!VertexUtil.isPathVertex(getVertexValue().getAdjMap())) {
+ if (!VertexUtil.isPathVertex(getVertexValue())) {
msgIterator.next();
voteToHalt();
} else {
@@ -154,7 +146,7 @@
getVertexValue().setMergeChain(null);
} else if (incomingMsg.getMessage() == Message.END && getVertexValue().getState() != State.START_VERTEX) {
getVertexValue().setState(State.END_VERTEX);
- getVertexValue().setMergeChain(getVertexId());
+ getVertexValue().setMergeChain(getVertexValue().getMergeChain());
voteToHalt();
} else
voteToHalt();
@@ -163,29 +155,29 @@
/**
* head send message to path
*/
- public void sendOutMsg(KmerBytesWritable chainVertexId, byte adjMap) {
+ public void sendOutMsg() {
if (getVertexValue().getState() == State.START_VERTEX) {
outgoingMsg.setMessage(Message.START);
outgoingMsg.setSourceVertexId(getVertexId());
- sendMsg(getNextDestVertexIdFromBitmap(chainVertexId, adjMap), outgoingMsg);
+ sendMsg(getNextDestVertexId(getVertexValue()), outgoingMsg);
} else if (getVertexValue().getState() != State.END_VERTEX) {
outgoingMsg.setMessage(Message.NON);
outgoingMsg.setSourceVertexId(getVertexId());
- sendMsg(getNextDestVertexIdFromBitmap(chainVertexId, adjMap), outgoingMsg);
+ sendMsg(getNextDestVertexId(getVertexValue()), outgoingMsg);
}
}
/**
* head send message to path
*/
- public void sendMsgToPathVertex(Iterator<LogAlgorithmMessageWritable> msgIterator) {
+ public void sendMsgToPathVertex(Iterator<MessageWritable> msgIterator) {
if (getSuperstep() == 3) {
- getVertexValue().setMergeChain(getVertexId());
- sendOutMsg(getVertexId(), getVertexValue().getAdjMap());
+ getVertexValue().setMergeChain(getVertexValue().getMergeChain());
+ sendOutMsg();
} else {
if (msgIterator.hasNext()) {
incomingMsg = msgIterator.next();
- if (mergeChainVertex(msgIterator)) {
+ if (mergeChainVertex()) {
if (incomingMsg.getMessage() == Message.END) {
if (getVertexValue().getState() == State.START_VERTEX) {
getVertexValue().setState(State.FINAL_VERTEX);
@@ -194,7 +186,7 @@
} else
getVertexValue().setState(State.END_VERTEX);
} else
- sendOutMsg(getVertexValue().getMergeChain(), getVertexValue().getAdjMap());
+ sendOutMsg();
}
}
}
@@ -203,11 +195,11 @@
/**
* path response message to head
*/
- public void responseMsgToHeadVertex(Iterator<LogAlgorithmMessageWritable> msgIterator) {
+ public void responseMsgToHeadVertex(Iterator<MessageWritable> msgIterator) {
if (msgIterator.hasNext()) {
incomingMsg = msgIterator.next();
outgoingMsg.setChainVertexId(getVertexValue().getMergeChain());
- outgoingMsg.setAdjMap(getVertexValue().getAdjMap());
+ outgoingMsg.setNeighberNode(getVertexValue().getOutgoingList());
if (getVertexValue().getState() == State.END_VERTEX)
outgoingMsg.setMessage(Message.END);
sendMsg(incomingMsg.getSourceVertexId(), outgoingMsg);
@@ -223,28 +215,23 @@
/**
* merge chainVertex and store in vertexVal.chainVertexId
*/
- public boolean mergeChainVertex(Iterator<LogAlgorithmMessageWritable> msgIterator) {
+ public boolean mergeChainVertex() {
//merge chain
lastKmer.set(kmerFactory.getLastKmerFromChain(incomingMsg.getLengthOfChain() - kmerSize + 1,
incomingMsg.getChainVertexId()));
- chainVertexId.set(kmerFactory.mergeTwoKmer(getVertexValue().getMergeChain(), lastKmer));
- if (VertexUtil.isCycle(getVertexId(), chainVertexId, kmerSize)) {
- getVertexValue().setMergeChain(null);
- getVertexValue().setAdjMap(
- VertexUtil.reverseAdjMap(getVertexValue().getAdjMap(),
- chainVertexId.getGeneCodeAtPosition(kmerSize)));
+ KmerBytesWritable chainVertexId = kmerFactory.mergeTwoKmer(getVertexValue().getMergeChain(), lastKmer);
+ getVertexValue().setMergeChain(chainVertexId);
+ getVertexValue().setOutgoingList(incomingMsg.getNeighberNode());
+ if (VertexUtil.isCycle(kmerFactory.getFirstKmerFromChain(kmerSize, getVertexValue().getMergeChain()),
+ chainVertexId, kmerSize)) {
getVertexValue().setState(State.CYCLE);
return false;
- } else
- getVertexValue().setMergeChain(chainVertexId);
-
- byte tmpVertexValue = VertexUtil.updateRightNeighber(getVertexValue().getAdjMap(), incomingMsg.getAdjMap());
- getVertexValue().setAdjMap(tmpVertexValue);
+ }
return true;
}
@Override
- public void compute(Iterator<LogAlgorithmMessageWritable> msgIterator) {
+ public void compute(Iterator<MessageWritable> msgIterator) {
initVertex();
if (getSuperstep() == 1)
startSendMsg();
@@ -268,7 +255,7 @@
*/
job.setVertexInputFormatClass(LogAlgorithmForPathMergeInputFormat.class);
job.setVertexOutputFormatClass(LogAlgorithmForPathMergeOutputFormat.class);
- job.setOutputKeyClass(KmerBytesWritable.class);
+ job.setOutputKeyClass(PositionWritable.class);
job.setOutputValueClass(ValueStateWritable.class);
job.setDynamicVertexValueSize(true);
Client.run(args, job);
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/NaiveAlgorithmForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/NaiveAlgorithmForPathMergeVertex.java
index b637f84..af38072 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/NaiveAlgorithmForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/NaiveAlgorithmForPathMergeVertex.java
@@ -3,17 +3,16 @@
import java.util.Iterator;
import org.apache.hadoop.io.NullWritable;
-import edu.uci.ics.genomix.type.GeneCode;
import edu.uci.ics.genomix.type.KmerBytesWritable;
-import edu.uci.ics.genomix.type.VKmerBytesWritable;
-import edu.uci.ics.genomix.type.VKmerBytesWritableFactory;
+import edu.uci.ics.genomix.type.KmerBytesWritableFactory;
+import edu.uci.ics.genomix.type.PositionWritable;
import edu.uci.ics.pregelix.api.graph.Vertex;
import edu.uci.ics.pregelix.api.job.PregelixJob;
import edu.uci.ics.genomix.pregelix.client.Client;
import edu.uci.ics.genomix.pregelix.format.NaiveAlgorithmForPathMergeInputFormat;
import edu.uci.ics.genomix.pregelix.format.NaiveAlgorithmForPathMergeOutputFormat;
-import edu.uci.ics.genomix.pregelix.io.NaiveAlgorithmMessageWritable;
+import edu.uci.ics.genomix.pregelix.io.MessageWritable;
import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
import edu.uci.ics.genomix.pregelix.type.Message;
import edu.uci.ics.genomix.pregelix.type.State;
@@ -23,7 +22,7 @@
* vertexId: BytesWritable
* vertexValue: ByteWritable
* edgeValue: NullWritable
- * message: NaiveAlgorithmMessageWritable
+ * message: MessageWritable
*
* DNA:
* A: 00
@@ -51,18 +50,21 @@
* Naive Algorithm for path merge graph
*/
public class NaiveAlgorithmForPathMergeVertex extends
- Vertex<KmerBytesWritable, ValueStateWritable, NullWritable, NaiveAlgorithmMessageWritable> {
+ Vertex<PositionWritable, ValueStateWritable, NullWritable, MessageWritable> {
public static final String KMER_SIZE = "NaiveAlgorithmForPathMergeVertex.kmerSize";
public static final String ITERATIONS = "NaiveAlgorithmForPathMergeVertex.iteration";
public static int kmerSize = -1;
private int maxIteration = -1;
- private NaiveAlgorithmMessageWritable incomingMsg = new NaiveAlgorithmMessageWritable();
- private NaiveAlgorithmMessageWritable outgoingMsg = new NaiveAlgorithmMessageWritable();
-
- private VKmerBytesWritableFactory kmerFactory = new VKmerBytesWritableFactory(1);
- private VKmerBytesWritable destVertexId = new VKmerBytesWritable(1);
-
+ private MessageWritable incomingMsg = new MessageWritable();
+ private MessageWritable outgoingMsg = new MessageWritable();
+
+ private KmerBytesWritableFactory kmerFactory = new KmerBytesWritableFactory(1);
+ private KmerBytesWritable lastKmer = new KmerBytesWritable(1);
+
+ private PositionWritable destVertexId = new PositionWritable();
+ private Iterator<PositionWritable> posIterator;
+
/**
* initiate kmerSize, maxIteration
*/
@@ -77,40 +79,35 @@
/**
* get destination vertex
*/
- public VKmerBytesWritable getDestVertexId(KmerBytesWritable vertexId, byte geneCode) {
- return kmerFactory.shiftKmerWithNextCode(vertexId, geneCode);
+ public PositionWritable getNextDestVertexId(ValueStateWritable value) {
+ posIterator = value.getOutgoingList().iterator();
+ return posIterator.next();
}
- public VKmerBytesWritable getPreDestVertexId(KmerBytesWritable vertexId, byte geneCode) {
- return kmerFactory.shiftKmerWithPreCode(vertexId, geneCode);
- }
-
- public VKmerBytesWritable getDestVertexIdFromChain(VKmerBytesWritable chainVertexId, byte adjMap) {
- VKmerBytesWritable lastKmer = kmerFactory.getLastKmerFromChain(kmerSize, chainVertexId);
- return getDestVertexId(lastKmer, GeneCode.getGeneCodeFromBitMap((byte) (adjMap & 0x0F)));
+ public PositionWritable getPreDestVertexId(ValueStateWritable value) {
+ posIterator = value.getIncomingList().iterator();
+ return posIterator.next();
}
/**
* head send message to all next nodes
*/
- public void sendMsgToAllNextNodes(KmerBytesWritable vertexId, byte adjMap) {
- for (byte x = GeneCode.A; x <= GeneCode.T; x++) {
- if ((adjMap & (1 << x)) != 0) {
- destVertexId.set(getDestVertexId(vertexId, x));
- sendMsg(destVertexId, outgoingMsg);
- }
+ public void sendMsgToAllNextNodes(ValueStateWritable value) {
+ posIterator = value.getOutgoingList().iterator();
+ while(posIterator.hasNext()){
+ destVertexId.set(posIterator.next());
+ sendMsg(destVertexId, outgoingMsg);
}
}
/**
* head send message to all previous nodes
*/
- public void sendMsgToAllPreviousNodes(KmerBytesWritable vertexId, byte adjMap) {
- for (byte x = GeneCode.A; x <= GeneCode.T; x++) {
- if (((adjMap >> 4) & (1 << x)) != 0) {
- destVertexId.set(getPreDestVertexId(vertexId, x));
- sendMsg(destVertexId, outgoingMsg);
- }
+ public void sendMsgToAllPreviousNodes(ValueStateWritable value) {
+ posIterator = value.getIncomingList().iterator();
+ while(posIterator.hasNext()){
+ destVertexId.set(posIterator.next());
+ sendMsg(destVertexId, outgoingMsg);
}
}
@@ -118,22 +115,22 @@
* start sending message
*/
public void startSendMsg() {
- if (VertexUtil.isHeadVertex(getVertexValue().getAdjMap())) {
+ if (VertexUtil.isHeadVertex(getVertexValue())) {
outgoingMsg.setMessage(Message.START);
- sendMsgToAllNextNodes(getVertexId(), getVertexValue().getAdjMap());
+ sendMsgToAllNextNodes(getVertexValue());
}
- if (VertexUtil.isRearVertex(getVertexValue().getAdjMap())) {
+ if (VertexUtil.isRearVertex(getVertexValue())) {
outgoingMsg.setMessage(Message.END);
- sendMsgToAllPreviousNodes(getVertexId(), getVertexValue().getAdjMap());
+ sendMsgToAllPreviousNodes(getVertexValue());
}
}
/**
* initiate head, rear and path node
*/
- public void initState(Iterator<NaiveAlgorithmMessageWritable> msgIterator) {
+ public void initState(Iterator<MessageWritable> msgIterator) {
while (msgIterator.hasNext()) {
- if (!VertexUtil.isPathVertex(getVertexValue().getAdjMap())) {
+ if (!VertexUtil.isPathVertex(getVertexValue())) {
msgIterator.next();
voteToHalt();
} else {
@@ -155,33 +152,36 @@
} else
voteToHalt();
}
+
+ /**
+ * merge chainVertex and store in vertexVal.chainVertexId
+ */
+ public void mergeChainVertex() {
+ //merge chain
+ lastKmer.set(kmerFactory.getLastKmerFromChain(incomingMsg.getLengthOfChain() - kmerSize + 1,
+ incomingMsg.getChainVertexId()));
+ getVertexValue().setMergeChain(kmerFactory.mergeTwoKmer(getVertexValue().getMergeChain(), lastKmer));
+ getVertexValue().setOutgoingList(incomingMsg.getNeighberNode());
+ }
/**
* head node sends message to path node
*/
- public void sendMsgToPathVertex(Iterator<NaiveAlgorithmMessageWritable> msgIterator) {
+ public void sendMsgToPathVertex(Iterator<MessageWritable> msgIterator) {
if (getSuperstep() == 3) {
- getVertexValue().setMergeChain(getVertexId());
outgoingMsg.setSourceVertexId(getVertexId());
- destVertexId.set(getDestVertexIdFromChain(getVertexValue().getMergeChain(), getVertexValue().getAdjMap()));
+ destVertexId.set(getNextDestVertexId(getVertexValue()));
sendMsg(destVertexId, outgoingMsg);
} else {
while (msgIterator.hasNext()) {
incomingMsg = msgIterator.next();
if (incomingMsg.getMessage() != Message.STOP) {
- getVertexValue().setMergeChain(
- kmerFactory.mergeKmerWithNextCode(getVertexValue().getMergeChain(),
- incomingMsg.getLastGeneCode()));
+ mergeChainVertex();
outgoingMsg.setSourceVertexId(getVertexId());
- destVertexId
- .set(getDestVertexIdFromChain(getVertexValue().getMergeChain(), incomingMsg.getAdjMap()));
+ destVertexId.set(getNextDestVertexId(getVertexValue()));
sendMsg(destVertexId, outgoingMsg);
} else {
- getVertexValue().setMergeChain(
- kmerFactory.mergeKmerWithNextCode(getVertexValue().getMergeChain(),
- incomingMsg.getLastGeneCode()));
- byte adjMap = VertexUtil.updateRightNeighber(getVertexValue().getAdjMap(), incomingMsg.getAdjMap());
- getVertexValue().setAdjMap(adjMap);
+ mergeChainVertex();
getVertexValue().setState(State.FINAL_VERTEX);
//String source = getVertexValue().getMergeChain().toString();
//System.out.println();
@@ -195,15 +195,16 @@
*/
public void responseMsgToHeadVertex() {
deleteVertex(getVertexId());
- outgoingMsg.setAdjMap(getVertexValue().getAdjMap());
- outgoingMsg.setLastGeneCode(getVertexId().getGeneCodeAtPosition(kmerSize - 1));
+ outgoingMsg.setNeighberNode(getVertexValue().getOutgoingList());
+ outgoingMsg.setChainVertexId(getVertexValue().getMergeChain());
if (getVertexValue().getState() == State.END_VERTEX)
outgoingMsg.setMessage(Message.STOP);
- sendMsg(incomingMsg.getSourceVertexId(), outgoingMsg);
+ destVertexId.set(incomingMsg.getSourceVertexId());
+ sendMsg(destVertexId, outgoingMsg);
}
@Override
- public void compute(Iterator<NaiveAlgorithmMessageWritable> msgIterator) {
+ public void compute(Iterator<MessageWritable> msgIterator) {
initVertex();
if (getSuperstep() == 1) {
startSendMsg();
@@ -232,7 +233,7 @@
job.setVertexInputFormatClass(NaiveAlgorithmForPathMergeInputFormat.class);
job.setVertexOutputFormatClass(NaiveAlgorithmForPathMergeOutputFormat.class);
job.setDynamicVertexValueSize(true);
- job.setOutputKeyClass(KmerBytesWritable.class);
+ job.setOutputKeyClass(PositionWritable.class);
job.setOutputValueClass(ValueStateWritable.class);
Client.run(args, job);
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/P3ForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/P3ForPathMergeVertex.java
index 44d47a0..6f4354a 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/P3ForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/P3ForPathMergeVertex.java
@@ -3,17 +3,16 @@
import java.util.Iterator;
import org.apache.hadoop.io.NullWritable;
-import edu.uci.ics.genomix.type.GeneCode;
import edu.uci.ics.genomix.type.KmerBytesWritable;
-import edu.uci.ics.genomix.type.VKmerBytesWritable;
-import edu.uci.ics.genomix.type.VKmerBytesWritableFactory;
+import edu.uci.ics.genomix.type.KmerBytesWritableFactory;
+import edu.uci.ics.genomix.type.PositionWritable;
import edu.uci.ics.pregelix.api.graph.Vertex;
import edu.uci.ics.pregelix.api.job.PregelixJob;
import edu.uci.ics.genomix.pregelix.client.Client;
import edu.uci.ics.genomix.pregelix.format.NaiveAlgorithmForPathMergeInputFormat;
import edu.uci.ics.genomix.pregelix.format.NaiveAlgorithmForPathMergeOutputFormat;
-import edu.uci.ics.genomix.pregelix.io.NaiveAlgorithmMessageWritable;
+import edu.uci.ics.genomix.pregelix.io.MessageWritable;
import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
import edu.uci.ics.genomix.pregelix.type.Message;
import edu.uci.ics.genomix.pregelix.type.State;
@@ -23,7 +22,7 @@
* vertexId: BytesWritable
* vertexValue: ByteWritable
* edgeValue: NullWritable
- * message: NaiveAlgorithmMessageWritable
+ * message: MessageWritable
*
* DNA:
* A: 00
@@ -51,7 +50,7 @@
* Naive Algorithm for path merge graph
*/
public class P3ForPathMergeVertex extends
- Vertex<KmerBytesWritable, ValueStateWritable, NullWritable, NaiveAlgorithmMessageWritable> {
+ Vertex<PositionWritable, ValueStateWritable, NullWritable, MessageWritable> {
public static final String KMER_SIZE = "P3ForPathMergeVertex.kmerSize";
public static final String ITERATIONS = "P3ForPathMergeVertex.iteration";
public static final String PSEUDORATE = "P3ForPathMergeVertex.pseudoRate";
@@ -61,13 +60,14 @@
public static float pseudoRate = -1;
public static int maxRound = -1;
- private NaiveAlgorithmMessageWritable incomingMsg = new NaiveAlgorithmMessageWritable();
- private NaiveAlgorithmMessageWritable outgoingMsg = new NaiveAlgorithmMessageWritable();
+ private MessageWritable incomingMsg = new MessageWritable();
+ private MessageWritable outgoingMsg = new MessageWritable();
- private VKmerBytesWritableFactory kmerFactory = new VKmerBytesWritableFactory(1);
- private VKmerBytesWritable lastKmer = new VKmerBytesWritable(1);
- private VKmerBytesWritable destVertexId = new VKmerBytesWritable(1);
+ private KmerBytesWritableFactory kmerFactory = new KmerBytesWritableFactory(1);
+ private KmerBytesWritable lastKmer = new KmerBytesWritable(1);
+ private PositionWritable destVertexId = new PositionWritable();
+ private Iterator<PositionWritable> posIterator;
/**
* initiate kmerSize, maxIteration
*/
@@ -86,40 +86,35 @@
/**
* get destination vertex
*/
- public VKmerBytesWritable getDestVertexId(KmerBytesWritable vertexId, byte geneCode) {
- return kmerFactory.shiftKmerWithNextCode(vertexId, geneCode);
+ public PositionWritable getNextDestVertexId(ValueStateWritable value) {
+ posIterator = value.getOutgoingList().iterator();
+ return posIterator.next();
}
- public VKmerBytesWritable getPreDestVertexId(KmerBytesWritable vertexId, byte geneCode) {
- return kmerFactory.shiftKmerWithPreCode(vertexId, geneCode);
- }
-
- public VKmerBytesWritable getDestVertexIdFromChain(VKmerBytesWritable chainVertexId, byte adjMap) {
- VKmerBytesWritable lastKmer = kmerFactory.getLastKmerFromChain(kmerSize, chainVertexId);
- return getDestVertexId(lastKmer, GeneCode.getGeneCodeFromBitMap((byte) (adjMap & 0x0F)));
+ public PositionWritable getPreDestVertexId(ValueStateWritable value) {
+ posIterator = value.getIncomingList().iterator();
+ return posIterator.next();
}
/**
* head send message to all next nodes
*/
- public void sendMsgToAllNextNodes(KmerBytesWritable vertexId, byte adjMap) {
- for (byte x = GeneCode.A; x <= GeneCode.T; x++) {
- if ((adjMap & (1 << x)) != 0) {
- destVertexId.set(getDestVertexId(vertexId, x));
- sendMsg(destVertexId, outgoingMsg);
- }
+ public void sendMsgToAllNextNodes(ValueStateWritable value) {
+ posIterator = value.getOutgoingList().iterator();
+ while(posIterator.hasNext()){
+ destVertexId.set(posIterator.next());
+ sendMsg(destVertexId, outgoingMsg);
}
}
/**
* head send message to all previous nodes
*/
- public void sendMsgToAllPreviousNodes(KmerBytesWritable vertexId, byte adjMap) {
- for (byte x = GeneCode.A; x <= GeneCode.T; x++) {
- if (((adjMap >> 4) & (1 << x)) != 0) {
- destVertexId.set(getPreDestVertexId(vertexId, x));
- sendMsg(destVertexId, outgoingMsg);
- }
+ public void sendMsgToAllPreviousNodes(ValueStateWritable value) {
+ posIterator = value.getIncomingList().iterator();
+ while(posIterator.hasNext()){
+ destVertexId.set(posIterator.next());
+ sendMsg(destVertexId, outgoingMsg);
}
}
@@ -127,25 +122,23 @@
* start sending message
*/
public void startSendMsg() {
- if (VertexUtil.isHeadVertex(getVertexValue().getAdjMap())) {
+ if (VertexUtil.isHeadVertex(getVertexValue())) {
outgoingMsg.setMessage(Message.START);
- sendMsgToAllNextNodes(getVertexId(), getVertexValue().getAdjMap());
- voteToHalt();
+ sendMsgToAllNextNodes(getVertexValue());
}
- if (VertexUtil.isRearVertex(getVertexValue().getAdjMap())) {
+ if (VertexUtil.isRearVertex(getVertexValue())) {
outgoingMsg.setMessage(Message.END);
- sendMsgToAllPreviousNodes(getVertexId(), getVertexValue().getAdjMap());
- voteToHalt();
+ sendMsgToAllPreviousNodes(getVertexValue());
}
}
-
+
/**
* initiate head, rear and path node
*/
- public void initState(Iterator<NaiveAlgorithmMessageWritable> msgIterator) {
+ public void initState(Iterator<MessageWritable> msgIterator) {
if (msgIterator.hasNext()) {
do {
- if (!VertexUtil.isPathVertex(getVertexValue().getAdjMap())) {
+ if (!VertexUtil.isPathVertex(getVertexValue())) {
msgIterator.next();
voteToHalt();
} else {
@@ -167,7 +160,20 @@
voteToHalt();*/
}
}
-
+
+ /**
+ * set vertex state
+ */
+ public void setState() {
+ if (incomingMsg.getMessage() == Message.START) {
+ getVertexValue().setState(State.START_VERTEX);
+ } else if (incomingMsg.getMessage() == Message.END && getVertexValue().getState() != State.START_VERTEX) {
+ getVertexValue().setState(State.END_VERTEX);
+ voteToHalt();
+ } else
+ voteToHalt();
+ }
+
/**
* mark the pseudoHead
*/
@@ -175,8 +181,7 @@
getVertexValue().setState(State.PSEUDOHEAD);
outgoingMsg.setMessage(Message.FROMPSEUDOHEAD);
destVertexId
- .set(getPreDestVertexId(getVertexId(),
- GeneCode.getGeneCodeFromBitMap((byte) ((getVertexValue().getAdjMap() >> 4) & 0x0F))));
+ .set(getPreDestVertexId(getVertexValue()));
sendMsg(destVertexId, outgoingMsg);
}
@@ -194,45 +199,26 @@
getVertexValue().setState(State.START_HALT);
}
}
-
- /**
- * set vertex state
- */
- public void setState() {
- if (incomingMsg.getMessage() == Message.START) {
- getVertexValue().setState(State.START_VERTEX);
- } else if (incomingMsg.getMessage() == Message.END && getVertexValue().getState() != State.START_VERTEX) {
- getVertexValue().setState(State.END_VERTEX);
- voteToHalt();
- } else
- voteToHalt();
- }
-
+
/**
* merge chain vertex
*/
public void mergeChainVertex(){
- if(incomingMsg.isGeneCode() == true){
- getVertexValue().setMergeChain(
- kmerFactory.mergeKmerWithNextCode(getVertexValue().getMergeChain(),
- incomingMsg.getLastGeneCode()));
- }
- else{
- lastKmer.set(kmerFactory.getLastKmerFromChain(incomingMsg.getLengthOfChain() - kmerSize + 1,
- incomingMsg.getChainVertexId()));
- getVertexValue().setMergeChain(
- kmerFactory.mergeTwoKmer(getVertexValue().getMergeChain(),
- lastKmer));
- }
+ lastKmer.set(kmerFactory.getLastKmerFromChain(incomingMsg.getLengthOfChain() - kmerSize + 1,
+ incomingMsg.getChainVertexId()));
+ getVertexValue().setMergeChain(
+ kmerFactory.mergeTwoKmer(getVertexValue().getMergeChain(),
+ lastKmer));
+ getVertexValue().setOutgoingList(incomingMsg.getNeighberNode());
}
/**
* head node sends message to path node
*/
- public void sendMsgToPathVertexMergePhase(Iterator<NaiveAlgorithmMessageWritable> msgIterator) {
+ public void sendMsgToPathVertexMergePhase(Iterator<MessageWritable> msgIterator) {
if (getSuperstep() == 3 + 2 * maxRound + 2) {
outgoingMsg.setSourceVertexId(getVertexId());
- destVertexId.set(getDestVertexIdFromChain(getVertexValue().getMergeChain(), getVertexValue().getAdjMap()));
+ destVertexId.set(getNextDestVertexId(getVertexValue()));
sendMsg(destVertexId, outgoingMsg);
} else {
while (msgIterator.hasNext()) {
@@ -241,12 +227,10 @@
mergeChainVertex();
outgoingMsg.setSourceVertexId(getVertexId());
destVertexId
- .set(getDestVertexIdFromChain(getVertexValue().getMergeChain(), incomingMsg.getAdjMap()));
+ .set(getNextDestVertexId(getVertexValue()));
sendMsg(destVertexId, outgoingMsg);
} else {
mergeChainVertex();
- byte adjMap = VertexUtil.updateRightNeighber(getVertexValue().getAdjMap(), incomingMsg.getAdjMap());
- getVertexValue().setAdjMap(adjMap);
getVertexValue().setState(State.FINAL_VERTEX);
//String source = getVertexValue().getMergeChain().toString();
//System.out.println();
@@ -260,11 +244,8 @@
*/
public void responseMsgToHeadVertexMergePhase() {
deleteVertex(getVertexId());
- outgoingMsg.setAdjMap(getVertexValue().getAdjMap());
- if(getVertexValue().getLengthOfMergeChain() == 0)
- outgoingMsg.setLastGeneCode(getVertexId().getGeneCodeAtPosition(kmerSize - 1));
- else
- outgoingMsg.setChainVertexId(getVertexValue().getMergeChain());
+ outgoingMsg.setNeighberNode(getVertexValue().getOutgoingList());
+ outgoingMsg.setChainVertexId(getVertexValue().getMergeChain());
if (getVertexValue().getState() == State.END_VERTEX)
outgoingMsg.setMessage(Message.STOP);
sendMsg(incomingMsg.getSourceVertexId(), outgoingMsg);
@@ -273,12 +254,11 @@
/**
* head node sends message to path node in partition phase
*/
- public void sendMsgToPathVertexPartitionPhase(Iterator<NaiveAlgorithmMessageWritable> msgIterator) {
+ public void sendMsgToPathVertexPartitionPhase(Iterator<MessageWritable> msgIterator) {
if (getSuperstep() == 4) {
- getVertexValue().setMergeChain(getVertexId());
if(getVertexValue().getState() != State.START_HALT){
outgoingMsg.setSourceVertexId(getVertexId());
- destVertexId.set(getDestVertexIdFromChain(getVertexValue().getMergeChain(), getVertexValue().getAdjMap()));
+ destVertexId.set(getNextDestVertexId(getVertexValue()));
sendMsg(destVertexId, outgoingMsg);
voteToHalt();
}
@@ -288,13 +268,10 @@
//if from pseudoHead, voteToHalt(), otherwise ...
if (incomingMsg.getMessage() != Message.FROMPSEUDOHEAD){
mergeChainVertex();
- byte adjMap = VertexUtil.updateRightNeighber(getVertexValue().getAdjMap(), incomingMsg.getAdjMap());
- getVertexValue().setAdjMap(adjMap);
if (incomingMsg.getMessage() != Message.STOP
&& incomingMsg.getMessage() != Message.FROMPSEUDOREAR) {
outgoingMsg.setSourceVertexId(getVertexId());
- destVertexId.set(getDestVertexIdFromChain(getVertexValue().getMergeChain(),
- incomingMsg.getAdjMap()));
+ destVertexId.set(getNextDestVertexId(getVertexValue()));
sendMsg(destVertexId, outgoingMsg);
voteToHalt();
} else {
@@ -321,11 +298,8 @@
outgoingMsg.setMessage(Message.FROMPSEUDOHEAD);
else {
deleteVertex(getVertexId());
- outgoingMsg.setAdjMap(getVertexValue().getAdjMap());
- if(getVertexValue().getLengthOfMergeChain() == 0)
- outgoingMsg.setLastGeneCode(getVertexId().getGeneCodeAtPosition(kmerSize - 1));
- else
- outgoingMsg.setChainVertexId(getVertexValue().getMergeChain());
+ outgoingMsg.setNeighberNode(incomingMsg.getNeighberNode());
+ outgoingMsg.setChainVertexId(getVertexValue().getMergeChain());
if (getVertexValue().getState() == State.PSEUDOREAR)
outgoingMsg.setMessage(Message.FROMPSEUDOREAR);
else if (getVertexValue().getState() == State.END_VERTEX)
@@ -338,12 +312,11 @@
/**
* final process the result of partition phase
*/
- public void finalProcessPartitionPhase(Iterator<NaiveAlgorithmMessageWritable> msgIterator){
+ public void finalProcessPartitionPhase(Iterator<MessageWritable> msgIterator){
while (msgIterator.hasNext()) {
incomingMsg = msgIterator.next();
mergeChainVertex();
- byte adjMap = VertexUtil.updateRightNeighber(getVertexValue().getAdjMap(), incomingMsg.getAdjMap());
- getVertexValue().setAdjMap(adjMap);
+ getVertexValue().setOutgoingList(incomingMsg.getNeighberNode());
//check head or pseudoHead
if (getVertexValue().getState() == State.START_VERTEX
&& incomingMsg.getMessage() == Message.STOP) {
@@ -367,7 +340,7 @@
}
@Override
- public void compute(Iterator<NaiveAlgorithmMessageWritable> msgIterator) {
+ public void compute(Iterator<MessageWritable> msgIterator) {
initVertex();
if (getSuperstep() == 1)
startSendMsg();
@@ -411,7 +384,7 @@
job.setVertexInputFormatClass(NaiveAlgorithmForPathMergeInputFormat.class);
job.setVertexOutputFormatClass(NaiveAlgorithmForPathMergeOutputFormat.class);
job.setDynamicVertexValueSize(true);
- job.setOutputKeyClass(KmerBytesWritable.class);
+ job.setOutputKeyClass(PositionWritable.class);
job.setOutputValueClass(ValueStateWritable.class);
Client.run(args, job);
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/CombineSequenceFile.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/CombineSequenceFile.java
deleted file mode 100644
index 7a50537..0000000
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/CombineSequenceFile.java
+++ /dev/null
@@ -1,56 +0,0 @@
-package edu.uci.ics.genomix.pregelix.sequencefile;
-
-import java.io.File;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.SequenceFile.CompressionType;
-
-import edu.uci.ics.genomix.type.KmerBytesWritable;
-import edu.uci.ics.genomix.type.KmerCountValue;
-
-public class CombineSequenceFile {
-
- /**
- * @param args
- * @throws Exception
- */
- public static void main(String[] args) throws Exception {
- // TODO Auto-generated method stub
- int kmerSize = 5;
- Configuration conf = new Configuration();
- FileSystem fileSys = FileSystem.get(conf);
-
- Path p = new Path("graphbuildresult/CyclePath2_result");
- //Path p2 = new Path("data/result");
- Path outFile = new Path("here");
- SequenceFile.Reader reader;
- SequenceFile.Writer writer = SequenceFile.createWriter(fileSys, conf, outFile, KmerBytesWritable.class,
- KmerCountValue.class, CompressionType.NONE);
- KmerBytesWritable key = new KmerBytesWritable(kmerSize);
- KmerCountValue value = new KmerCountValue();
-
- File dir = new File("graphbuildresult/CyclePath2_result");
- for (File child : dir.listFiles()) {
- String name = child.getAbsolutePath();
- Path inFile = new Path(p, name);
- reader = new SequenceFile.Reader(fileSys, inFile, conf);
- while (reader.next(key, value)) {
- System.out.println(key.toString() + "\t" + value.toString());
- writer.append(key, value);
- }
- reader.close();
- }
- writer.close();
- System.out.println();
-
- reader = new SequenceFile.Reader(fileSys, outFile, conf);
- while (reader.next(key, value)) {
- System.err.println(key.toString() + "\t" + value.toString());
- }
- reader.close();
- }
-
-}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/GenerateSmallFile.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/GenerateSmallFile.java
index bb288ff..6b9eb4e 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/GenerateSmallFile.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/GenerateSmallFile.java
@@ -12,11 +12,11 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import edu.uci.ics.genomix.type.KmerBytesWritable;
-import edu.uci.ics.genomix.type.KmerCountValue;
public class GenerateSmallFile {
@@ -27,15 +27,14 @@
SequenceFile.Reader reader = new SequenceFile.Reader(fileSys, inFile, conf);
SequenceFile.Writer writer = SequenceFile.createWriter(fileSys, conf, outFile, KmerBytesWritable.class,
- KmerCountValue.class, CompressionType.NONE);
+ NullWritable.class, CompressionType.NONE);
KmerBytesWritable outKey = new KmerBytesWritable(55);
- KmerCountValue outValue = new KmerCountValue();
int i = 0;
for (i = 0; i < numOfLines; i++) {
// System.out.println(i);
- reader.next(outKey, outValue);
- writer.append(outKey, outValue);
+ reader.next(outKey, null);
+ writer.append(outKey, null);
}
writer.close();
reader.close();
@@ -47,18 +46,6 @@
writeTextFile(outFile, lines);
}
- public static void main(String[] args) throws IOException {
- Path dir = new Path("data/split.aa");
- Path outDir = new Path("data/input");
- FileUtils.cleanDirectory(new File("data/input"));
- Path inFile = new Path(dir, "part-0");
- Path outFile = new Path(outDir, "part-0-out-1000");
- generateNumOfLinesFromGraphBuildResuiltBigFile(inFile, outFile, 1000);
- /* String inFile = "data/shortjump_1.head8M.fastq";
- String outFile = "data/testGeneFile";
- generateNumOfLinesFromGraphBuildResuiltBigFile(inFile, outFile, 100000);*/
- }
-
public static String readTextFile(String fileName, int numOfLines) {
String returnValue = "";
FileReader file;
@@ -93,6 +80,17 @@
} catch (IOException e) {
e.printStackTrace();
}
-
+ }
+
+ public static void main(String[] args) throws IOException {
+ Path dir = new Path("data/split.aa");
+ Path outDir = new Path("data/input");
+ FileUtils.cleanDirectory(new File("data/input"));
+ Path inFile = new Path(dir, "part-0");
+ Path outFile = new Path(outDir, "part-0-out-1000");
+ generateNumOfLinesFromGraphBuildResuiltBigFile(inFile, outFile, 1000);
+ /* String inFile = "data/shortjump_1.head8M.fastq";
+ String outFile = "data/testGeneFile";
+ generateNumOfLinesFromGraphBuildResuiltBigFile(inFile, outFile, 100000);*/
}
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/GenerateTextFile.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/GenerateTextFile.java
index 517b9c3..45609c6 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/GenerateTextFile.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/GenerateTextFile.java
@@ -15,7 +15,6 @@
import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
import edu.uci.ics.genomix.pregelix.type.State;
import edu.uci.ics.genomix.type.KmerBytesWritable;
-import edu.uci.ics.genomix.type.KmerCountValue;
public class GenerateTextFile {
@@ -99,10 +98,9 @@
Path path = new Path("data/input/part-0-out-3000000");
SequenceFile.Reader reader = new SequenceFile.Reader(fileSys, path, conf);
KmerBytesWritable key = new KmerBytesWritable(55);
- KmerCountValue value = new KmerCountValue();
- while (reader.next(key, value)) {
- if (key == null || value == null) {
+ while (reader.next(key, null)) {
+ if (key == null) {
break;
}
bw.write(key.toString());
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/CheckMessage.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/CheckMessage.java
index 61d2256..6e6a97a 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/CheckMessage.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/CheckMessage.java
@@ -4,7 +4,7 @@
public static final byte SOURCE = 1 << 0;
public static final byte CHAIN = 1 << 1;
- public static final byte ADJMAP = 1 << 2;
+ public static final byte NEIGHBER = 1 << 2;
public static final byte MESSAGE = 1 << 3;
public static final byte STATE = 1 << 4;
public static final byte LASTGENECODE = 1 << 5;
@@ -20,8 +20,8 @@
case CHAIN:
r = "CHAIN";
break;
- case ADJMAP:
- r = "ADJMAP";
+ case NEIGHBER:
+ r = "NEIGHBER";
break;
case MESSAGE:
r = "MESSAGE";
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/util/VertexUtil.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/util/VertexUtil.java
index 50ff400..f2a61be 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/util/VertexUtil.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/util/VertexUtil.java
@@ -1,19 +1,16 @@
package edu.uci.ics.genomix.pregelix.util;
-import edu.uci.ics.genomix.type.GeneCode;
+import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
import edu.uci.ics.genomix.type.KmerBytesWritable;
-import edu.uci.ics.genomix.type.VKmerBytesWritable;
public class VertexUtil {
- public static VKmerBytesWritable subKmer = new VKmerBytesWritable(0);
-
/**
* Single Vertex: in-degree = out-degree = 1
*
* @param vertexValue
*/
- public static boolean isPathVertex(byte value) {
- if (GeneCode.inDegree(value) == 1 && GeneCode.outDegree(value) == 1)
+ public static boolean isPathVertex(ValueStateWritable value) {
+ if (value.inDegree() == 1 && value.outDegree() == 1)
return true;
return false;
}
@@ -23,8 +20,8 @@
*
* @param vertexValue
*/
- public static boolean isHeadVertex(byte value) {
- if (GeneCode.outDegree(value) > 0 && !isPathVertex(value))
+ public static boolean isHeadVertex(ValueStateWritable value) {
+ if (value.outDegree() > 0 && !isPathVertex(value))
return true;
return false;
}
@@ -34,35 +31,18 @@
*
* @param vertexValue
*/
- public static boolean isRearVertex(byte value) {
- if (GeneCode.inDegree(value) > 0 && !isPathVertex(value))
+ public static boolean isRearVertex(ValueStateWritable value) {
+ if (value.inDegree() > 0 && !isPathVertex(value))
return true;
return false;
}
/**
- * update right neighber based on next vertexId
- */
- public static byte updateRightNeighberByVertexId(byte oldVertexValue, KmerBytesWritable neighberVertex, int k) {
- byte geneCode = neighberVertex.getGeneCodeAtPosition(k - 1);
-
- byte newBit = GeneCode.getBitMapFromGeneCode(geneCode); //getAdjBit
- return (byte) ((byte) (oldVertexValue & 0xF0) | (byte) (newBit & 0x0F));
- }
-
- /**
- * update right neighber
- */
- public static byte updateRightNeighber(byte oldVertexValue, byte newVertexValue) {
- return (byte) ((byte) (oldVertexValue & 0xF0) | (byte) (newVertexValue & 0x0F));
- }
-
- /**
* check if mergeChain is cycle
*/
- public static boolean isCycle(KmerBytesWritable vertexId, VKmerBytesWritable mergeChain, int kmerSize) {
+ public static boolean isCycle(KmerBytesWritable kmer, KmerBytesWritable mergeChain, int kmerSize) {
String chain = mergeChain.toString().substring(1);
- if (chain.contains(vertexId.toString()))
+ if (chain.contains(kmer.toString()))
return true;
return false;
@@ -75,11 +55,4 @@
}
return false;*/
}
-
- /**
- * reverse neighber
- */
- public static byte reverseAdjMap(byte oldAdjMap, byte geneCode) {
- return (byte) ((oldAdjMap & 0xF0) | (GeneCode.getBitMapFromGeneCode(geneCode) & 0x0F));
- }
}
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
index 809ea34..f1fcdac 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
@@ -4,15 +4,15 @@
import java.io.FileOutputStream;
import java.io.IOException;
+import edu.uci.ics.genomix.pregelix.format.LogAlgorithmForPathMergeInputFormat;
import edu.uci.ics.genomix.pregelix.format.LogAlgorithmForPathMergeOutputFormat;
import edu.uci.ics.genomix.pregelix.format.NaiveAlgorithmForPathMergeInputFormat;
import edu.uci.ics.genomix.pregelix.format.NaiveAlgorithmForPathMergeOutputFormat;
-import edu.uci.ics.genomix.pregelix.format.LogAlgorithmForPathMergeInputFormat;
import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
-import edu.uci.ics.genomix.pregelix.operator.NaiveAlgorithmForPathMergeVertex;
import edu.uci.ics.genomix.pregelix.operator.LogAlgorithmForPathMergeVertex;
+import edu.uci.ics.genomix.pregelix.operator.NaiveAlgorithmForPathMergeVertex;
import edu.uci.ics.genomix.pregelix.operator.P3ForPathMergeVertex;
-import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.PositionWritable;
import edu.uci.ics.pregelix.api.job.PregelixJob;
public class JobGenerator {
@@ -25,7 +25,7 @@
job.setVertexInputFormatClass(NaiveAlgorithmForPathMergeInputFormat.class);
job.setVertexOutputFormatClass(NaiveAlgorithmForPathMergeOutputFormat.class);
job.setDynamicVertexValueSize(true);
- job.setOutputKeyClass(KmerBytesWritable.class);
+ job.setOutputKeyClass(PositionWritable.class);
job.setOutputValueClass(ValueStateWritable.class);
job.getConfiguration().setInt(NaiveAlgorithmForPathMergeVertex.KMER_SIZE, 5);
job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
@@ -42,7 +42,7 @@
job.setVertexInputFormatClass(LogAlgorithmForPathMergeInputFormat.class);
job.setVertexOutputFormatClass(LogAlgorithmForPathMergeOutputFormat.class);
job.setDynamicVertexValueSize(true);
- job.setOutputKeyClass(KmerBytesWritable.class);
+ job.setOutputKeyClass(PositionWritable.class);
job.setOutputValueClass(ValueStateWritable.class);
job.getConfiguration().setInt(LogAlgorithmForPathMergeVertex.KMER_SIZE, 5);
job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
@@ -58,7 +58,7 @@
job.setVertexInputFormatClass(NaiveAlgorithmForPathMergeInputFormat.class);
job.setVertexOutputFormatClass(NaiveAlgorithmForPathMergeOutputFormat.class);
job.setDynamicVertexValueSize(true);
- job.setOutputKeyClass(KmerBytesWritable.class);
+ job.setOutputKeyClass(PositionWritable.class);
job.setOutputValueClass(ValueStateWritable.class);
job.getConfiguration().setInt(P3ForPathMergeVertex.KMER_SIZE, 5);
job.getConfiguration().setFloat(P3ForPathMergeVertex.PSEUDORATE, 0.4f);
@@ -72,8 +72,8 @@
}
public static void main(String[] args) throws IOException {
- //genNaiveAlgorithmForMergeGraph();
- //genLogAlgorithmForMergeGraph();
+ genNaiveAlgorithmForMergeGraph();
+ genLogAlgorithmForMergeGraph();
genP3ForMergeGraph();
}
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/pathmerge/GraphBuildTest.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/pathmerge/GraphBuildTest.java
deleted file mode 100644
index 66ee26d..0000000
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/pathmerge/GraphBuildTest.java
+++ /dev/null
@@ -1,176 +0,0 @@
-package edu.uci.ics.genomix.pregelix.pathmerge;
-
-import java.io.BufferedWriter;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.FileWriter;
-import java.io.IOException;
-
-import junit.framework.Assert;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.FileOutputFormat;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import edu.uci.ics.genomix.driver.Driver;
-import edu.uci.ics.genomix.driver.Driver.Plan;
-import edu.uci.ics.genomix.job.GenomixJob;
-import edu.uci.ics.genomix.type.KmerBytesWritable;
-import edu.uci.ics.genomix.type.KmerCountValue;
-
-@SuppressWarnings("deprecation")
-public class GraphBuildTest {
- private static final String ACTUAL_RESULT_DIR = "graphbuildresult";
- private static final String PATH_TO_HADOOP_CONF = "src/test/resources/hadoop/conf";
-
- private static final String DATA_PATH = "data/testGeneFile";
- private static final String HDFS_INPUT_PATH = "/test";
- private static final String HDFS_OUTPUT_PATH = "/result";
-
- private static final String DUMPED_RESULT = ACTUAL_RESULT_DIR + HDFS_OUTPUT_PATH + "/result.txt";
- private static final String CONVERT_RESULT = ACTUAL_RESULT_DIR + "/graph_build_result.txt";
- private static final String EXPECTED_PATH = "src/test/resources/expected/result2";
-
- private static final String HADOOP_CONF_PATH = ACTUAL_RESULT_DIR + File.separator + "conf.xml";
- private MiniDFSCluster dfsCluster;
-
- private JobConf conf = new JobConf();
- private int numberOfNC = 2;
- private int numPartitionPerMachine = 1;
-
- private Driver driver;
-
- @Before
- public void setUp() throws Exception {
- cleanupStores();
- edu.uci.ics.hyracks.hdfs.utils.HyracksUtils.init();
- FileUtils.forceMkdir(new File(ACTUAL_RESULT_DIR));
- FileUtils.cleanDirectory(new File(ACTUAL_RESULT_DIR));
- startHDFS();
-
- FileInputFormat.setInputPaths(conf, HDFS_INPUT_PATH);
- FileOutputFormat.setOutputPath(conf, new Path(HDFS_OUTPUT_PATH));
-
- conf.setInt(GenomixJob.KMER_LENGTH, 55);
- driver = new Driver(edu.uci.ics.hyracks.hdfs.utils.HyracksUtils.CC_HOST,
- edu.uci.ics.hyracks.hdfs.utils.HyracksUtils.TEST_HYRACKS_CC_CLIENT_PORT, numPartitionPerMachine);
- }
-
- private void cleanupStores() throws IOException {
- FileUtils.forceMkdir(new File("teststore"));
- FileUtils.forceMkdir(new File("build"));
- FileUtils.cleanDirectory(new File("teststore"));
- FileUtils.cleanDirectory(new File("build"));
- }
-
- private void startHDFS() throws IOException {
- conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/core-site.xml"));
- conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/mapred-site.xml"));
- conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/hdfs-site.xml"));
-
- FileSystem lfs = FileSystem.getLocal(new Configuration());
- lfs.delete(new Path("build"), true);
- System.setProperty("hadoop.log.dir", "logs");
- dfsCluster = new MiniDFSCluster(conf, numberOfNC, true, null);
- FileSystem dfs = FileSystem.get(conf);
- Path src = new Path(DATA_PATH);
- Path dest = new Path(HDFS_INPUT_PATH);
- dfs.mkdirs(dest);
- dfs.copyFromLocalFile(src, dest);
-
- DataOutputStream confOutput = new DataOutputStream(new FileOutputStream(new File(HADOOP_CONF_PATH)));
- conf.writeXml(confOutput);
- confOutput.flush();
- confOutput.close();
- }
-
- private void cleanUpReEntry() throws IOException {
- FileSystem lfs = FileSystem.getLocal(new Configuration());
- if (lfs.exists(new Path(DUMPED_RESULT))) {
- lfs.delete(new Path(DUMPED_RESULT), true);
- }
- FileSystem dfs = FileSystem.get(conf);
- if (dfs.exists(new Path(HDFS_OUTPUT_PATH))) {
- dfs.delete(new Path(HDFS_OUTPUT_PATH), true);
- }
- }
-
- @Test
- public void TestAll() throws Exception {
- cleanUpReEntry();
- TestPreClusterGroupby();
- }
-
- public void TestPreClusterGroupby() throws Exception {
- conf.set(GenomixJob.GROUPBY_TYPE, "precluster");
- //conf.set(GenomixJob.OUTPUT_FORMAT, "text");
- System.err.println("Testing PreClusterGroupBy");
- driver.runJob(new GenomixJob(conf), Plan.BUILD_DEBRUJIN_GRAPH, true);
- Assert.assertEquals(true, checkResults(EXPECTED_PATH));
- }
-
- private boolean checkResults(String expectedPath) throws Exception {
- String format = conf.get(GenomixJob.OUTPUT_FORMAT);
- if ("text".equalsIgnoreCase(format)) {
- FileUtil.copyMerge(FileSystem.get(conf), new Path(HDFS_OUTPUT_PATH),
- FileSystem.getLocal(new Configuration()), new Path(DUMPED_RESULT), false, conf, null);
- } else {
-
- FileSystem.getLocal(new Configuration()).mkdirs(new Path(ACTUAL_RESULT_DIR + HDFS_OUTPUT_PATH));
- File filePathTo = new File(CONVERT_RESULT);
- BufferedWriter bw = new BufferedWriter(new FileWriter(filePathTo));
- for (int i = 0; i < numPartitionPerMachine * numberOfNC; i++) {
- String partname = "/part-" + i;
- FileUtil.copy(FileSystem.get(conf), new Path(HDFS_OUTPUT_PATH + partname),
- FileSystem.getLocal(new Configuration()), new Path(ACTUAL_RESULT_DIR + HDFS_OUTPUT_PATH
- + partname), false, conf);
-
- Path path = new Path(HDFS_OUTPUT_PATH + partname);
- FileSystem dfs = FileSystem.get(conf);
- if (dfs.getFileStatus(path).getLen() == 0) {
- continue;
- }
- SequenceFile.Reader reader = new SequenceFile.Reader(dfs, path, conf);
- KmerBytesWritable key = new KmerBytesWritable(conf.getInt(GenomixJob.KMER_LENGTH,
- GenomixJob.DEFAULT_KMER));
- KmerCountValue value = (KmerCountValue) ReflectionUtils.newInstance(reader.getValueClass(), conf);
-
- while (reader.next(key, value)) {
- if (key == null || value == null) {
- break;
- }
- bw.write(key.toString() + "\t" + value.toString());
- System.out.println(key.toString() + "\t" + value.toString());
- bw.newLine();
- }
- reader.close();
- }
- bw.close();
- }
-
- // TestUtils.compareWithSortedResult(new File(expectedPath), dumped);
- return true;
- }
-
- @After
- public void tearDown() throws Exception {
- edu.uci.ics.hyracks.hdfs.utils.HyracksUtils.deinit();
- cleanupHDFS();
- }
-
- private void cleanupHDFS() throws Exception {
- dfsCluster.shutdown();
- }
-}
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/pathmerge/GraphBuildTestCase.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/pathmerge/GraphBuildTestCase.java
deleted file mode 100644
index daa7e39..0000000
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/pathmerge/GraphBuildTestCase.java
+++ /dev/null
@@ -1,111 +0,0 @@
-package edu.uci.ics.genomix.pregelix.pathmerge;
-
-import java.io.BufferedWriter;
-import java.io.File;
-import java.io.FileWriter;
-import java.io.IOException;
-
-import junit.framework.Assert;
-import junit.framework.TestCase;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.junit.Test;
-
-import edu.uci.ics.genomix.driver.Driver;
-import edu.uci.ics.genomix.driver.Driver.Plan;
-import edu.uci.ics.genomix.job.GenomixJob;
-import edu.uci.ics.genomix.type.KmerBytesWritable;
-import edu.uci.ics.genomix.type.KmerCountValue;
-
-public class GraphBuildTestCase extends TestCase {
- private final JobConf conf;
- private Driver driver;
- private int numberOfNC = 2;
- private int numPartitionPerMachine = 1;
-
- private static final String ACTUAL_RESULT_DIR = "graphbuildresult";
- private static final String HDFS_OUTPUT_PATH = "/result";
- private static final String DUMPED_RESULT = ACTUAL_RESULT_DIR + HDFS_OUTPUT_PATH + "/result.txt";
- private static final String CONVERT_RESULT = ACTUAL_RESULT_DIR + HDFS_OUTPUT_PATH + "/result.txt.txt";
-
- public GraphBuildTestCase(JobConf conf, Driver driver) {
- this.conf = conf;
- this.driver = driver;
- }
-
- private void cleanUpReEntry() throws IOException {
- FileSystem lfs = FileSystem.getLocal(new Configuration());
- if (lfs.exists(new Path(DUMPED_RESULT))) {
- lfs.delete(new Path(DUMPED_RESULT), true);
- }
- FileSystem dfs = FileSystem.get(conf);
- if (dfs.exists(new Path(HDFS_OUTPUT_PATH))) {
- dfs.delete(new Path(HDFS_OUTPUT_PATH), true);
- }
- }
-
- @Test
- public void Test() throws Exception {
- cleanUpReEntry();
- TestPreClusterGroupby();
- }
-
- public void TestPreClusterGroupby() throws Exception {
- conf.set(GenomixJob.GROUPBY_TYPE, "precluster");
- System.err.println("Testing PreClusterGroupBy");
- driver.runJob(new GenomixJob(conf), Plan.BUILD_DEBRUJIN_GRAPH, true);
- Assert.assertEquals(true, checkResults());
- }
-
- private boolean checkResults() throws Exception {
- File dumped = null;
- String format = conf.get(GenomixJob.OUTPUT_FORMAT);
- if ("text".equalsIgnoreCase(format)) {
- FileUtil.copyMerge(FileSystem.get(conf), new Path(HDFS_OUTPUT_PATH),
- FileSystem.getLocal(new Configuration()), new Path(DUMPED_RESULT), false, conf, null);
- dumped = new File(DUMPED_RESULT);
- } else {
-
- FileSystem.getLocal(new Configuration()).mkdirs(new Path(ACTUAL_RESULT_DIR + HDFS_OUTPUT_PATH));
- File filePathTo = new File(CONVERT_RESULT);
- BufferedWriter bw = new BufferedWriter(new FileWriter(filePathTo));
- for (int i = 0; i < numPartitionPerMachine * numberOfNC; i++) {
- String partname = "/part-" + i;
- FileUtil.copy(FileSystem.get(conf), new Path(HDFS_OUTPUT_PATH + partname),
- FileSystem.getLocal(new Configuration()), new Path(ACTUAL_RESULT_DIR + HDFS_OUTPUT_PATH
- + partname), false, conf);
-
- Path path = new Path(HDFS_OUTPUT_PATH + partname);
- FileSystem dfs = FileSystem.get(conf);
- if (dfs.getFileStatus(path).getLen() == 0) {
- continue;
- }
- SequenceFile.Reader reader = new SequenceFile.Reader(dfs, path, conf);
- KmerBytesWritable key = new KmerBytesWritable(conf.getInt(GenomixJob.KMER_LENGTH,
- GenomixJob.DEFAULT_KMER));
- KmerCountValue value = (KmerCountValue) ReflectionUtils.newInstance(reader.getValueClass(), conf);
-
- while (reader.next(key, value)) {
- if (key == null || value == null) {
- break;
- }
- bw.write(key.toString() + "\t" + value.toString());
- System.out.println(key.toString() + "\t" + value.toString());
- bw.newLine();
- }
- reader.close();
- }
- bw.close();
- dumped = new File(CONVERT_RESULT);
- }
-
- // TestUtils.compareWithSortedResult(new File(expectedPath), dumped);
- return true;
- }
-}
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/pathmerge/GraphBuildTestSuite.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/pathmerge/GraphBuildTestSuite.java
deleted file mode 100644
index fdc3785..0000000
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/pathmerge/GraphBuildTestSuite.java
+++ /dev/null
@@ -1,127 +0,0 @@
-package edu.uci.ics.genomix.pregelix.pathmerge;
-
-import java.io.BufferedWriter;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.FileWriter;
-import java.io.IOException;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.FileOutputFormat;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.util.ReflectionUtils;
-
-import edu.uci.ics.genomix.driver.Driver;
-import edu.uci.ics.genomix.driver.Driver.Plan;
-import edu.uci.ics.genomix.job.GenomixJob;
-import edu.uci.ics.genomix.type.KmerBytesWritable;
-import edu.uci.ics.genomix.type.KmerCountValue;
-import junit.framework.Assert;
-import junit.framework.Test;
-import junit.framework.TestResult;
-import junit.framework.TestSuite;
-
-public class GraphBuildTestSuite extends TestSuite {
- private static final String ACTUAL_RESULT_DIR = "graphbuildresult";
- private static final String PATH_TO_HADOOP_CONF = "src/test/resources/hadoop/conf";
-
- private static final String DATA_PATH = "graph/7/TreePath";
- private static final String HDFS_INPUT_PATH = "/test";
- private static final String HDFS_OUTPUT_PATH = "/result";
-
- private static final String HADOOP_CONF_PATH = ACTUAL_RESULT_DIR + File.separator + "conf.xml";
- private MiniDFSCluster dfsCluster;
-
- private static JobConf conf = new JobConf();
- private int numberOfNC = 2;
- private int numPartitionPerMachine = 1;
-
- private static Driver driver;
-
- public void setUp() throws Exception {
- cleanupStores();
- edu.uci.ics.hyracks.hdfs.utils.HyracksUtils.init();
- FileUtils.forceMkdir(new File(ACTUAL_RESULT_DIR));
- FileUtils.cleanDirectory(new File(ACTUAL_RESULT_DIR));
- startHDFS();
-
- FileInputFormat.setInputPaths(conf, HDFS_INPUT_PATH);
- FileOutputFormat.setOutputPath(conf, new Path(HDFS_OUTPUT_PATH));
-
- conf.setInt(GenomixJob.KMER_LENGTH, 7);
- driver = new Driver(edu.uci.ics.hyracks.hdfs.utils.HyracksUtils.CC_HOST,
- edu.uci.ics.hyracks.hdfs.utils.HyracksUtils.TEST_HYRACKS_CC_CLIENT_PORT, numPartitionPerMachine);
- }
-
- private void cleanupStores() throws IOException {
- FileUtils.forceMkdir(new File("teststore"));
- FileUtils.forceMkdir(new File("build"));
- FileUtils.cleanDirectory(new File("teststore"));
- FileUtils.cleanDirectory(new File("build"));
- }
-
- private void startHDFS() throws IOException {
- conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/core-site.xml"));
- conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/mapred-site.xml"));
- conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/hdfs-site.xml"));
-
- FileSystem lfs = FileSystem.getLocal(new Configuration());
- lfs.delete(new Path("build"), true);
- System.setProperty("hadoop.log.dir", "logs");
- dfsCluster = new MiniDFSCluster(conf, numberOfNC, true, null);
- FileSystem dfs = FileSystem.get(conf);
- Path src = new Path(DATA_PATH);
- Path dest = new Path(HDFS_INPUT_PATH);
- dfs.mkdirs(dest);
- dfs.copyFromLocalFile(src, dest);
-
- DataOutputStream confOutput = new DataOutputStream(new FileOutputStream(new File(HADOOP_CONF_PATH)));
- conf.writeXml(confOutput);
- confOutput.flush();
- confOutput.close();
- }
-
- public static Test suite() throws Exception {
- GraphBuildTestSuite testSuite = new GraphBuildTestSuite();
- testSuite.setUp();
- testSuite.addTest(new GraphBuildTestCase(conf, driver));
- return testSuite;
- }
-
- /**
- * Runs the tests and collects their result in a TestResult.
- */
- @Override
- public void run(TestResult result) {
- try {
- int testCount = countTestCases();
- for (int i = 0; i < testCount; i++) {
- // cleanupStores();
- Test each = this.testAt(i);
- if (result.shouldStop())
- break;
- runTest(each, result);
- }
- tearDown();
- } catch (Exception e) {
- throw new IllegalStateException(e);
- }
- }
-
- public void tearDown() throws Exception {
- edu.uci.ics.hyracks.hdfs.utils.HyracksUtils.deinit();
- cleanupHDFS();
- }
-
- private void cleanupHDFS() throws Exception {
- dfsCluster.shutdown();
- }
-}
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/pathmerge/MergePathTest.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/pathmerge/MergePathTest.java
index 68c186a..8c02547 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/pathmerge/MergePathTest.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/pathmerge/MergePathTest.java
@@ -14,7 +14,6 @@
import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
import edu.uci.ics.genomix.pregelix.type.State;
-import edu.uci.ics.genomix.type.GeneCode;
import edu.uci.ics.genomix.type.KmerBytesWritable;
public class MergePathTest {
@@ -98,10 +97,10 @@
if (key == null || value == null) {
break;
}
- if (value.getLengthOfMergeChain() <= maxLength && value.getLengthOfMergeChain() != kmerSize) {
+ /*if (value.getLengthOfMergeChain() <= maxLength && value.getLengthOfMergeChain() != kmerSize) {
bw.write(value.getLengthOfMergeChain() + "\t" + value.getMergeChain().toString());
bw.newLine();
- }
+ }*/
}
reader.close();
}
@@ -123,10 +122,10 @@
if (key == null || value == null) {
break;
}
- if (value.getLengthOfMergeChain() <= maxLength && value.getState() == State.FINAL_VERTEX) {
+ /* if (value.getLengthOfMergeChain() <= maxLength && value.getState() == State.FINAL_VERTEX) {
bw.write(value.getLengthOfMergeChain() + "\t" + value.getMergeChain().toString());
bw.newLine();
- }
+ }*/
}
reader.close();
}