finish p4
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/api/io/binary/BinaryDataCleanVertexInputFormat.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/api/io/binary/BinaryDataCleanVertexInputFormat.java
index d3d52f3..65458c0 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/api/io/binary/BinaryDataCleanVertexInputFormat.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/api/io/binary/BinaryDataCleanVertexInputFormat.java
@@ -13,7 +13,7 @@
import edu.uci.ics.pregelix.api.io.VertexInputFormat;
import edu.uci.ics.pregelix.api.io.VertexReader;
-import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
+import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
import edu.uci.ics.genomix.type.PositionWritable;
public class BinaryDataCleanVertexInputFormat<I extends WritableComparable<?>, V extends Writable, E extends Writable, M extends Writable>
@@ -38,7 +38,7 @@
public static abstract class BinaryDataCleanVertexReader<I extends WritableComparable<?>, V extends Writable, E extends Writable, M extends Writable>
implements VertexReader<I, V, E, M> {
/** Internal line record reader */
- private final RecordReader<PositionWritable, ValueStateWritable> lineRecordReader;
+ private final RecordReader<PositionWritable, VertexValueWritable> lineRecordReader;
/** Context passed to initialize */
private TaskAttemptContext context;
@@ -48,7 +48,7 @@
* @param recordReader
* Line record reader from SequenceFileInputFormat
*/
- public BinaryDataCleanVertexReader(RecordReader<PositionWritable, ValueStateWritable> recordReader) {
+ public BinaryDataCleanVertexReader(RecordReader<PositionWritable, VertexValueWritable> recordReader) {
this.lineRecordReader = recordReader;
}
@@ -74,7 +74,7 @@
*
* @return Record reader to be used for reading.
*/
- protected RecordReader<PositionWritable, ValueStateWritable> getRecordReader() {
+ protected RecordReader<PositionWritable, VertexValueWritable> getRecordReader() {
return lineRecordReader;
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/api/io/binary/BinaryDataCleanVertexOutputFormat.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/api/io/binary/BinaryDataCleanVertexOutputFormat.java
index efe41d6..65c7d94 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/api/io/binary/BinaryDataCleanVertexOutputFormat.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/api/io/binary/BinaryDataCleanVertexOutputFormat.java
@@ -10,7 +10,7 @@
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
-import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
+import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
import edu.uci.ics.genomix.type.PositionWritable;
import edu.uci.ics.pregelix.api.io.VertexOutputFormat;
import edu.uci.ics.pregelix.api.io.VertexWriter;
@@ -49,7 +49,7 @@
/** Context passed to initialize */
private TaskAttemptContext context;
/** Internal line record writer */
- private final RecordWriter<PositionWritable, ValueStateWritable> lineRecordWriter;
+ private final RecordWriter<PositionWritable, VertexValueWritable> lineRecordWriter;
/**
* Initialize with the LineRecordWriter.
@@ -57,7 +57,7 @@
* @param lineRecordWriter
* Line record writer from SequenceFileOutputFormat
*/
- public BinaryVertexWriter(RecordWriter<PositionWritable, ValueStateWritable> lineRecordWriter) {
+ public BinaryVertexWriter(RecordWriter<PositionWritable, VertexValueWritable> lineRecordWriter) {
this.lineRecordWriter = lineRecordWriter;
}
@@ -76,7 +76,7 @@
*
* @return Record writer to be used for writing.
*/
- public RecordWriter<PositionWritable, ValueStateWritable> getRecordWriter() {
+ public RecordWriter<PositionWritable, VertexValueWritable> getRecordWriter() {
return lineRecordWriter;
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/DataCleanInputFormat.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/DataCleanInputFormat.java
index 140a703..c4c1b64 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/DataCleanInputFormat.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/DataCleanInputFormat.java
@@ -12,18 +12,18 @@
import edu.uci.ics.pregelix.api.io.VertexReader;
import edu.uci.ics.pregelix.api.util.BspUtils;
import edu.uci.ics.genomix.pregelix.io.MessageWritable;
-import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
+import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
import edu.uci.ics.genomix.pregelix.api.io.binary.BinaryDataCleanVertexInputFormat;
import edu.uci.ics.genomix.pregelix.api.io.binary.BinaryDataCleanVertexInputFormat.BinaryDataCleanVertexReader;
public class DataCleanInputFormat extends
- BinaryDataCleanVertexInputFormat<PositionWritable, ValueStateWritable, NullWritable, MessageWritable> {
+ BinaryDataCleanVertexInputFormat<PositionWritable, VertexValueWritable, NullWritable, MessageWritable> {
/**
* Format INPUT
*/
@SuppressWarnings("unchecked")
@Override
- public VertexReader<PositionWritable, ValueStateWritable, NullWritable, MessageWritable> createVertexReader(
+ public VertexReader<PositionWritable, VertexValueWritable, NullWritable, MessageWritable> createVertexReader(
InputSplit split, TaskAttemptContext context) throws IOException {
return new BinaryDataCleanLoadGraphReader(binaryInputFormat.createRecordReader(split, context));
}
@@ -31,12 +31,12 @@
@SuppressWarnings("rawtypes")
class BinaryDataCleanLoadGraphReader extends
- BinaryDataCleanVertexReader<PositionWritable, ValueStateWritable, NullWritable, MessageWritable> {
+ BinaryDataCleanVertexReader<PositionWritable, VertexValueWritable, NullWritable, MessageWritable> {
private Vertex vertex;
private PositionWritable vertexId = new PositionWritable();
- private ValueStateWritable vertexValue = new ValueStateWritable();
+ private VertexValueWritable vertexValue = new VertexValueWritable();
- public BinaryDataCleanLoadGraphReader(RecordReader<PositionWritable, ValueStateWritable> recordReader) {
+ public BinaryDataCleanLoadGraphReader(RecordReader<PositionWritable, VertexValueWritable> recordReader) {
super(recordReader);
}
@@ -47,7 +47,7 @@
@SuppressWarnings("unchecked")
@Override
- public Vertex<PositionWritable, ValueStateWritable, NullWritable, MessageWritable> getCurrentVertex()
+ public Vertex<PositionWritable, VertexValueWritable, NullWritable, MessageWritable> getCurrentVertex()
throws IOException, InterruptedException {
if (vertex == null)
vertex = (Vertex) BspUtils.createVertex(getContext().getConfiguration());
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/DataCleanOutputFormat.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/DataCleanOutputFormat.java
index b06dbbe..4c38255 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/DataCleanOutputFormat.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/DataCleanOutputFormat.java
@@ -7,20 +7,20 @@
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import edu.uci.ics.genomix.pregelix.api.io.binary.BinaryDataCleanVertexOutputFormat;
-import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
+import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
import edu.uci.ics.genomix.pregelix.type.State;
import edu.uci.ics.genomix.type.PositionWritable;
import edu.uci.ics.pregelix.api.graph.Vertex;
import edu.uci.ics.pregelix.api.io.VertexWriter;
public class DataCleanOutputFormat extends
- BinaryDataCleanVertexOutputFormat<PositionWritable, ValueStateWritable, NullWritable> {
+ BinaryDataCleanVertexOutputFormat<PositionWritable, VertexValueWritable, NullWritable> {
@Override
- public VertexWriter<PositionWritable, ValueStateWritable, NullWritable> createVertexWriter(
+ public VertexWriter<PositionWritable, VertexValueWritable, NullWritable> createVertexWriter(
TaskAttemptContext context) throws IOException, InterruptedException {
@SuppressWarnings("unchecked")
- RecordWriter<PositionWritable, ValueStateWritable> recordWriter = binaryOutputFormat.getRecordWriter(context);
+ RecordWriter<PositionWritable, VertexValueWritable> recordWriter = binaryOutputFormat.getRecordWriter(context);
return new BinaryLoadGraphVertexWriter(recordWriter);
}
@@ -28,13 +28,13 @@
* Simple VertexWriter that supports {@link BinaryLoadGraphVertex}
*/
public static class BinaryLoadGraphVertexWriter extends
- BinaryVertexWriter<PositionWritable, ValueStateWritable, NullWritable> {
- public BinaryLoadGraphVertexWriter(RecordWriter<PositionWritable, ValueStateWritable> lineRecordWriter) {
+ BinaryVertexWriter<PositionWritable, VertexValueWritable, NullWritable> {
+ public BinaryLoadGraphVertexWriter(RecordWriter<PositionWritable, VertexValueWritable> lineRecordWriter) {
super(lineRecordWriter);
}
@Override
- public void writeVertex(Vertex<PositionWritable, ValueStateWritable, NullWritable, ?> vertex)
+ public void writeVertex(Vertex<PositionWritable, VertexValueWritable, NullWritable, ?> vertex)
throws IOException, InterruptedException {
if(vertex.getVertexValue().getState() != State.END_VERTEX)
getRecordWriter().write(vertex.getVertexId(), vertex.getVertexValue());
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/LogAlgorithmForPathMergeInputFormat.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/LogAlgorithmForPathMergeInputFormat.java
index 0b0055b..cf254d1 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/LogAlgorithmForPathMergeInputFormat.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/LogAlgorithmForPathMergeInputFormat.java
@@ -9,7 +9,7 @@
import edu.uci.ics.genomix.pregelix.api.io.binary.BinaryVertexInputFormat;
import edu.uci.ics.genomix.pregelix.io.MessageWritable;
-import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
+import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
import edu.uci.ics.genomix.pregelix.type.State;
import edu.uci.ics.genomix.type.NodeWritable;
import edu.uci.ics.genomix.type.PositionWritable;
@@ -18,24 +18,24 @@
import edu.uci.ics.pregelix.api.util.BspUtils;
public class LogAlgorithmForPathMergeInputFormat extends
- BinaryVertexInputFormat<PositionWritable, ValueStateWritable, NullWritable, MessageWritable> {
+ BinaryVertexInputFormat<PositionWritable, VertexValueWritable, NullWritable, MessageWritable> {
/**
* Format INPUT
*/
@SuppressWarnings("unchecked")
@Override
- public VertexReader<PositionWritable, ValueStateWritable, NullWritable, MessageWritable> createVertexReader(
+ public VertexReader<PositionWritable, VertexValueWritable, NullWritable, MessageWritable> createVertexReader(
InputSplit split, TaskAttemptContext context) throws IOException {
return new BinaryLoadGraphReader(binaryInputFormat.createRecordReader(split, context));
}
@SuppressWarnings("rawtypes")
class BinaryLoadGraphReader extends
- BinaryVertexReader<PositionWritable, ValueStateWritable, NullWritable, MessageWritable> {
+ BinaryVertexReader<PositionWritable, VertexValueWritable, NullWritable, MessageWritable> {
private Vertex vertex = null;
private NodeWritable node = new NodeWritable();
private PositionWritable vertexId = new PositionWritable();
- private ValueStateWritable vertexValue = new ValueStateWritable();
+ private VertexValueWritable vertexValue = new VertexValueWritable();
public BinaryLoadGraphReader(RecordReader<NodeWritable, NullWritable> recordReader) {
super(recordReader);
@@ -48,7 +48,7 @@
@SuppressWarnings("unchecked")
@Override
- public Vertex<PositionWritable, ValueStateWritable, NullWritable, MessageWritable> getCurrentVertex()
+ public Vertex<PositionWritable, VertexValueWritable, NullWritable, MessageWritable> getCurrentVertex()
throws IOException, InterruptedException {
if (vertex == null)
vertex = (Vertex) BspUtils.createVertex(getContext().getConfiguration());
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/LogAlgorithmForPathMergeOutputFormat.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/LogAlgorithmForPathMergeOutputFormat.java
index a26c075..95e24be 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/LogAlgorithmForPathMergeOutputFormat.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/LogAlgorithmForPathMergeOutputFormat.java
@@ -9,15 +9,15 @@
import edu.uci.ics.genomix.pregelix.api.io.binary.BinaryVertexOutputFormat;
import edu.uci.ics.pregelix.api.graph.Vertex;
import edu.uci.ics.pregelix.api.io.VertexWriter;
-import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
+import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
import edu.uci.ics.genomix.type.NodeWritable;
import edu.uci.ics.genomix.type.PositionWritable;
public class LogAlgorithmForPathMergeOutputFormat extends
- BinaryVertexOutputFormat<PositionWritable, ValueStateWritable, NullWritable> {
+ BinaryVertexOutputFormat<PositionWritable, VertexValueWritable, NullWritable> {
@Override
- public VertexWriter<PositionWritable, ValueStateWritable, NullWritable> createVertexWriter(
+ public VertexWriter<PositionWritable, VertexValueWritable, NullWritable> createVertexWriter(
TaskAttemptContext context) throws IOException, InterruptedException {
@SuppressWarnings("unchecked")
RecordWriter<NodeWritable, NullWritable> recordWriter = binaryOutputFormat.getRecordWriter(context);
@@ -28,7 +28,7 @@
* Simple VertexWriter that supports {@link BinaryLoadGraphVertex}
*/
public static class BinaryLoadGraphVertexWriter extends
- BinaryVertexWriter<PositionWritable, ValueStateWritable, NullWritable> {
+ BinaryVertexWriter<PositionWritable, VertexValueWritable, NullWritable> {
private NodeWritable node = new NodeWritable();
private NullWritable nul = NullWritable.get();
@@ -37,7 +37,7 @@
}
@Override
- public void writeVertex(Vertex<PositionWritable, ValueStateWritable, NullWritable, ?> vertex)
+ public void writeVertex(Vertex<PositionWritable, VertexValueWritable, NullWritable, ?> vertex)
throws IOException, InterruptedException {
node.set(vertex.getVertexId(), vertex.getVertexValue().getFFList(),
vertex.getVertexValue().getFRList(), vertex.getVertexValue().getRFList(),
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/NaiveAlgorithmForPathMergeInputFormat.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/NaiveAlgorithmForPathMergeInputFormat.java
index 91634d8..d10e625 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/NaiveAlgorithmForPathMergeInputFormat.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/NaiveAlgorithmForPathMergeInputFormat.java
@@ -13,19 +13,19 @@
import edu.uci.ics.pregelix.api.io.VertexReader;
import edu.uci.ics.pregelix.api.util.BspUtils;
import edu.uci.ics.genomix.pregelix.io.MessageWritable;
-import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
+import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
import edu.uci.ics.genomix.pregelix.type.State;
import edu.uci.ics.genomix.pregelix.api.io.binary.BinaryVertexInputFormat;
import edu.uci.ics.genomix.pregelix.api.io.binary.BinaryVertexInputFormat.BinaryVertexReader;
public class NaiveAlgorithmForPathMergeInputFormat extends
- BinaryVertexInputFormat<PositionWritable, ValueStateWritable, NullWritable, MessageWritable> {
+ BinaryVertexInputFormat<PositionWritable, VertexValueWritable, NullWritable, MessageWritable> {
/**
* Format INPUT
*/
@SuppressWarnings("unchecked")
@Override
- public VertexReader<PositionWritable, ValueStateWritable, NullWritable, MessageWritable> createVertexReader(
+ public VertexReader<PositionWritable, VertexValueWritable, NullWritable, MessageWritable> createVertexReader(
InputSplit split, TaskAttemptContext context) throws IOException {
return new BinaryLoadGraphReader(binaryInputFormat.createRecordReader(split, context));
}
@@ -33,11 +33,11 @@
@SuppressWarnings("rawtypes")
class BinaryLoadGraphReader extends
- BinaryVertexReader<PositionWritable, ValueStateWritable, NullWritable, MessageWritable> {
+ BinaryVertexReader<PositionWritable, VertexValueWritable, NullWritable, MessageWritable> {
private Vertex vertex;
private NodeWritable node = new NodeWritable();
private PositionWritable vertexId = new PositionWritable();
- private ValueStateWritable vertexValue = new ValueStateWritable();
+ private VertexValueWritable vertexValue = new VertexValueWritable();
public BinaryLoadGraphReader(RecordReader<NodeWritable, NullWritable> recordReader) {
super(recordReader);
@@ -50,7 +50,7 @@
@SuppressWarnings("unchecked")
@Override
- public Vertex<PositionWritable, ValueStateWritable, NullWritable, MessageWritable> getCurrentVertex()
+ public Vertex<PositionWritable, VertexValueWritable, NullWritable, MessageWritable> getCurrentVertex()
throws IOException, InterruptedException {
if (vertex == null)
vertex = (Vertex) BspUtils.createVertex(getContext().getConfiguration());
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/NaiveAlgorithmForPathMergeOutputFormat.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/NaiveAlgorithmForPathMergeOutputFormat.java
index 7e157bb..f9f3749 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/NaiveAlgorithmForPathMergeOutputFormat.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/NaiveAlgorithmForPathMergeOutputFormat.java
@@ -7,17 +7,17 @@
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import edu.uci.ics.genomix.pregelix.api.io.binary.BinaryVertexOutputFormat;
-import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
+import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
import edu.uci.ics.genomix.type.NodeWritable;
import edu.uci.ics.genomix.type.PositionWritable;
import edu.uci.ics.pregelix.api.graph.Vertex;
import edu.uci.ics.pregelix.api.io.VertexWriter;
public class NaiveAlgorithmForPathMergeOutputFormat extends
- BinaryVertexOutputFormat<PositionWritable, ValueStateWritable, NullWritable> {
+ BinaryVertexOutputFormat<PositionWritable, VertexValueWritable, NullWritable> {
@Override
- public VertexWriter<PositionWritable, ValueStateWritable, NullWritable> createVertexWriter(
+ public VertexWriter<PositionWritable, VertexValueWritable, NullWritable> createVertexWriter(
TaskAttemptContext context) throws IOException, InterruptedException {
@SuppressWarnings("unchecked")
RecordWriter<NodeWritable, NullWritable> recordWriter = binaryOutputFormat.getRecordWriter(context);
@@ -28,7 +28,7 @@
* Simple VertexWriter that supports {@link BinaryLoadGraphVertex}
*/
public static class BinaryLoadGraphVertexWriter extends
- BinaryVertexWriter<PositionWritable, ValueStateWritable, NullWritable> {
+ BinaryVertexWriter<PositionWritable, VertexValueWritable, NullWritable> {
private NodeWritable node = new NodeWritable();
private NullWritable nullWritable = NullWritable.get();
@@ -37,7 +37,7 @@
}
@Override
- public void writeVertex(Vertex<PositionWritable, ValueStateWritable, NullWritable, ?> vertex)
+ public void writeVertex(Vertex<PositionWritable, VertexValueWritable, NullWritable, ?> vertex)
throws IOException, InterruptedException {
node.set(vertex.getVertexId(), vertex.getVertexValue().getFFList(),
vertex.getVertexValue().getFRList(), vertex.getVertexValue().getRFList(),
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MessageWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MessageWritable.java
index 6a71344..935da50 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MessageWritable.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MessageWritable.java
@@ -6,6 +6,7 @@
import org.apache.hadoop.io.WritableComparable;
+import edu.uci.ics.genomix.pregelix.type.AdjMessage;
import edu.uci.ics.genomix.pregelix.type.CheckMessage;
import edu.uci.ics.genomix.pregelix.type.Message;
import edu.uci.ics.genomix.type.KmerBytesWritable;
@@ -21,6 +22,7 @@
private KmerBytesWritable chainVertexId;
private AdjacencyListWritable neighberNode; //incoming or outgoing
private byte message;
+ private byte adjMessage;
private byte checkMessage;
@@ -29,6 +31,7 @@
chainVertexId = new KmerBytesWritable(0);
neighberNode = new AdjacencyListWritable();
message = Message.NON;
+ adjMessage = AdjMessage.NON;
checkMessage = (byte) 0;
}
@@ -46,6 +49,8 @@
checkMessage |= CheckMessage.NEIGHBER;
this.neighberNode.set(msg.getNeighberNode());
}
+ checkMessage |= CheckMessage.ADJMSG;
+ this.adjMessage = msg.getAdjMessage();
this.message = msg.getMessage();
}
@@ -109,6 +114,15 @@
public int getLengthOfChain() {
return chainVertexId.getKmerLength();
}
+
+ public byte getAdjMessage() {
+ return adjMessage;
+ }
+
+ public void setAdjMessage(byte adjMessage) {
+ checkMessage |= CheckMessage.ADJMSG;
+ this.adjMessage = adjMessage;
+ }
public byte getMessage() {
return message;
@@ -127,6 +141,8 @@
chainVertexId.write(out);
if ((checkMessage & CheckMessage.NEIGHBER) != 0)
neighberNode.write(out);
+ if ((checkMessage & CheckMessage.ADJMSG) != 0)
+ out.write(adjMessage);
out.write(message);
}
@@ -140,6 +156,8 @@
chainVertexId.readFields(in);
if ((checkMessage & CheckMessage.NEIGHBER) != 0)
neighberNode.readFields(in);
+ if ((checkMessage & CheckMessage.ADJMSG) != 0)
+ adjMessage = in.readByte();
message = in.readByte();
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/ValueStateWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/VertexValueWritable.java
similarity index 86%
rename from genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/ValueStateWritable.java
rename to genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/VertexValueWritable.java
index 7895da4..de483bc 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/ValueStateWritable.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/VertexValueWritable.java
@@ -7,22 +7,25 @@
import edu.uci.ics.genomix.pregelix.type.State;
import edu.uci.ics.genomix.type.KmerBytesWritable;
import edu.uci.ics.genomix.type.PositionListWritable;
+import edu.uci.ics.genomix.type.PositionWritable;
-public class ValueStateWritable implements WritableComparable<ValueStateWritable> {
+public class VertexValueWritable implements WritableComparable<VertexValueWritable> {
private AdjacencyListWritable incomingList;
private AdjacencyListWritable outgoingList;
private byte state;
private KmerBytesWritable mergeChain;
+ private PositionWritable mergeDest;
- public ValueStateWritable() {
+ public VertexValueWritable() {
incomingList = new AdjacencyListWritable();
outgoingList = new AdjacencyListWritable();
state = State.NON_VERTEX;
mergeChain = new KmerBytesWritable(0);
+ mergeDest = new PositionWritable();
}
- public ValueStateWritable(PositionListWritable forwardForwardList, PositionListWritable forwardReverseList,
+ public VertexValueWritable(PositionListWritable forwardForwardList, PositionListWritable forwardReverseList,
PositionListWritable reverseForwardList, PositionListWritable reverseReverseList,
byte state, KmerBytesWritable mergeChain) {
set(forwardForwardList, forwardReverseList,
@@ -41,7 +44,7 @@
this.mergeChain.set(mergeChain);
}
- public void set(ValueStateWritable value) {
+ public void set(VertexValueWritable value) {
set(value.getFFList(),value.getFRList(),value.getRFList(),value.getRRList(),value.getState(),
value.getMergeChain());
}
@@ -113,6 +116,14 @@
public void setMergeChain(KmerBytesWritable mergeChain) {
this.mergeChain.set(mergeChain);
}
+
+ public PositionWritable getMergeDest() {
+ return mergeDest;
+ }
+
+ public void setMergeDest(PositionWritable mergeDest) {
+ this.mergeDest = mergeDest;
+ }
@Override
public void readFields(DataInput in) throws IOException {
@@ -120,6 +131,7 @@
outgoingList.readFields(in);
state = in.readByte();
mergeChain.readFields(in);
+ mergeDest.readFields(in);
}
@Override
@@ -128,10 +140,11 @@
outgoingList.write(out);
out.writeByte(state);
mergeChain.write(out);
+ mergeDest.write(out);
}
@Override
- public int compareTo(ValueStateWritable o) {
+ public int compareTo(VertexValueWritable o) {
return 0;
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bridgeremove/BridgeAddVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bridgeremove/BridgeAddVertex.java
index 6c24aa9..8525a0a 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bridgeremove/BridgeAddVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bridgeremove/BridgeAddVertex.java
@@ -13,7 +13,7 @@
import edu.uci.ics.genomix.pregelix.format.DataCleanInputFormat;
import edu.uci.ics.genomix.pregelix.format.DataCleanOutputFormat;
import edu.uci.ics.genomix.pregelix.io.MessageWritable;
-import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
+import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
/*
* vertexId: BytesWritable
@@ -47,7 +47,7 @@
* Naive Algorithm for path merge graph
*/
public class BridgeAddVertex extends
- Vertex<PositionWritable, ValueStateWritable, NullWritable, MessageWritable> {
+ Vertex<PositionWritable, VertexValueWritable, NullWritable, MessageWritable> {
public static final String KMER_SIZE = "BridgeRemoveVertex.kmerSize";
public static final String LENGTH = "BridgeRemoveVertex.length";
public static int kmerSize = -1;
@@ -77,7 +77,7 @@
vertex.getMsgList().clear();
vertex.getEdges().clear();
PositionWritable vertexId = new PositionWritable();
- ValueStateWritable vertexValue = new ValueStateWritable();
+ VertexValueWritable vertexValue = new VertexValueWritable();
/**
* set the src vertex id
*/
@@ -116,7 +116,7 @@
job.setVertexOutputFormatClass(DataCleanOutputFormat.class);
job.setDynamicVertexValueSize(true);
job.setOutputKeyClass(PositionWritable.class);
- job.setOutputValueClass(ValueStateWritable.class);
+ job.setOutputValueClass(VertexValueWritable.class);
Client.run(args, job);
}
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bridgeremove/BridgeRemoveVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bridgeremove/BridgeRemoveVertex.java
index bc97dbd..2770ab8 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bridgeremove/BridgeRemoveVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bridgeremove/BridgeRemoveVertex.java
@@ -11,7 +11,7 @@
import edu.uci.ics.genomix.pregelix.format.DataCleanInputFormat;
import edu.uci.ics.genomix.pregelix.format.DataCleanOutputFormat;
import edu.uci.ics.genomix.pregelix.io.MessageWritable;
-import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
+import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
import edu.uci.ics.genomix.pregelix.type.AdjMessage;
import edu.uci.ics.genomix.pregelix.util.VertexUtil;
@@ -47,7 +47,7 @@
* Naive Algorithm for path merge graph
*/
public class BridgeRemoveVertex extends
- Vertex<PositionWritable, ValueStateWritable, NullWritable, MessageWritable> {
+ Vertex<PositionWritable, VertexValueWritable, NullWritable, MessageWritable> {
public static final String KMER_SIZE = "BridgeRemoveVertex.kmerSize";
public static final String LENGTH = "BridgeRemoveVertex.length";
public static int kmerSize = -1;
@@ -76,7 +76,7 @@
/**
* head send message to all next nodes
*/
- public void sendMsgToAllNextNodes(ValueStateWritable value) {
+ public void sendMsgToAllNextNodes(VertexValueWritable value) {
posIterator = value.getFFList().iterator(); // FFList
while(posIterator.hasNext()){
outgoingMsg.setMessage(AdjMessage.FROMFF);
@@ -96,7 +96,7 @@
/**
* head send message to all previous nodes
*/
- public void sendMsgToAllPreviousNodes(ValueStateWritable value) {
+ public void sendMsgToAllPreviousNodes(VertexValueWritable value) {
posIterator = value.getRFList().iterator(); // RFList
while(posIterator.hasNext()){
outgoingMsg.setMessage(AdjMessage.FROMRF);
@@ -269,7 +269,7 @@
job.setVertexOutputFormatClass(DataCleanOutputFormat.class);
job.setDynamicVertexValueSize(true);
job.setOutputKeyClass(PositionWritable.class);
- job.setOutputValueClass(ValueStateWritable.class);
+ job.setOutputValueClass(VertexValueWritable.class);
Client.run(args, job);
}
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleAddVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleAddVertex.java
index b02ea06..63ac5b5 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleAddVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleAddVertex.java
@@ -13,7 +13,7 @@
import edu.uci.ics.genomix.pregelix.format.DataCleanInputFormat;
import edu.uci.ics.genomix.pregelix.format.DataCleanOutputFormat;
import edu.uci.ics.genomix.pregelix.io.MessageWritable;
-import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
+import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
/*
* vertexId: BytesWritable
@@ -47,7 +47,7 @@
* Remove tip or single node when l > constant
*/
public class BubbleAddVertex extends
- Vertex<PositionWritable, ValueStateWritable, NullWritable, MessageWritable> {
+ Vertex<PositionWritable, VertexValueWritable, NullWritable, MessageWritable> {
public static final String KMER_SIZE = "BubbleAddVertex.kmerSize";
public static int kmerSize = -1;
@@ -73,7 +73,7 @@
vertex.getMsgList().clear();
vertex.getEdges().clear();
PositionWritable vertexId = new PositionWritable();
- ValueStateWritable vertexValue = new ValueStateWritable();
+ VertexValueWritable vertexValue = new VertexValueWritable();
/**
* set the src vertex id
*/
@@ -112,7 +112,7 @@
job.setVertexOutputFormatClass(DataCleanOutputFormat.class);
job.setDynamicVertexValueSize(true);
job.setOutputKeyClass(PositionWritable.class);
- job.setOutputValueClass(ValueStateWritable.class);
+ job.setOutputValueClass(VertexValueWritable.class);
Client.run(args, job);
}
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleMergeVertex.java
index f64c36d..396d43d 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleMergeVertex.java
@@ -15,7 +15,7 @@
import edu.uci.ics.genomix.pregelix.format.DataCleanInputFormat;
import edu.uci.ics.genomix.pregelix.format.DataCleanOutputFormat;
import edu.uci.ics.genomix.pregelix.io.MergeBubbleMessageWritable;
-import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
+import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
import edu.uci.ics.genomix.pregelix.type.AdjMessage;
import edu.uci.ics.genomix.pregelix.util.VertexUtil;
@@ -51,7 +51,7 @@
* Naive Algorithm for path merge graph
*/
public class BubbleMergeVertex extends
- Vertex<PositionWritable, ValueStateWritable, NullWritable, MergeBubbleMessageWritable> {
+ Vertex<PositionWritable, VertexValueWritable, NullWritable, MergeBubbleMessageWritable> {
public static final String KMER_SIZE = "BubbleMergeVertex.kmerSize";
public static final String ITERATIONS = "BubbleMergeVertex.iteration";
public static int kmerSize = -1;
@@ -81,7 +81,7 @@
/**
* get destination vertex
*/
- public PositionWritable getNextDestVertexId(ValueStateWritable value) {
+ public PositionWritable getNextDestVertexId(VertexValueWritable value) {
if(value.getFFList().getCountOfPosition() > 0) // #FFList() > 0
posIterator = value.getFFList().iterator();
else // #FRList() > 0
@@ -92,7 +92,7 @@
/**
* head send message to all next nodes
*/
- public void sendMsgToAllNextNodes(ValueStateWritable value) {
+ public void sendMsgToAllNextNodes(VertexValueWritable value) {
posIterator = value.getFFList().iterator(); // FFList
while(posIterator.hasNext()){
destVertexId.set(posIterator.next());
@@ -299,7 +299,7 @@
job.setVertexOutputFormatClass(DataCleanOutputFormat.class);
job.setDynamicVertexValueSize(true);
job.setOutputKeyClass(PositionWritable.class);
- job.setOutputValueClass(ValueStateWritable.class);
+ job.setOutputValueClass(VertexValueWritable.class);
Client.run(args, job);
}
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/LogAlgorithmForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/LogAlgorithmForPathMergeVertex.java
index 87016b7..cff6cd7 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/LogAlgorithmForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/LogAlgorithmForPathMergeVertex.java
@@ -10,7 +10,7 @@
import edu.uci.ics.genomix.pregelix.format.LogAlgorithmForPathMergeInputFormat;
import edu.uci.ics.genomix.pregelix.format.LogAlgorithmForPathMergeOutputFormat;
import edu.uci.ics.genomix.pregelix.io.MessageWritable;
-import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
+import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
import edu.uci.ics.genomix.pregelix.type.Message;
import edu.uci.ics.genomix.pregelix.type.State;
import edu.uci.ics.genomix.pregelix.util.VertexUtil;
@@ -19,7 +19,7 @@
import edu.uci.ics.genomix.type.PositionWritable;
/*
* vertexId: BytesWritable
- * vertexValue: ValueStateWritable
+ * vertexValue: VertexValueWritable
* edgeValue: NullWritable
* message: MessageWritable
*
@@ -46,7 +46,7 @@
* The details about message are in edu.uci.ics.pregelix.example.io.MessageWritable.
*/
public class LogAlgorithmForPathMergeVertex extends
- Vertex<PositionWritable, ValueStateWritable, NullWritable, MessageWritable> {
+ Vertex<PositionWritable, VertexValueWritable, NullWritable, MessageWritable> {
public static final String KMER_SIZE = "LogAlgorithmForPathMergeVertex.kmerSize";
public static final String ITERATIONS = "LogAlgorithmForPathMergeVertex.iteration";
public static int kmerSize = -1;
@@ -74,7 +74,7 @@
/**
* get destination vertex
*/
- public PositionWritable getNextDestVertexId(ValueStateWritable value) {
+ public PositionWritable getNextDestVertexId(VertexValueWritable value) {
if(value.getFFList().getCountOfPosition() > 0) // #FFList() > 0
posIterator = value.getFFList().iterator();
else // #FRList() > 0
@@ -82,7 +82,7 @@
return posIterator.next();
}
- public PositionWritable getPreDestVertexId(ValueStateWritable value) {
+ public PositionWritable getPreDestVertexId(VertexValueWritable value) {
if(value.getRFList().getCountOfPosition() > 0) // #RFList() > 0
posIterator = value.getRFList().iterator();
else // #RRList() > 0
@@ -93,7 +93,7 @@
/**
* head send message to all next nodes
*/
- public void sendMsgToAllNextNodes(ValueStateWritable value) {
+ public void sendMsgToAllNextNodes(VertexValueWritable value) {
posIterator = value.getFFList().iterator(); // FFList
while(posIterator.hasNext()){
destVertexId.set(posIterator.next());
@@ -109,7 +109,7 @@
/**
* head send message to all previous nodes
*/
- public void sendMsgToAllPreviousNodes(ValueStateWritable value) {
+ public void sendMsgToAllPreviousNodes(VertexValueWritable value) {
posIterator = value.getRFList().iterator(); // RFList
while(posIterator.hasNext()){
destVertexId.set(posIterator.next());
@@ -283,7 +283,7 @@
job.setVertexInputFormatClass(LogAlgorithmForPathMergeInputFormat.class);
job.setVertexOutputFormatClass(LogAlgorithmForPathMergeOutputFormat.class);
job.setOutputKeyClass(PositionWritable.class);
- job.setOutputValueClass(ValueStateWritable.class);
+ job.setOutputValueClass(VertexValueWritable.class);
job.setDynamicVertexValueSize(true);
Client.run(args, job);
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/NaiveAlgorithmForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/NaiveAlgorithmForPathMergeVertex.java
index f27c140..47ca06e 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/NaiveAlgorithmForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/NaiveAlgorithmForPathMergeVertex.java
@@ -13,7 +13,7 @@
import edu.uci.ics.genomix.pregelix.format.NaiveAlgorithmForPathMergeInputFormat;
import edu.uci.ics.genomix.pregelix.format.NaiveAlgorithmForPathMergeOutputFormat;
import edu.uci.ics.genomix.pregelix.io.MessageWritable;
-import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
+import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
import edu.uci.ics.genomix.pregelix.type.Message;
import edu.uci.ics.genomix.pregelix.type.State;
import edu.uci.ics.genomix.pregelix.util.VertexUtil;
@@ -50,7 +50,7 @@
* Naive Algorithm for path merge graph
*/
public class NaiveAlgorithmForPathMergeVertex extends
- Vertex<PositionWritable, ValueStateWritable, NullWritable, MessageWritable> {
+ Vertex<PositionWritable, VertexValueWritable, NullWritable, MessageWritable> {
public static final String KMER_SIZE = "NaiveAlgorithmForPathMergeVertex.kmerSize";
public static final String ITERATIONS = "NaiveAlgorithmForPathMergeVertex.iteration";
public static int kmerSize = -1;
@@ -79,7 +79,7 @@
/**
* get destination vertex
*/
- public PositionWritable getNextDestVertexId(ValueStateWritable value) {
+ public PositionWritable getNextDestVertexId(VertexValueWritable value) {
if(value.getFFList().getCountOfPosition() > 0) // #FFList() > 0
posIterator = value.getFFList().iterator();
else // #FRList() > 0
@@ -87,7 +87,7 @@
return posIterator.next();
}
- public PositionWritable getPreDestVertexId(ValueStateWritable value) {
+ public PositionWritable getPreDestVertexId(VertexValueWritable value) {
if(value.getRFList().getCountOfPosition() > 0) // #RFList() > 0
posIterator = value.getRFList().iterator();
else // #RRList() > 0
@@ -98,7 +98,7 @@
/**
* head send message to all next nodes
*/
- public void sendMsgToAllNextNodes(ValueStateWritable value) {
+ public void sendMsgToAllNextNodes(VertexValueWritable value) {
posIterator = value.getFFList().iterator(); // FFList
while(posIterator.hasNext()){
destVertexId.set(posIterator.next());
@@ -114,7 +114,7 @@
/**
* head send message to all previous nodes
*/
- public void sendMsgToAllPreviousNodes(ValueStateWritable value) {
+ public void sendMsgToAllPreviousNodes(VertexValueWritable value) {
posIterator = value.getRFList().iterator(); // RFList
while(posIterator.hasNext()){
destVertexId.set(posIterator.next());
@@ -260,7 +260,7 @@
job.setVertexOutputFormatClass(NaiveAlgorithmForPathMergeOutputFormat.class);
job.setDynamicVertexValueSize(true);
job.setOutputKeyClass(PositionWritable.class);
- job.setOutputValueClass(ValueStateWritable.class);
+ job.setOutputValueClass(VertexValueWritable.class);
Client.run(args, job);
}
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P3ForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P3ForPathMergeVertex.java
index 35e679a..949e8bb 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P3ForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P3ForPathMergeVertex.java
@@ -13,7 +13,7 @@
import edu.uci.ics.genomix.pregelix.format.NaiveAlgorithmForPathMergeInputFormat;
import edu.uci.ics.genomix.pregelix.format.NaiveAlgorithmForPathMergeOutputFormat;
import edu.uci.ics.genomix.pregelix.io.MessageWritable;
-import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
+import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
import edu.uci.ics.genomix.pregelix.type.Message;
import edu.uci.ics.genomix.pregelix.type.State;
import edu.uci.ics.genomix.pregelix.util.VertexUtil;
@@ -50,7 +50,7 @@
* Naive Algorithm for path merge graph
*/
public class P3ForPathMergeVertex extends
- Vertex<PositionWritable, ValueStateWritable, NullWritable, MessageWritable> {
+ Vertex<PositionWritable, VertexValueWritable, NullWritable, MessageWritable> {
public static final String KMER_SIZE = "P3ForPathMergeVertex.kmerSize";
public static final String ITERATIONS = "P3ForPathMergeVertex.iteration";
public static final String PSEUDORATE = "P3ForPathMergeVertex.pseudoRate";
@@ -86,7 +86,7 @@
/**
* get destination vertex
*/
- public PositionWritable getNextDestVertexId(ValueStateWritable value) {
+ public PositionWritable getNextDestVertexId(VertexValueWritable value) {
if(value.getFFList().getCountOfPosition() > 0) // #FFList() > 0
posIterator = value.getFFList().iterator();
else // #FRList() > 0
@@ -94,7 +94,7 @@
return posIterator.next();
}
- public PositionWritable getPreDestVertexId(ValueStateWritable value) {
+ public PositionWritable getPreDestVertexId(VertexValueWritable value) {
if(value.getRFList().getCountOfPosition() > 0) // #RFList() > 0
posIterator = value.getRFList().iterator();
else // #RRList() > 0
@@ -105,7 +105,7 @@
/**
* head send message to all next nodes
*/
- public void sendMsgToAllNextNodes(ValueStateWritable value) {
+ public void sendMsgToAllNextNodes(VertexValueWritable value) {
posIterator = value.getFFList().iterator(); // FFList
while(posIterator.hasNext()){
destVertexId.set(posIterator.next());
@@ -121,7 +121,7 @@
/**
* head send message to all previous nodes
*/
- public void sendMsgToAllPreviousNodes(ValueStateWritable value) {
+ public void sendMsgToAllPreviousNodes(VertexValueWritable value) {
posIterator = value.getRFList().iterator(); // RFList
while(posIterator.hasNext()){
destVertexId.set(posIterator.next());
@@ -411,7 +411,7 @@
job.setVertexOutputFormatClass(NaiveAlgorithmForPathMergeOutputFormat.class);
job.setDynamicVertexValueSize(true);
job.setOutputKeyClass(PositionWritable.class);
- job.setOutputValueClass(ValueStateWritable.class);
+ job.setOutputValueClass(VertexValueWritable.class);
Client.run(args, job);
}
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java
index 31e0e24..b6c03fb 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java
@@ -12,9 +12,9 @@
import edu.uci.ics.genomix.pregelix.client.Client;
import edu.uci.ics.genomix.pregelix.format.NaiveAlgorithmForPathMergeInputFormat;
import edu.uci.ics.genomix.pregelix.format.NaiveAlgorithmForPathMergeOutputFormat;
-import edu.uci.ics.genomix.pregelix.io.AdjacencyListWritable;
import edu.uci.ics.genomix.pregelix.io.MessageWritable;
-import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
+import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
+import edu.uci.ics.genomix.pregelix.type.AdjMessage;
import edu.uci.ics.genomix.pregelix.type.Message;
import edu.uci.ics.genomix.pregelix.type.MessageFlag;
import edu.uci.ics.genomix.pregelix.type.State;
@@ -52,7 +52,7 @@
* Naive Algorithm for path merge graph
*/
public class P4ForPathMergeVertex extends
- Vertex<PositionWritable, ValueStateWritable, NullWritable, MessageWritable> {
+ Vertex<PositionWritable, VertexValueWritable, NullWritable, MessageWritable> {
public static final String KMER_SIZE = "P4ForPathMergeVertex.kmerSize";
public static final String ITERATIONS = "P4ForPathMergeVertex.iteration";
public static final String RANDSEED = "P4ForPathMergeVertex.randSeed";
@@ -113,7 +113,7 @@
/**
* set nextID to the element that's next (in the node's FF or FR list), returning true when there is a next neighbor
*/
- protected boolean setNextInfo(ValueStateWritable value) {
+ protected boolean setNextInfo(VertexValueWritable value) {
if (value.getFFList().getCountOfPosition() > 0) {
nextID.set(value.getFFList().getPosition(0));
nextHead = isNodeRandomHead(nextID);
@@ -130,7 +130,7 @@
/**
* set prevID to the element that's previous (in the node's RR or RF list), returning true when there is a previous neighbor
*/
- protected boolean setPrevInfo(ValueStateWritable value) {
+ protected boolean setPrevInfo(VertexValueWritable value) {
if (value.getRRList().getCountOfPosition() > 0) {
prevID.set(value.getRRList().getPosition(0));
prevHead = isNodeRandomHead(prevID);
@@ -147,7 +147,7 @@
/**
* get destination vertex
*/
- public PositionWritable getNextDestVertexId(ValueStateWritable value) {
+ public PositionWritable getNextDestVertexId(VertexValueWritable value) {
if(value.getFFList().getCountOfPosition() > 0) // #FFList() > 0
posIterator = value.getFFList().iterator();
else // #FRList() > 0
@@ -155,7 +155,7 @@
return posIterator.next();
}
- public PositionWritable getPreDestVertexId(ValueStateWritable value) {
+ public PositionWritable getPreDestVertexId(VertexValueWritable value) {
if(value.getRFList().getCountOfPosition() > 0) // #RFList() > 0
posIterator = value.getRFList().iterator();
else // #RRList() > 0
@@ -166,7 +166,7 @@
/**
* head send message to all next nodes
*/
- public void sendMsgToAllNextNodes(ValueStateWritable value) {
+ public void sendMsgToAllNextNodes(VertexValueWritable value) {
posIterator = value.getFFList().iterator(); // FFList
while(posIterator.hasNext()){
destVertexId.set(posIterator.next());
@@ -182,7 +182,7 @@
/**
* head send message to all previous nodes
*/
- public void sendMsgToAllPreviousNodes(ValueStateWritable value) {
+ public void sendMsgToAllPreviousNodes(VertexValueWritable value) {
posIterator = value.getRFList().iterator(); // RFList
while(posIterator.hasNext()){
destVertexId.set(posIterator.next());
@@ -271,46 +271,105 @@
else
return false;
}
+
/**
- * send update message from predecessor
+ * set adjMessage to successor(from predecessor)
*/
- public void sendUpMsgFromPredecessor(){
+ public void setSuccessorAdjMsg(){
+ if(getVertexValue().getFFList().getLength() > 0)
+ outgoingMsg.setAdjMessage(AdjMessage.FROMFF);
+ else
+ outgoingMsg.setAdjMessage(AdjMessage.FROMFR);
+ }
+
+ /**
+ * set adjMessage to predecessor(from successor)
+ */
+ public void setPredecessorAdjMsg(){
+ if(getVertexValue().getRFList().getLength() > 0)
+ outgoingMsg.setAdjMessage(AdjMessage.FROMRF);
+ else
+ outgoingMsg.setAdjMessage(AdjMessage.FROMRR);
+ }
+
+ /**
+ * send update message to neighber
+ */
+ public void broadcastUpdateMsg(){
outFlag |= MessageFlag.FROM_PREDECESSOR;
outgoingMsg.setNeighberNode(getVertexValue().getIncomingList());
if(ifFlipWithPredecessor())
outFlag |= MessageFlag.FLIP;
outgoingMsg.setMessage(outFlag);
outgoingMsg.setSourceVertexId(getVertexId());
+ setSuccessorAdjMsg();
sendMsg(getNextDestVertexId(getVertexValue()), outgoingMsg);
- outUpFlag = (byte)(MessageFlag.FROM_DEADVERTEX | MessageFlag.FROM_SUCCESSOR);
+ outUpFlag = (byte)(MessageFlag.FROM_SUCCESSOR);
outgoingUpMsg.setNeighberNode(getVertexValue().getOutgoingList());
if(ifFilpWithSuccessor())
outFlag |= MessageFlag.FLIP;
outgoingUpMsg.setMessage(outUpFlag);
outgoingUpMsg.setSourceVertexId(getVertexId());
+ setPredecessorAdjMsg();
sendMsg(getPreDestVertexId(getVertexValue()), outgoingUpMsg);
- deleteVertex(getVertexId());
+ //remove its own neighbers
+ getVertexValue().setIncomingList(null);
+ getVertexValue().setOutgoingList(null);
}
/**
- * send update message from successor
+ * This vertex tries to merge with next vertex and send update msg to neighber
+ */
+ public void sendUpMsgFromPredecessor(){
+ getVertexValue().setState(MessageFlag.SHOULD_MERGEWITHNEXT);
+ if(getVertexValue().getFFList().getLength() > 0)
+ getVertexValue().setMergeDest(getVertexValue().getFFList().getPosition(0));
+ else
+ getVertexValue().setMergeDest(getVertexValue().getFRList().getPosition(0));
+ broadcastUpdateMsg();
+ }
+
+ /**
+ * This vertex tries to merge with next vertex and send update msg to neighber
*/
public void sendUpMsgFromSuccessor(){
- outFlag |= MessageFlag.FROM_SUCCESSOR;
- outgoingUpMsg.setNeighberNode(getVertexValue().getOutgoingList());
- if(ifFilpWithSuccessor())
- outFlag |= MessageFlag.FLIP;
- outgoingMsg.setMessage(outFlag);
- outgoingMsg.setSourceVertexId(getVertexId());
- sendMsg(getPreDestVertexId(getVertexValue()), outgoingMsg);
- outUpFlag = (byte)(MessageFlag.FROM_DEADVERTEX | MessageFlag.FROM_PREDECESSOR);
- outgoingMsg.setNeighberNode(getVertexValue().getIncomingList());
- if(ifFlipWithPredecessor())
- outFlag |= MessageFlag.FLIP;
- outgoingUpMsg.setMessage(outUpFlag);
- outgoingUpMsg.setSourceVertexId(getVertexId());
- sendMsg(getNextDestVertexId(getVertexValue()), outgoingUpMsg);
- deleteVertex(getVertexId());
+ getVertexValue().setState(MessageFlag.SHOULD_MERGEWITHPREV);
+ if(getVertexValue().getRFList().getLength() > 0)
+ getVertexValue().setMergeDest(getVertexValue().getRFList().getPosition(0));
+ else
+ getVertexValue().setMergeDest(getVertexValue().getRRList().getPosition(0));
+ broadcastUpdateMsg();
+ }
+
+ /**
+ * updateAdjList
+ */
+ public void updateAdjList(){
+ if(incomingMsg.getAdjMessage() == AdjMessage.FROMFF){
+ getVertexValue().setRRList(null); //may replace setNull with remove
+ if(incomingMsg.getNeighberNode().getForwardList().getLength() > 0)
+ getVertexValue().setRFList(incomingMsg.getNeighberNode().getForwardList());
+ else
+ getVertexValue().setRFList(incomingMsg.getNeighberNode().getReverseList());
+ } else if(incomingMsg.getAdjMessage() == AdjMessage.FROMFR){
+ getVertexValue().setFRList(null); //may replace setNull with remove
+ if(incomingMsg.getNeighberNode().getForwardList().getLength() > 0)
+ getVertexValue().setFFList(incomingMsg.getNeighberNode().getForwardList());
+ else
+ getVertexValue().setFFList(incomingMsg.getNeighberNode().getReverseList());
+ } else if(incomingMsg.getAdjMessage() == AdjMessage.FROMRF){
+ getVertexValue().setRFList(null); //may replace setNull with remove
+ if(incomingMsg.getNeighberNode().getForwardList().getLength() > 0)
+ getVertexValue().setRRList(incomingMsg.getNeighberNode().getForwardList());
+ else
+ getVertexValue().setRRList(incomingMsg.getNeighberNode().getReverseList());
+ } else if(incomingMsg.getAdjMessage() == AdjMessage.FROMRR){
+ getVertexValue().setFFList(null); //may replace setNull with remove
+ if(incomingMsg.getNeighberNode().getForwardList().getLength() > 0)
+ getVertexValue().setFRList(incomingMsg.getNeighberNode().getForwardList());
+ else
+ getVertexValue().setFRList(incomingMsg.getNeighberNode().getReverseList());
+ }
}
/**
@@ -318,19 +377,7 @@
*/
public void updateAdjList_MsgPredecessor(){
if((outFlag & MessageFlag.FLIP) > 0){
- if(getVertexValue().getFFList().getLength() > 0){
- getVertexValue().setFFList(null);
- if(incomingMsg.getNeighberNode().getForwardList().getLength() > 0)
- getVertexValue().setFRList(incomingMsg.getNeighberNode().getForwardList());
- else
- getVertexValue().setFRList(incomingMsg.getNeighberNode().getReverseList());
- } else {
- getVertexValue().setFRList(null);
- if(incomingMsg.getNeighberNode().getForwardList().getLength() > 0)
- getVertexValue().setFFList(incomingMsg.getNeighberNode().getForwardList());
- else
- getVertexValue().setFFList(incomingMsg.getNeighberNode().getReverseList());
- }
+ updateAdjList();
} else {
getVertexValue().setIncomingList(incomingMsg.getNeighberNode());
}
@@ -341,19 +388,7 @@
*/
public void updateAdjList_MsgSuccessor(){
if((outFlag & MessageFlag.FLIP) > 0){
- if(getVertexValue().getRRList().getLength() > 0){
- getVertexValue().setRRList(null);
- if(incomingMsg.getNeighberNode().getForwardList().getLength() > 0)
- getVertexValue().setRFList(incomingMsg.getNeighberNode().getForwardList());
- else
- getVertexValue().setRFList(incomingMsg.getNeighberNode().getReverseList());
- } else {
- getVertexValue().setRFList(null);
- if(incomingMsg.getNeighberNode().getForwardList().getLength() > 0)
- getVertexValue().setRRList(incomingMsg.getNeighberNode().getForwardList());
- else
- getVertexValue().setRRList(incomingMsg.getNeighberNode().getReverseList());
- }
+ updateAdjList();
} else {
getVertexValue().setOutgoingList(incomingMsg.getNeighberNode());
}
@@ -366,7 +401,7 @@
startSendMsg();
else if (getSuperstep() == 2)
initState(msgIterator);
- else if (getSuperstep() % 2 == 1){
+ else if (getSuperstep() % 4 == 3){
// Node may be marked as head b/c it's a real head or a real tail
headFlag = (byte) (State.START_VERTEX & getVertexValue().getState());
tailFlag = (byte) (State.END_VERTEX & getVertexValue().getState());
@@ -382,7 +417,6 @@
hasNext = setNextInfo(getVertexValue()) && tailFlag == 0;
hasPrev = setPrevInfo(getVertexValue()) && headFlag == 0;
if ((outFlag & MessageFlag.IS_HEAD) > 0 && (outFlag & MessageFlag.IS_TAIL) > 0) {
- outFlag |= MessageFlag.FROM_SELF;
getVertexValue().setState(outFlag);
voteToHalt();
}
@@ -419,30 +453,47 @@
}
}
}
- else if (getSuperstep() % 2 == 0){
+ else if (getSuperstep() % 4 == 0){
+ //update neighber
while (msgIterator.hasNext()) {
incomingMsg = msgIterator.next();
outFlag = incomingMsg.getMessage();
- if((outFlag & MessageFlag.FROM_DEADVERTEX) > 0){
- if((outFlag & MessageFlag.FROM_PREDECESSOR) > 0){
- updateAdjList_MsgPredecessor();
- }
- else {//Message from successor.
- updateAdjList_MsgSuccessor();
- }
+ if((outFlag & MessageFlag.FROM_PREDECESSOR) > 0){
+ updateAdjList_MsgPredecessor();
+ }
+ else {//Message from successor.
+ updateAdjList_MsgSuccessor();
}
- else {//Not for update, but for merging
- if((outFlag & MessageFlag.FROM_PREDECESSOR) > 0){
- //B merge with A's reverse
- //append A's reverse to B
- //
- updateAdjList_MsgPredecessor();
- }
- else {//Message from successor
- //B merge with A's reverse
- //append
- updateAdjList_MsgSuccessor();
- }
+ }
+ } else if (getSuperstep() % 4 == 1){
+ //send message to the merge object and kill self
+ if((getVertexValue().getState() | MessageFlag.SHOULD_MERGEWITHNEXT) > 0){
+ setSuccessorAdjMsg();
+ outgoingMsg.setChainVertexId(getVertexValue().getMergeChain());
+ sendMsg(getVertexValue().getMergeDest(), outgoingMsg);
+ deleteVertex(getVertexId());
+ } else if((getVertexValue().getState() | MessageFlag.SHOULD_MERGEWITHPREV) > 0){
+ setPredecessorAdjMsg();
+ outgoingMsg.setChainVertexId(getVertexValue().getMergeChain());
+ sendMsg(getVertexValue().getMergeDest(), outgoingMsg);
+ deleteVertex(getVertexId());
+ }
+ } else if (getSuperstep() % 4 == 2){
+ //merge kmer
+ while (msgIterator.hasNext()) {
+ incomingMsg = msgIterator.next();
+ outFlag = incomingMsg.getMessage();
+ if(outFlag == AdjMessage.FROMFF){
+ //mergeWithRR(incomingMsg.getChain())
+ }
+ else if(outFlag == AdjMessage.FROMFR){
+ //mergeWithRF(incomingMsg.getChain())
+ }
+ else if(outFlag == AdjMessage.FROMRF){
+ //mergeWithFR(incomingMsg.getChain())
+ }
+ else if(outFlag == AdjMessage.FROMRR){
+ //mergeWithFF(incomingMsg.getChain())
}
}
}
@@ -458,7 +509,7 @@
job.setVertexOutputFormatClass(NaiveAlgorithmForPathMergeOutputFormat.class);
job.setDynamicVertexValueSize(true);
job.setOutputKeyClass(PositionWritable.class);
- job.setOutputValueClass(ValueStateWritable.class);
+ job.setOutputValueClass(VertexValueWritable.class);
Client.run(args, job);
}
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/tipremove/TipAddVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/tipremove/TipAddVertex.java
index 8c9fdc5..59a5efb 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/tipremove/TipAddVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/tipremove/TipAddVertex.java
@@ -13,7 +13,7 @@
import edu.uci.ics.genomix.pregelix.format.DataCleanInputFormat;
import edu.uci.ics.genomix.pregelix.format.DataCleanOutputFormat;
import edu.uci.ics.genomix.pregelix.io.MessageWritable;
-import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
+import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
/*
* vertexId: BytesWritable
@@ -47,7 +47,7 @@
* Remove tip or single node when l > constant
*/
public class TipAddVertex extends
- Vertex<PositionWritable, ValueStateWritable, NullWritable, MessageWritable> {
+ Vertex<PositionWritable, VertexValueWritable, NullWritable, MessageWritable> {
public static final String KMER_SIZE = "TipAddVertex.kmerSize";
public static int kmerSize = -1;
@@ -73,7 +73,7 @@
vertex.getMsgList().clear();
vertex.getEdges().clear();
PositionWritable vertexId = new PositionWritable();
- ValueStateWritable vertexValue = new ValueStateWritable();
+ VertexValueWritable vertexValue = new VertexValueWritable();
/**
* set the src vertex id
*/
@@ -107,7 +107,7 @@
job.setVertexOutputFormatClass(DataCleanOutputFormat.class);
job.setDynamicVertexValueSize(true);
job.setOutputKeyClass(PositionWritable.class);
- job.setOutputValueClass(ValueStateWritable.class);
+ job.setOutputValueClass(VertexValueWritable.class);
Client.run(args, job);
}
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/tipremove/TipRemoveVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/tipremove/TipRemoveVertex.java
index 4901269..61e7ae9 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/tipremove/TipRemoveVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/tipremove/TipRemoveVertex.java
@@ -10,7 +10,7 @@
import edu.uci.ics.genomix.pregelix.format.DataCleanInputFormat;
import edu.uci.ics.genomix.pregelix.format.DataCleanOutputFormat;
import edu.uci.ics.genomix.pregelix.io.MessageWritable;
-import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
+import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
import edu.uci.ics.genomix.pregelix.type.AdjMessage;
import edu.uci.ics.genomix.pregelix.util.VertexUtil;
@@ -46,7 +46,7 @@
* Remove tip or single node when l > constant
*/
public class TipRemoveVertex extends
- Vertex<PositionWritable, ValueStateWritable, NullWritable, MessageWritable> {
+ Vertex<PositionWritable, VertexValueWritable, NullWritable, MessageWritable> {
public static final String KMER_SIZE = "TipRemoveVertex.kmerSize";
public static final String LENGTH = "TipRemoveVertex.length";
public static int kmerSize = -1;
@@ -74,7 +74,7 @@
/**
* get destination vertex
*/
- public PositionWritable getNextDestVertexId(ValueStateWritable value) {
+ public PositionWritable getNextDestVertexId(VertexValueWritable value) {
if(value.getFFList().getCountOfPosition() > 0) // #FFList() > 0
posIterator = value.getFFList().iterator();
else // #FRList() > 0
@@ -82,7 +82,7 @@
return posIterator.next();
}
- public PositionWritable getPreDestVertexId(ValueStateWritable value) {
+ public PositionWritable getPreDestVertexId(VertexValueWritable value) {
if(value.getRFList().getCountOfPosition() > 0) // #RFList() > 0
posIterator = value.getRFList().iterator();
else // #RRList() > 0
@@ -182,7 +182,7 @@
job.setVertexOutputFormatClass(DataCleanOutputFormat.class);
job.setDynamicVertexValueSize(true);
job.setOutputKeyClass(PositionWritable.class);
- job.setOutputValueClass(ValueStateWritable.class);
+ job.setOutputValueClass(VertexValueWritable.class);
Client.run(args, job);
}
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/ConvertNodeToIdValue.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/ConvertNodeToIdValue.java
index 452f72d..8669ecd 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/ConvertNodeToIdValue.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/ConvertNodeToIdValue.java
@@ -11,7 +11,7 @@
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.CompressionType;
-import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
+import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
import edu.uci.ics.genomix.pregelix.type.State;
import edu.uci.ics.genomix.type.NodeWritable;
import edu.uci.ics.genomix.type.PositionWritable;
@@ -25,11 +25,11 @@
SequenceFile.Reader reader = new SequenceFile.Reader(fileSys, inFile, conf);
SequenceFile.Writer writer = SequenceFile.createWriter(fileSys, conf, outFile, PositionWritable.class,
- ValueStateWritable.class, CompressionType.NONE);
+ VertexValueWritable.class, CompressionType.NONE);
NodeWritable node = new NodeWritable();
NullWritable value = NullWritable.get();
PositionWritable outputKey = new PositionWritable();
- ValueStateWritable outputValue = new ValueStateWritable();
+ VertexValueWritable outputValue = new VertexValueWritable();
while(reader.next(node, value)) {
System.out.println(node.getNodeID().toString());
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/GenerateTextFile.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/GenerateTextFile.java
index 1dd8757..9dd5162 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/GenerateTextFile.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/GenerateTextFile.java
@@ -12,7 +12,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
-import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
+import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
import edu.uci.ics.genomix.pregelix.type.State;
import edu.uci.ics.genomix.type.KmerBytesWritable;
import edu.uci.ics.genomix.type.PositionWritable;
@@ -31,7 +31,7 @@
//NodeWritable key = new NodeWritable(kmerSize);
//NullWritable value = NullWritable.get();
PositionWritable key = new PositionWritable();
- ValueStateWritable value = new ValueStateWritable();
+ VertexValueWritable value = new VertexValueWritable();
while (reader.next(key, value)) {
if (key == null) {
@@ -54,7 +54,7 @@
Path path = new Path("/home/anbangx/genomix_result/final_naive/part-" + i);
SequenceFile.Reader reader = new SequenceFile.Reader(fileSys, path, conf);
KmerBytesWritable key = new KmerBytesWritable(55);
- ValueStateWritable value = new ValueStateWritable();
+ VertexValueWritable value = new VertexValueWritable();
while (reader.next(key, value)) {
if (key == null || value == null) {
@@ -78,7 +78,7 @@
Path path = new Path("/home/anbangx/genomix_result/improvelog2/part-" + i);
SequenceFile.Reader reader = new SequenceFile.Reader(fileSys, path, conf);
KmerBytesWritable key = new KmerBytesWritable(55);
- ValueStateWritable value = new ValueStateWritable();
+ VertexValueWritable value = new VertexValueWritable();
while (reader.next(key, value)) {
if (key == null || value == null) {
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/CheckMessage.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/CheckMessage.java
index c7bcf48..4aee8a0 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/CheckMessage.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/CheckMessage.java
@@ -7,7 +7,7 @@
public static final byte NEIGHBER = 1 << 2;
public static final byte MESSAGE = 1 << 3;
public static final byte STATE = 1 << 4;
- public static final byte LASTGENECODE = 1 << 5;
+ public static final byte ADJMSG = 1 << 5;
public static final byte START = 1 << 6;
public final static class CheckMessage_CONTENT {
@@ -30,8 +30,8 @@
case STATE:
r = "STATE";
break;
- case LASTGENECODE:
- r = "LASTGENECODE";
+ case ADJMSG:
+ r = "ADJMSG";
break;
case START:
r = "START";
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/MessageFlag.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/MessageFlag.java
index 9ef6842..0a5428b 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/MessageFlag.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/MessageFlag.java
@@ -1,26 +1,25 @@
package edu.uci.ics.genomix.pregelix.type;
public class MessageFlag {
- public static final byte FROM_SELF = 0;
+ public static final byte FLIP = 0;
public static final byte IS_HEAD = 1 << 1;
public static final byte IS_TAIL = 1 << 2;
- public static final byte FROM_DEADVERTEX = 1 << 3;
- //public static final byte FROM_FORWARDLIST = 1 << 4;
- public static final byte FLIP = 1 << 4;
+ public static final byte SHOULD_MERGEWITHNEXT = 1 << 3;
+ public static final byte SHOULD_MERGEWITHPREV = 1 << 4;
public static final byte FROM_SUCCESSOR = 1 << 5;
public static final byte FROM_PREDECESSOR = 1 << 6;
public static String getFlagAsString(byte code) {
// TODO: allow multiple flags to be set
switch (code) {
- case FROM_SELF:
- return "FROM_SELF";
case IS_HEAD:
return "IS_HEAD";
case IS_TAIL:
return "IS_TAIL";
- case FROM_DEADVERTEX:
- return "FROM_DEADVERTEX";
+ case SHOULD_MERGEWITHNEXT:
+ return "SHOULD_MERGEWITHNEXT";
+ case SHOULD_MERGEWITHPREV:
+ return "SHOULD_MERGEWITHPREV";
case FLIP:
return "FLIP";
case FROM_SUCCESSOR:
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/util/VertexUtil.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/util/VertexUtil.java
index 7962c67..67c9792 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/util/VertexUtil.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/util/VertexUtil.java
@@ -1,6 +1,6 @@
package edu.uci.ics.genomix.pregelix.util;
-import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
+import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
import edu.uci.ics.genomix.type.KmerBytesWritable;
public class VertexUtil {
@@ -9,7 +9,7 @@
*
* @param vertexValue
*/
- public static boolean isPathVertex(ValueStateWritable value) {
+ public static boolean isPathVertex(VertexValueWritable value) {
return value.inDegree() == 1 && value.outDegree() == 1;
}
@@ -18,7 +18,7 @@
*
* @param vertexValue
*/
- public static boolean isHeadVertex(ValueStateWritable value) {
+ public static boolean isHeadVertex(VertexValueWritable value) {
return value.outDegree() > 0 && !isPathVertex(value) && !isHeadWithoutIndegree(value);
}
@@ -27,21 +27,21 @@
*
* @param vertexValue
*/
- public static boolean isRearVertex(ValueStateWritable value) {
+ public static boolean isRearVertex(VertexValueWritable value) {
return value.inDegree() > 0 && !isPathVertex(value) && !isRearWithoutOutdegree(value);
}
/**
* Head Vertex without indegree: indegree = 0, outdegree = 1
*/
- public static boolean isHeadWithoutIndegree(ValueStateWritable value){
+ public static boolean isHeadWithoutIndegree(VertexValueWritable value){
return value.inDegree() == 0 && value.outDegree() == 1;
}
/**
* Rear Vertex without outdegree: indegree = 1, outdegree = 0
*/
- public static boolean isRearWithoutOutdegree(ValueStateWritable value){
+ public static boolean isRearWithoutOutdegree(VertexValueWritable value){
return value.inDegree() == 1 && value.outDegree() == 0;
}
@@ -65,32 +65,32 @@
/**
* check if vertex is a tip
*/
- public static boolean isIncomingTipVertex(ValueStateWritable value){
+ public static boolean isIncomingTipVertex(VertexValueWritable value){
return value.inDegree() == 0 && value.outDegree() == 1;
}
- public static boolean isOutgoingTipVertex(ValueStateWritable value){
+ public static boolean isOutgoingTipVertex(VertexValueWritable value){
return value.inDegree() == 1 && value.outDegree() == 0;
}
/**
* check if vertex is single
*/
- public static boolean isSingleVertex(ValueStateWritable value){
+ public static boolean isSingleVertex(VertexValueWritable value){
return value.inDegree() == 0 && value.outDegree() == 0;
}
/**
* check if vertex is upbridge
*/
- public static boolean isUpBridgeVertex(ValueStateWritable value){
+ public static boolean isUpBridgeVertex(VertexValueWritable value){
return value.inDegree() == 1 && value.outDegree() > 1;
}
/**
* check if vertex is downbridge
*/
- public static boolean isDownBridgeVertex(ValueStateWritable value){
+ public static boolean isDownBridgeVertex(VertexValueWritable value){
return value.inDegree() > 1 && value.outDegree() == 1;
}
}
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
index 33955c9..f52957e 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
@@ -10,7 +10,7 @@
import edu.uci.ics.genomix.pregelix.format.LogAlgorithmForPathMergeOutputFormat;
import edu.uci.ics.genomix.pregelix.format.NaiveAlgorithmForPathMergeInputFormat;
import edu.uci.ics.genomix.pregelix.format.NaiveAlgorithmForPathMergeOutputFormat;
-import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
+import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
import edu.uci.ics.genomix.pregelix.operator.bridgeremove.BridgeAddVertex;
import edu.uci.ics.genomix.pregelix.operator.bridgeremove.BridgeRemoveVertex;
import edu.uci.ics.genomix.pregelix.operator.bubblemerge.BubbleAddVertex;
@@ -34,7 +34,7 @@
job.setVertexOutputFormatClass(DataCleanOutputFormat.class);
job.setDynamicVertexValueSize(true);
job.setOutputKeyClass(PositionWritable.class);
- job.setOutputValueClass(ValueStateWritable.class);
+ job.setOutputValueClass(VertexValueWritable.class);
job.getConfiguration().setInt(NaiveAlgorithmForPathMergeVertex.KMER_SIZE, 3);
job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
}
@@ -51,7 +51,7 @@
job.setVertexOutputFormatClass(DataCleanOutputFormat.class); //LogAlgorithmForPathMergeOutputFormat
job.setDynamicVertexValueSize(true);
job.setOutputKeyClass(PositionWritable.class);
- job.setOutputValueClass(ValueStateWritable.class);
+ job.setOutputValueClass(VertexValueWritable.class);
job.getConfiguration().setInt(LogAlgorithmForPathMergeVertex.KMER_SIZE, 3);
job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
}
@@ -67,7 +67,7 @@
job.setVertexOutputFormatClass(DataCleanOutputFormat.class);
job.setDynamicVertexValueSize(true);
job.setOutputKeyClass(PositionWritable.class);
- job.setOutputValueClass(ValueStateWritable.class);
+ job.setOutputValueClass(VertexValueWritable.class);
job.getConfiguration().setInt(P3ForPathMergeVertex.KMER_SIZE, 3);
job.getConfiguration().setFloat(P3ForPathMergeVertex.PSEUDORATE, 0.3f);
job.getConfiguration().setInt(P3ForPathMergeVertex.MAXROUND, 2);
@@ -86,7 +86,7 @@
job.setVertexOutputFormatClass(DataCleanOutputFormat.class);
job.setDynamicVertexValueSize(true);
job.setOutputKeyClass(PositionWritable.class);
- job.setOutputValueClass(ValueStateWritable.class);
+ job.setOutputValueClass(VertexValueWritable.class);
job.getConfiguration().setInt(TipAddVertex.KMER_SIZE, 3);
job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
}
@@ -103,7 +103,7 @@
job.setVertexOutputFormatClass(DataCleanOutputFormat.class);
job.setDynamicVertexValueSize(true);
job.setOutputKeyClass(PositionWritable.class);
- job.setOutputValueClass(ValueStateWritable.class);
+ job.setOutputValueClass(VertexValueWritable.class);
job.getConfiguration().setInt(TipRemoveVertex.KMER_SIZE, 3);
job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
}
@@ -120,7 +120,7 @@
job.setVertexOutputFormatClass(DataCleanOutputFormat.class);
job.setDynamicVertexValueSize(true);
job.setOutputKeyClass(PositionWritable.class);
- job.setOutputValueClass(ValueStateWritable.class);
+ job.setOutputValueClass(VertexValueWritable.class);
job.getConfiguration().setInt(BridgeAddVertex.KMER_SIZE, 3);
job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
}
@@ -137,7 +137,7 @@
job.setVertexOutputFormatClass(DataCleanOutputFormat.class);
job.setDynamicVertexValueSize(true);
job.setOutputKeyClass(PositionWritable.class);
- job.setOutputValueClass(ValueStateWritable.class);
+ job.setOutputValueClass(VertexValueWritable.class);
job.getConfiguration().setInt(TipRemoveVertex.KMER_SIZE, 3);
job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
}
@@ -154,7 +154,7 @@
job.setVertexOutputFormatClass(DataCleanOutputFormat.class);
job.setDynamicVertexValueSize(true);
job.setOutputKeyClass(PositionWritable.class);
- job.setOutputValueClass(ValueStateWritable.class);
+ job.setOutputValueClass(VertexValueWritable.class);
job.getConfiguration().setInt(BubbleAddVertex.KMER_SIZE, 3);
job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
}
@@ -171,7 +171,7 @@
job.setVertexOutputFormatClass(DataCleanOutputFormat.class);
job.setDynamicVertexValueSize(true);
job.setOutputKeyClass(PositionWritable.class);
- job.setOutputValueClass(ValueStateWritable.class);
+ job.setOutputValueClass(VertexValueWritable.class);
job.getConfiguration().setInt(BubbleMergeVertex.KMER_SIZE, 3);
job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
}
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/pathmerge/MergePathTest.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/pathmerge/MergePathTest.java
index 8c02547..499abdc 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/pathmerge/MergePathTest.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/pathmerge/MergePathTest.java
@@ -12,7 +12,7 @@
import org.apache.hadoop.io.SequenceFile;
import org.junit.Test;
-import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
+import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
import edu.uci.ics.genomix.pregelix.type.State;
import edu.uci.ics.genomix.type.KmerBytesWritable;
@@ -51,7 +51,7 @@
Path path = new Path(input + "/part-" + i);
SequenceFile.Reader reader = new SequenceFile.Reader(fileSys, path, conf);
KmerBytesWritable key = new KmerBytesWritable(kmerSize);
- ValueStateWritable value = new ValueStateWritable();
+ VertexValueWritable value = new VertexValueWritable();
while (reader.next(key, value)) {
if (key == null || value == null) {
@@ -91,7 +91,7 @@
Path path = new Path(input + "/part-" + i);
SequenceFile.Reader reader = new SequenceFile.Reader(fileSys, path, conf);
KmerBytesWritable key = new KmerBytesWritable(kmerSize);
- ValueStateWritable value = new ValueStateWritable();
+ VertexValueWritable value = new VertexValueWritable();
while (reader.next(key, value)) {
if (key == null || value == null) {
@@ -116,7 +116,7 @@
Path path = new Path(input + "/part-" + i);
SequenceFile.Reader reader = new SequenceFile.Reader(fileSys, path, conf);
KmerBytesWritable key = new KmerBytesWritable(kmerSize);
- ValueStateWritable value = new ValueStateWritable();
+ VertexValueWritable value = new VertexValueWritable();
while (reader.next(key, value)) {
if (key == null || value == null) {