Merge branch 'fullstack_genomix' of https://code.google.com/p/hyracks into fullstack_genomix
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerBytesWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerBytesWritable.java
index e901de6..fd4c252 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerBytesWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerBytesWritable.java
@@ -123,7 +123,7 @@
byte l = 0;
int bytecount = 0;
int bcount = this.size - 1;
- for (int i = start; i < start + kmerlength; i++) {
+ for (int i = start; i < start + kmerlength && i < array.length; i++) {
byte code = GeneCode.getCodeFromSymbol(array[i]);
l |= (byte) (code << bytecount);
bytecount += 2;
@@ -151,7 +151,7 @@
byte l = 0;
int bytecount = 0;
int bcount = size - 1;
- for (int i = start + kmerlength - 1; i >= 0; i--) {
+ for (int i = start + kmerlength - 1; i >= 0 && i < array.length; i--) {
byte code = GeneCode.getCodeFromSymbol(array[i]);
l |= (byte) (code << bytecount);
bytecount += 2;
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/VKmerBytesWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/VKmerBytesWritable.java
index b958ec6..abedad6 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/VKmerBytesWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/VKmerBytesWritable.java
@@ -38,18 +38,18 @@
super(other);
}
- public void setSize(int size) {
+ protected void setSize(int size) {
if (size > getCapacity()) {
setCapacity((size * 3 / 2));
}
this.size = size;
}
- public int getCapacity() {
+ protected int getCapacity() {
return bytes.length;
}
- public void setCapacity(int new_cap) {
+ protected void setCapacity(int new_cap) {
if (new_cap != getCapacity()) {
byte[] new_data = new byte[new_cap];
if (new_cap < size) {
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/ReadsKeyValueParserFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/ReadsKeyValueParserFactory.java
index 4f5b4da..409b434 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/ReadsKeyValueParserFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/ReadsKeyValueParserFactory.java
@@ -1,157 +1,160 @@
-/*
- * Copyright 2009-2012 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package edu.uci.ics.genomix.dataflow;
-
-import java.nio.ByteBuffer;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-
-import edu.uci.ics.genomix.data.std.accessors.ByteSerializerDeserializer;
-import edu.uci.ics.genomix.type.GeneCode;
-import edu.uci.ics.genomix.type.KmerBytesWritable;
-import edu.uci.ics.hyracks.api.comm.IFrameWriter;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
-import edu.uci.ics.hyracks.hdfs.api.IKeyValueParser;
-import edu.uci.ics.hyracks.hdfs.api.IKeyValueParserFactory;
-
-;
-
-public class ReadsKeyValueParserFactory implements IKeyValueParserFactory<LongWritable, Text> {
- private static final long serialVersionUID = 1L;
-
- private KmerBytesWritable kmer;
- private boolean bReversed;
-
- public ReadsKeyValueParserFactory(int k, boolean bGenerateReversed) {
- bReversed = bGenerateReversed;
- kmer = new KmerBytesWritable(k);
- }
-
- @Override
- public IKeyValueParser<LongWritable, Text> createKeyValueParser(final IHyracksTaskContext ctx) {
- final ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(2);
- final ByteBuffer outputBuffer = ctx.allocateFrame();
- final FrameTupleAppender outputAppender = new FrameTupleAppender(ctx.getFrameSize());
- outputAppender.reset(outputBuffer, true);
-
- return new IKeyValueParser<LongWritable, Text>() {
-
- @Override
- public void parse(LongWritable key, Text value, IFrameWriter writer) throws HyracksDataException {
- String geneLine = value.toString(); // Read the Real Gene Line
- Pattern genePattern = Pattern.compile("[AGCT]+");
- Matcher geneMatcher = genePattern.matcher(geneLine);
- boolean isValid = geneMatcher.matches();
- if (isValid) {
- SplitReads(geneLine.getBytes(), writer);
- }
- }
-
- private void SplitReads(byte[] array, IFrameWriter writer) {
- /** first kmer */
- int k = kmer.getKmerLength();
- kmer.setByRead(array, 0);
- byte pre = 0;
- byte next = GeneCode.getAdjBit(array[k]);
- InsertToFrame(kmer, pre, next, writer);
-
- /** middle kmer */
- for (int i = k; i < array.length - 1; i++) {
- pre = GeneCode.getBitMapFromGeneCode(kmer.shiftKmerWithNextChar(array[i]));
- next = GeneCode.getAdjBit(array[i + 1]);
- InsertToFrame(kmer, pre, next, writer);
- }
-
- /** last kmer */
- pre = GeneCode.getBitMapFromGeneCode(kmer.shiftKmerWithNextChar(array[array.length - 1]));
- next = 0;
- InsertToFrame(kmer, pre, next, writer);
-
- if (bReversed) {
- /** first kmer */
- kmer.setByReadReverse(array, 0);
- next = 0;
- pre = GeneCode.getAdjBit(array[k]);
- InsertToFrame(kmer, pre, next, writer);
- /** middle kmer */
- for (int i = k; i < array.length - 1; i++) {
- next = GeneCode.getBitMapFromGeneCode(kmer.shiftKmerWithPreChar(array[i]));
- pre = GeneCode.getAdjBit(array[i + 1]);
- InsertToFrame(kmer, pre, next, writer);
- }
- /** last kmer */
- next = GeneCode.getBitMapFromGeneCode(kmer.shiftKmerWithPreChar(array[array.length - 1]));
- pre = 0;
- InsertToFrame(kmer, pre, next, writer);
- }
- }
-
- /**
- * At this graph building phase, we assume the kmer length are all
- * the same Thus we didn't output those Kmer length
- *
- * @param kmer
- * :input kmer
- * @param pre
- * : pre neighbor code
- * @param next
- * : next neighbor code
- * @param writer
- * : output writer
- */
- private void InsertToFrame(KmerBytesWritable kmer, byte pre, byte next, IFrameWriter writer) {
- try {
- byte adj = GeneCode.mergePreNextAdj(pre, next);
- tupleBuilder.reset();
- tupleBuilder.addField(kmer.getBytes(), 0, kmer.getLength());
- tupleBuilder.addField(ByteSerializerDeserializer.INSTANCE, adj);
-
- if (!outputAppender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
- tupleBuilder.getSize())) {
- FrameUtils.flushFrame(outputBuffer, writer);
- outputAppender.reset(outputBuffer, true);
- if (!outputAppender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
- tupleBuilder.getSize())) {
- throw new IllegalStateException(
- "Failed to copy an record into a frame: the record size is too large.");
- }
- }
- } catch (Exception e) {
- throw new IllegalStateException(e);
- }
- }
-
- @Override
- public void open(IFrameWriter writer) throws HyracksDataException {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- public void close(IFrameWriter writer) throws HyracksDataException {
- FrameUtils.flushFrame(outputBuffer, writer);
- }
- };
- }
-
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.genomix.dataflow;
+
+import java.nio.ByteBuffer;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+
+import edu.uci.ics.genomix.data.std.accessors.ByteSerializerDeserializer;
+import edu.uci.ics.genomix.type.GeneCode;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.hdfs.api.IKeyValueParser;
+import edu.uci.ics.hyracks.hdfs.api.IKeyValueParserFactory;
+
+;
+
+public class ReadsKeyValueParserFactory implements IKeyValueParserFactory<LongWritable, Text> {
+    private static final long serialVersionUID = 1L;
+
+    /** Only reads made up entirely of the bases A, G, C and T are split into k-mers. */
+    private static final Pattern GENE_PATTERN = Pattern.compile("[AGCT]+");
+
+    /** K-mer length; each parser allocates its own k-mer buffer, so parsers share no mutable state. */
+    private final int kmerSize;
+    private final boolean bReversed;
+
+    /** Creates parsers that split reads into k-mers of length k, plus their reverses if requested. */
+    public ReadsKeyValueParserFactory(int k, boolean bGenerateReversed) {
+        bReversed = bGenerateReversed;
+        kmerSize = k;
+    }
+
+    @Override
+    public IKeyValueParser<LongWritable, Text> createKeyValueParser(final IHyracksTaskContext ctx) {
+        final ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(2);
+        final ByteBuffer outputBuffer = ctx.allocateFrame();
+        final FrameTupleAppender outputAppender = new FrameTupleAppender(ctx.getFrameSize());
+        outputAppender.reset(outputBuffer, true);
+        final KmerBytesWritable kmer = new KmerBytesWritable(kmerSize);
+
+        return new IKeyValueParser<LongWritable, Text>() {
+            @Override
+            public void parse(LongWritable key, Text value, IFrameWriter writer) throws HyracksDataException {
+                String geneLine = value.toString(); // Read the Real Gene Line
+                Matcher geneMatcher = GENE_PATTERN.matcher(geneLine);
+                if (geneMatcher.matches()) {
+                    SplitReads(geneLine.getBytes(), writer);
+                }
+            }
+
+            /**
+             * Splits one read into its k-mers. Reads shorter than k are skipped; a read of exactly
+             * k bases yields a single k-mer with no neighbors (pre and next adjacency are both 0).
+             */
+            private void SplitReads(byte[] array, IFrameWriter writer) {
+                int k = kmer.getKmerLength();
+                if (k > array.length) {
+                    return;
+                }
+                boolean hasNeighbors = array.length > k;
+                /** first kmer */
+                kmer.setByRead(array, 0);
+                byte pre = 0;
+                byte next = hasNeighbors ? GeneCode.getAdjBit(array[k]) : 0;
+                InsertToFrame(kmer, pre, next, writer);
+                if (hasNeighbors) {
+                    /** middle kmer */
+                    for (int i = k; i < array.length - 1; i++) {
+                        pre = GeneCode.getBitMapFromGeneCode(kmer.shiftKmerWithNextChar(array[i]));
+                        next = GeneCode.getAdjBit(array[i + 1]);
+                        InsertToFrame(kmer, pre, next, writer);
+                    }
+                    /** last kmer */
+                    pre = GeneCode.getBitMapFromGeneCode(kmer.shiftKmerWithNextChar(array[array.length - 1]));
+                    next = 0;
+                    InsertToFrame(kmer, pre, next, writer);
+                }
+
+                if (bReversed) {
+                    /** first kmer */
+                    kmer.setByReadReverse(array, 0);
+                    next = 0;
+                    pre = hasNeighbors ? GeneCode.getAdjBit(array[k]) : 0;
+                    InsertToFrame(kmer, pre, next, writer);
+                    if (hasNeighbors) {
+                        /** middle kmer */
+                        for (int i = k; i < array.length - 1; i++) {
+                            next = GeneCode.getBitMapFromGeneCode(kmer.shiftKmerWithPreChar(array[i]));
+                            pre = GeneCode.getAdjBit(array[i + 1]);
+                            InsertToFrame(kmer, pre, next, writer);
+                        }
+                        /** last kmer */
+                        next = GeneCode.getBitMapFromGeneCode(kmer.shiftKmerWithPreChar(array[array.length - 1]));
+                        pre = 0;
+                        InsertToFrame(kmer, pre, next, writer);
+                    }
+                }
+            }
+
+            /**
+             * Appends one (k-mer, adjacency byte) tuple to the output frame, flushing the frame to the
+             * writer when it is full. All k-mers share the same length here, so the length is not written.
+             */
+            private void InsertToFrame(KmerBytesWritable kmer, byte pre, byte next, IFrameWriter writer) {
+                try {
+                    byte adj = GeneCode.mergePreNextAdj(pre, next);
+                    tupleBuilder.reset();
+                    tupleBuilder.addField(kmer.getBytes(), 0, kmer.getLength());
+                    tupleBuilder.addField(ByteSerializerDeserializer.INSTANCE, adj);
+
+                    if (!outputAppender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
+                            tupleBuilder.getSize())) {
+                        FrameUtils.flushFrame(outputBuffer, writer);
+                        outputAppender.reset(outputBuffer, true);
+                        if (!outputAppender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
+                                tupleBuilder.getSize())) {
+                            throw new IllegalStateException(
+                                    "Failed to copy an record into a frame: the record size is too large.");
+                        }
+                    }
+                } catch (Exception e) {
+                    throw new IllegalStateException(e);
+                }
+            }
+
+            @Override
+            public void open(IFrameWriter writer) throws HyracksDataException {
+                // Nothing to do: the frame buffer and appender are prepared in createKeyValueParser.
+            }
+
+            @Override
+            public void close(IFrameWriter writer) throws HyracksDataException {
+                FrameUtils.flushFrame(outputBuffer, writer);
+            }
+        };
+    }
+
}
\ No newline at end of file
diff --git a/genomix/genomix-hyracks/src/test/resources/data/webmap/text.txt b/genomix/genomix-hyracks/src/test/resources/data/webmap/text.txt
index f5a05a8..9874fc2 100755
--- a/genomix/genomix-hyracks/src/test/resources/data/webmap/text.txt
+++ b/genomix/genomix-hyracks/src/test/resources/data/webmap/text.txt
@@ -5,4 +5,4 @@
EDBDB?BEEEDGGEGGGDGGGA>DG@GGD;GD@DG@F?<B<BFFD?
AATAGAAG
AATAGAAG
-AATAGAAG
+AATAGAAG
\ No newline at end of file
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/LogAlgorithmForPathMergeInputFormat.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/LogAlgorithmForPathMergeInputFormat.java
index 7898d35..7a7e43d 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/LogAlgorithmForPathMergeInputFormat.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/LogAlgorithmForPathMergeInputFormat.java
@@ -32,7 +32,7 @@
@SuppressWarnings("rawtypes")
class BinaryLoadGraphReader extends
BinaryVertexReader<KmerBytesWritable, ValueStateWritable, NullWritable, LogAlgorithmMessageWritable> {
- private Vertex vertex;
+ private Vertex vertex = null;
private KmerBytesWritable vertexId = null;
private ValueStateWritable vertexValue = new ValueStateWritable();
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/LogAlgorithmMessageWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/LogAlgorithmMessageWritable.java
index ac84d8e..108a2d5 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/LogAlgorithmMessageWritable.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/LogAlgorithmMessageWritable.java
@@ -7,38 +7,49 @@
import org.apache.hadoop.io.WritableComparable;
import edu.uci.ics.genomix.pregelix.operator.LogAlgorithmForPathMergeVertex;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
import edu.uci.ics.genomix.type.VKmerBytesWritable;
-public class LogAlgorithmMessageWritable implements WritableComparable<LogAlgorithmMessageWritable>{
+public class LogAlgorithmMessageWritable implements
+ WritableComparable<LogAlgorithmMessageWritable> {
/**
* sourceVertexId stores source vertexId when headVertex sends the message
- * stores neighber vertexValue when pathVertex sends the message
- * chainVertexId stores the chains of connected DNA
- * file stores the point to the file that stores the chains of connected DNA
+ * stores neighber vertexValue when pathVertex sends the message
+ * chainVertexId stores the chains of connected DNA file stores the point to
+ * the file that stores the chains of connected DNA
*/
private VKmerBytesWritable sourceVertexId;
private VKmerBytesWritable chainVertexId;
private byte adjMap;
private int message;
private int sourceVertexState;
-
- public LogAlgorithmMessageWritable(){
- sourceVertexId = new VKmerBytesWritable(LogAlgorithmForPathMergeVertex.kmerSize);
- chainVertexId = new VKmerBytesWritable(LogAlgorithmForPathMergeVertex.kmerSize);
+
+ public LogAlgorithmMessageWritable() {
+ if (LogAlgorithmForPathMergeVertex.kmerSize > 0) {
+ sourceVertexId = new VKmerBytesWritable(
+ LogAlgorithmForPathMergeVertex.kmerSize);
+ chainVertexId = new VKmerBytesWritable(
+ LogAlgorithmForPathMergeVertex.kmerSize);
+ } else { // NOTE(review): fallback while LogAlgorithmForPathMergeVertex.kmerSize is still -1 (it is only set in initVertex()); messages constructed before the first initVertex() call (e.g. the vertex's msg field initializer) take this branch. Confirm 55 matches the configured k-mer size.
+ sourceVertexId = new VKmerBytesWritable(55);
+ chainVertexId = new VKmerBytesWritable(55);
+ }
}
-
- public void set(VKmerBytesWritable sourceVertexId, VKmerBytesWritable chainVertexId, byte adjMap, int message, int sourceVertexState){
+
+ public void set(KmerBytesWritable sourceVertexId,
+ KmerBytesWritable chainVertexId, byte adjMap, int message,
+ int sourceVertexState) {
this.sourceVertexId.set(sourceVertexId);
this.chainVertexId.set(chainVertexId);
this.adjMap = adjMap;
this.message = message;
this.sourceVertexState = sourceVertexState;
}
-
- public void reset(){
- //sourceVertexId.reset(LogAlgorithmForPathMergeVertex.kmerSize);
- chainVertexId.reset(LogAlgorithmForPathMergeVertex.kmerSize);
- adjMap = (byte)0;
+
+ public void reset() {
+ // sourceVertexId.reset(LogAlgorithmForPathMergeVertex.kmerSize);
+ chainVertexId.reset(0);
+ adjMap = (byte) 0;
message = 0;
sourceVertexState = 0;
}
@@ -47,7 +58,7 @@
return sourceVertexId;
}
- public void setSourceVertexId(VKmerBytesWritable sourceVertexId) {
+ public void setSourceVertexId(KmerBytesWritable sourceVertexId) {
this.sourceVertexId.set(sourceVertexId);
}
@@ -86,7 +97,7 @@
public int getLengthOfChain() {
return chainVertexId.getKmerLength();
}
-
+
@Override
public void write(DataOutput out) throws IOException {
sourceVertexId.write(out);
@@ -105,25 +116,25 @@
sourceVertexState = in.readInt();
}
- @Override
- public int hashCode() {
- return chainVertexId.hashCode();
- }
-
- @Override
- public boolean equals(Object o) {
- if (o instanceof NaiveAlgorithmMessageWritable) {
- LogAlgorithmMessageWritable tp = (LogAlgorithmMessageWritable) o;
- return chainVertexId.equals(tp.chainVertexId);
- }
- return false;
- }
-
- @Override
- public String toString() {
- return chainVertexId.toString();
- }
-
+ @Override
+ public int hashCode() {
+ return chainVertexId.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o instanceof LogAlgorithmMessageWritable) {
+ LogAlgorithmMessageWritable tp = (LogAlgorithmMessageWritable) o;
+ return chainVertexId.equals(tp.chainVertexId);
+ }
+ return false;
+ }
+
+ @Override
+ public String toString() {
+ return chainVertexId.toString();
+ }
+
@Override
public int compareTo(LogAlgorithmMessageWritable tp) {
return chainVertexId.compareTo(tp.chainVertexId);
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/ValueStateWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/ValueStateWritable.java
index 769277c..a10014a 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/ValueStateWritable.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/ValueStateWritable.java
@@ -19,7 +19,11 @@
public ValueStateWritable() {
state = State.NON_VERTEX;
- mergeChain = new VKmerBytesWritable(NaiveAlgorithmForPathMergeVertex.kmerSize);
+ if (NaiveAlgorithmForPathMergeVertex.kmerSize > 0){
+ mergeChain = new VKmerBytesWritable(NaiveAlgorithmForPathMergeVertex.kmerSize);
+ }else{ // NOTE(review): hard-coded fallback used whenever NaiveAlgorithmForPathMergeVertex.kmerSize is not positive; confirm 55 matches the configured k-mer size. LogAlgorithmForPathMergeVertex also uses this class but keeps its own kmerSize field - confirm which one should be consulted here.
+ mergeChain = new VKmerBytesWritable(55);
+ }
}
public ValueStateWritable(byte adjMap, int state, VKmerBytesWritable mergeChain) {
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/LogAlgorithmForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/LogAlgorithmForPathMergeVertex.java
index fa77d43..7b2f2ca 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/LogAlgorithmForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/LogAlgorithmForPathMergeVertex.java
@@ -24,13 +24,13 @@
* vertexValue: ValueStateWritable
* edgeValue: NullWritable
* message: LogAlgorithmMessageWritable
- *
+ *
* DNA:
* A: 00
* C: 01
* G: 10
* T: 11
- *
+ *
* succeed node
* A 00000001 1
* G 00000010 2
@@ -42,74 +42,83 @@
* C 01000000 64
* T 10000000 128
*
- * For example, ONE LINE in input file: 00,01,10 0001,0010,
+ * For example, ONE LINE in input file: 00,01,10 0001,0010,
* That means that vertexId is ACG, its succeed node is A and its precursor node is C.
* The succeed node and precursor node will be stored in vertexValue and we don't use edgeValue.
- * The details about message are in edu.uci.ics.pregelix.example.io.MessageWritable.
+ * The details about message are in edu.uci.ics.pregelix.example.io.MessageWritable.
*/
-public class LogAlgorithmForPathMergeVertex extends Vertex<KmerBytesWritable, ValueStateWritable, NullWritable, LogAlgorithmMessageWritable>{
-
+public class LogAlgorithmForPathMergeVertex
+ extends
+ Vertex<KmerBytesWritable, ValueStateWritable, NullWritable, LogAlgorithmMessageWritable> {
+
public static final String KMER_SIZE = "LogAlgorithmForPathMergeVertex.kmerSize";
- public static final String ITERATIONS = "NaiveAlgorithmForPathMergeVertex.iteration";
public static int kmerSize = -1;
+ public static final String ITERATIONS = "NaiveAlgorithmForPathMergeVertex.iteration";
private int maxIteration = -1;
-
- private ValueStateWritable vertexVal = new ValueStateWritable();
-
+
private LogAlgorithmMessageWritable msg = new LogAlgorithmMessageWritable();
-
- private VKmerBytesWritableFactory kmerFactory = new VKmerBytesWritableFactory(1);
- private VKmerBytesWritable vertexId = new VKmerBytesWritable(1);
- private VKmerBytesWritable destVertexId = new VKmerBytesWritable(1);
+
+ private VKmerBytesWritableFactory kmerFactory = new VKmerBytesWritableFactory(
+ 1);
+ private VKmerBytesWritable destVertexId = new VKmerBytesWritable(1);
private VKmerBytesWritable chainVertexId = new VKmerBytesWritable(1);
private VKmerBytesWritable lastKmer = new VKmerBytesWritable(1);
+
/**
* initiate kmerSize, maxIteration
*/
- public void initVertex(){
- if(kmerSize == -1)
+ public void initVertex() {
+ if (kmerSize == -1)
kmerSize = getContext().getConfiguration().getInt(KMER_SIZE, 5);
- if (maxIteration < 0)
- maxIteration = getContext().getConfiguration().getInt(ITERATIONS, 100);
- vertexId.set(getVertexId());
- vertexVal = getVertexValue();
+ if (maxIteration < 0)
+ maxIteration = getContext().getConfiguration().getInt(ITERATIONS,
+ 100);
}
/**
* get destination vertex
*/
- public VKmerBytesWritable getNextDestVertexId(VKmerBytesWritable vertexId, byte geneCode){
+ public VKmerBytesWritable getNextDestVertexId(KmerBytesWritable vertexId,
+ byte geneCode) {
return kmerFactory.shiftKmerWithNextCode(vertexId, geneCode);
}
-
- public VKmerBytesWritable getPreDestVertexId(VKmerBytesWritable vertexId, byte geneCode){
+
+ public VKmerBytesWritable getPreDestVertexId(KmerBytesWritable vertexId,
+ byte geneCode) {
return kmerFactory.shiftKmerWithPreCode(vertexId, geneCode);
}
-
- public VKmerBytesWritable getNextDestVertexIdFromBitmap(VKmerBytesWritable chainVertexId, byte adjMap){
- return getDestVertexIdFromChain(chainVertexId, adjMap);//GeneCode.getGeneCodeFromBitMap((byte)(adjMap & 0x0F)
+
+ public VKmerBytesWritable getNextDestVertexIdFromBitmap(
+ KmerBytesWritable chainVertexId, byte adjMap) {
+ return getDestVertexIdFromChain(chainVertexId, adjMap);// GeneCode.getGeneCodeFromBitMap((byte)(adjMap
+ // & 0x0F)
}
-
- public VKmerBytesWritable getDestVertexIdFromChain(VKmerBytesWritable chainVertexId, byte adjMap){
+
+ public VKmerBytesWritable getDestVertexIdFromChain(
+ KmerBytesWritable chainVertexId, byte adjMap) {
lastKmer.set(kmerFactory.getLastKmerFromChain(kmerSize, chainVertexId));
- return getNextDestVertexId(lastKmer, GeneCode.getGeneCodeFromBitMap((byte)(adjMap & 0x0F)));
+ return getNextDestVertexId(lastKmer,
+ GeneCode.getGeneCodeFromBitMap((byte) (adjMap & 0x0F)));
}
+
/**
* head send message to all next nodes
*/
- public void sendMsgToAllNextNodes(VKmerBytesWritable vertexId, byte adjMap){
- for(byte x = GeneCode.A; x<= GeneCode.T ; x++){
- if((adjMap & (1 << x)) != 0){
+ public void sendMsgToAllNextNodes(KmerBytesWritable vertexId, byte adjMap) {
+ for (byte x = GeneCode.A; x <= GeneCode.T; x++) {
+ if ((adjMap & (1 << x)) != 0) {
destVertexId.set(getNextDestVertexId(vertexId, x));
sendMsg(destVertexId, msg);
}
}
}
+
/**
* head send message to all previous nodes
*/
- public void sendMsgToAllPreviousNodes(VKmerBytesWritable vertexId, byte adjMap){
- for(byte x = GeneCode.A; x<= GeneCode.T ; x++){
- if(((adjMap >> 4) & (1 << x)) != 0){
+ public void sendMsgToAllPreviousNodes(KmerBytesWritable vertexId,
+ byte adjMap) {
+ for (byte x = GeneCode.A; x <= GeneCode.T; x++) {
+ if (((adjMap >> 4) & (1 << x)) != 0) {
destVertexId.set(getPreDestVertexId(vertexId, x));
sendMsg(destVertexId, msg);
}
@@ -119,253 +128,282 @@
/**
* set vertex state
*/
- public void setState(){
- if(msg.getMessage() == Message.START &&
- (vertexVal.getState() == State.MID_VERTEX || vertexVal.getState() == State.END_VERTEX)){
- vertexVal.setState(State.START_VERTEX);
- setVertexValue(vertexVal);
- }
- else if(msg.getMessage() == Message.END && vertexVal.getState() == State.MID_VERTEX){
- vertexVal.setState(State.END_VERTEX);
- setVertexValue(vertexVal);
+ public void setState() {
+ if (msg.getMessage() == Message.START
+ && (getVertexValue().getState() == State.MID_VERTEX || getVertexValue()
+ .getState() == State.END_VERTEX)) {
+ getVertexValue().setState(State.START_VERTEX);
+ setVertexValue(getVertexValue());
+ } else if (msg.getMessage() == Message.END
+ && getVertexValue().getState() == State.MID_VERTEX) {
+ getVertexValue().setState(State.END_VERTEX);
+ setVertexValue(getVertexValue());
voteToHalt();
- }
- else
+ } else
voteToHalt();
}
+
/**
* send start message to next node
*/
- public void sendStartMsgToNextNode(){
+ public void sendStartMsgToNextNode() {
msg.setMessage(Message.START);
- msg.setSourceVertexId(vertexId);
+ msg.setSourceVertexId(getVertexId());
sendMsg(destVertexId, msg);
voteToHalt();
}
+
/**
* send end message to next node
*/
- public void sendEndMsgToNextNode(){
+ public void sendEndMsgToNextNode() {
msg.setMessage(Message.END);
- msg.setSourceVertexId(vertexId);
+ msg.setSourceVertexId(getVertexId());
sendMsg(destVertexId, msg);
voteToHalt();
}
+
/**
* send non message to next node
*/
- public void sendNonMsgToNextNode(){
+ public void sendNonMsgToNextNode() {
msg.setMessage(Message.NON);
- msg.setSourceVertexId(vertexId);
+ msg.setSourceVertexId(getVertexId());
sendMsg(destVertexId, msg);
}
+
/**
* head send message to path
*/
- public void sendMsgToPathVertex(VKmerBytesWritable chainVertexId, byte adjMap){
- if(GeneCode.getGeneCodeFromBitMap((byte)(vertexVal.getAdjMap() & 0x0F)) == -1) //|| lastKmer == null
+ public void sendMsgToPathVertex(KmerBytesWritable chainVertexId,
+ byte adjMap) {
+ if (GeneCode
+ .getGeneCodeFromBitMap((byte) (getVertexValue().getAdjMap() & 0x0F)) == -1) // ||// lastKmer == null
+
voteToHalt();
- else{
- destVertexId.set(getNextDestVertexIdFromBitmap(chainVertexId, adjMap));
- if(vertexVal.getState() == State.START_VERTEX){
+ else {
+ destVertexId.set(getNextDestVertexIdFromBitmap(chainVertexId,
+ adjMap));
+ if (getVertexValue().getState() == State.START_VERTEX) {
sendStartMsgToNextNode();
- }
- else if(vertexVal.getState() != State.END_VERTEX && vertexVal.getState() != State.FINAL_DELETE){
+ } else if (getVertexValue().getState() != State.END_VERTEX
+ && getVertexValue().getState() != State.FINAL_DELETE) {
sendEndMsgToNextNode();
}
}
}
+
/**
- * path send message to head
+ * path send message to head
*/
- public void responseMsgToHeadVertex(){
- if(vertexVal.getLengthOfMergeChain() == -1){
- vertexVal.setMergeChain(vertexId);
- setVertexValue(vertexVal);
+ public void responseMsgToHeadVertex() {
+ if (getVertexValue().getLengthOfMergeChain() == -1) {
+ getVertexValue().setMergeChain(getVertexId());
+ setVertexValue(getVertexValue());
}
- msg.set(msg.getSourceVertexId(), vertexVal.getMergeChain(), vertexVal.getAdjMap(), msg.getMessage(), vertexVal.getState());
+ msg.set(msg.getSourceVertexId(), getVertexValue().getMergeChain(),
+ getVertexValue().getAdjMap(), msg.getMessage(), getVertexValue().getState());
setMessageType(msg.getMessage());
destVertexId.set(msg.getSourceVertexId());
- sendMsg(destVertexId,msg);
+ sendMsg(destVertexId, msg);
}
+
/**
* set message type
*/
- public void setMessageType(int message){
- //kill Message because it has been merged by the head
- if(vertexVal.getState() == State.END_VERTEX || vertexVal.getState() == State.FINAL_DELETE){
+ public void setMessageType(int message) {
+ // kill Message because it has been merged by the head
+ if (getVertexValue().getState() == State.END_VERTEX
+ || getVertexValue().getState() == State.FINAL_DELETE) {
msg.setMessage(Message.END);
- vertexVal.setState(State.FINAL_DELETE);
- setVertexValue(vertexVal);
- //deleteVertex(getVertexId());
- }
- else
+ getVertexValue().setState(State.FINAL_DELETE);
+ setVertexValue(getVertexValue());
+ // deleteVertex(getVertexId());
+ } else
msg.setMessage(Message.NON);
-
- if(message == Message.START){
- vertexVal.setState(State.TODELETE);
- setVertexValue(vertexVal);
+
+ if (message == Message.START) {
+ getVertexValue().setState(State.TODELETE);
+ setVertexValue(getVertexValue());
}
}
+
/**
- * set vertexValue's state chainVertexId, value
+ * set vertexValue's state chainVertexId, value
*/
- public void setVertexValueAttributes(){
- if(msg.getMessage() == Message.END){
- if(vertexVal.getState() != State.START_VERTEX)
- vertexVal.setState(State.END_VERTEX);
+ public void setVertexValueAttributes() {
+ if (msg.getMessage() == Message.END) {
+ if (getVertexValue().getState() != State.START_VERTEX)
+ getVertexValue().setState(State.END_VERTEX);
else
- vertexVal.setState(State.FINAL_VERTEX);
+ getVertexValue().setState(State.FINAL_VERTEX);
}
-
- if(getSuperstep() == 5)
- chainVertexId.set(vertexId);
+
+ if (getSuperstep() == 5)
+ chainVertexId.set(getVertexId());
else
- chainVertexId.set(vertexVal.getMergeChain());
- lastKmer.set(kmerFactory.getLastKmerFromChain(msg.getLengthOfChain() - kmerSize + 1, msg.getChainVertexId()));
+ chainVertexId.set(getVertexValue().getMergeChain());
+ lastKmer.set(kmerFactory.getLastKmerFromChain(msg.getLengthOfChain()
+ - kmerSize + 1, msg.getChainVertexId()));
chainVertexId.set(kmerFactory.mergeTwoKmer(chainVertexId, lastKmer));
- vertexVal.setMergeChain(chainVertexId);
-
- byte tmpVertexValue = GraphVertexOperation.updateRightNeighber(getVertexValue().getAdjMap(), msg.getAdjMap());
- vertexVal.setAdjMap(tmpVertexValue);
+ getVertexValue().setMergeChain(chainVertexId);
+
+ byte tmpVertexValue = GraphVertexOperation.updateRightNeighber(
+ getVertexValue().getAdjMap(), msg.getAdjMap());
+ getVertexValue().setAdjMap(tmpVertexValue);
}
+
/**
- * send message to self
+ * send message to self
*/
- public void sendMsgToSelf(){
- if(msg.getMessage() != Message.END){
- setVertexValue(vertexVal);
- msg.reset(); //reset
- msg.setAdjMap(vertexVal.getAdjMap());
- sendMsg(vertexId,msg);
+ public void sendMsgToSelf() {
+ if (msg.getMessage() != Message.END) {
+ setVertexValue(getVertexValue());
+ msg.reset(); // reset
+ msg.setAdjMap(getVertexValue().getAdjMap());
+ sendMsg(getVertexId(), msg);
}
}
+
/**
* start sending message
*/
- public void startSendMsg(){
- if(GraphVertexOperation.isHeadVertex(vertexVal.getAdjMap())){
- msg.set(vertexId, chainVertexId, (byte)0, Message.START, State.NON_VERTEX); //msg.set(null, (byte)0, chainVertexId, Message.START, State.NON_VERTEX);
- sendMsgToAllNextNodes(vertexId, vertexVal.getAdjMap());
+ public void startSendMsg() {
+ if (GraphVertexOperation.isHeadVertex(getVertexValue().getAdjMap())) {
+ msg.set(getVertexId(), chainVertexId, (byte) 0, Message.START,
+ State.NON_VERTEX);
+ // head vertex: send START to every next node in its adjacency map
+ sendMsgToAllNextNodes(getVertexId(), getVertexValue().getAdjMap());
voteToHalt();
}
- if(GraphVertexOperation.isRearVertex(vertexVal.getAdjMap())){
- msg.set(vertexId, chainVertexId, (byte)0, Message.END, State.NON_VERTEX);
- sendMsgToAllPreviousNodes(vertexId, vertexVal.getAdjMap());
+ if (GraphVertexOperation.isRearVertex(getVertexValue().getAdjMap())) {
+ msg.set(getVertexId(), chainVertexId, (byte) 0, Message.END,
+ State.NON_VERTEX); // rear vertex: send END to every previous node
+ sendMsgToAllPreviousNodes(getVertexId(), getVertexValue().getAdjMap());
voteToHalt();
}
- if(GraphVertexOperation.isPathVertex(vertexVal.getAdjMap())){
- vertexVal.setState(State.MID_VERTEX);
- setVertexValue(vertexVal);
+ if (GraphVertexOperation.isPathVertex(getVertexValue().getAdjMap())) {
+ getVertexValue().setState(State.MID_VERTEX);
+ setVertexValue(getVertexValue());
}
}
+
/**
- * initiate head, rear and path node
+ * Drain incoming messages: a non-path vertex discards each one and votes to halt; a path vertex calls setState() for each.
*/
- public void initState(Iterator<LogAlgorithmMessageWritable> msgIterator){
- while(msgIterator.hasNext()){
- if(!GraphVertexOperation.isPathVertex(vertexVal.getAdjMap())){
+ public void initState(Iterator<LogAlgorithmMessageWritable> msgIterator) {
+ while (msgIterator.hasNext()) {
+ if (!GraphVertexOperation.isPathVertex(getVertexValue().getAdjMap())) {
msgIterator.next();
voteToHalt();
- }
- else{
+ } else {
msg = msgIterator.next();
setState();
}
}
}
+
/**
* head send message to path
*/
- public void sendMsgToPathVertex(Iterator<LogAlgorithmMessageWritable> msgIterator){
- if(getSuperstep() == 3){
+ public void sendMsgToPathVertex(
+ Iterator<LogAlgorithmMessageWritable> msgIterator) {
+ if (getSuperstep() == 3) { // first round: use own id and own adjacency map
msg.reset();
- sendMsgToPathVertex(vertexId, vertexVal.getAdjMap());
- }
- else{
- if(msgIterator.hasNext()){
+ sendMsgToPathVertex(getVertexId(), getVertexValue().getAdjMap());
+ } else {
+ if (msgIterator.hasNext()) { // later rounds: merged-chain id + adjacency map from the message
msg = msgIterator.next();
- sendMsgToPathVertex(vertexVal.getMergeChain(), msg.getAdjMap());
+ sendMsgToPathVertex(getVertexValue().getMergeChain(), msg.getAdjMap());
}
}
}
+
/**
* path response message to head
*/
- public void responseMsgToHeadVertex(Iterator<LogAlgorithmMessageWritable> msgIterator){
- if(msgIterator.hasNext()){
+ public void responseMsgToHeadVertex(
+ Iterator<LogAlgorithmMessageWritable> msgIterator) {
+ if (msgIterator.hasNext()) {
msg = msgIterator.next();
responseMsgToHeadVertex();
- //voteToHalt();
- }
- else{
- if(getVertexValue().getState() != State.START_VERTEX
- && getVertexValue().getState() != State.END_VERTEX && getVertexValue().getState() != State.FINAL_DELETE){
- //vertexVal.setState(State.KILL_SELF);
- //setVertexValue(vertexVal);
- //voteToHalt();
- deleteVertex(getVertexId());//killSelf because it doesn't receive any message
+ // does not vote to halt after responding
+ } else {
+ if (getVertexValue().getState() != State.START_VERTEX
+ && getVertexValue().getState() != State.END_VERTEX
+ && getVertexValue().getState() != State.FINAL_DELETE) {
+ // Received no message and is not a START, END or
+ // FINAL_DELETE vertex: remove this vertex from the
+ // graph (replaces the old KILL_SELF state marking).
+ deleteVertex(getVertexId()); // killSelf
+ // (no message was received this round)
}
}
}
+
/**
* merge chainVertex and store in vertexVal.chainVertexId
*/
- public void mergeChainVertex(Iterator<LogAlgorithmMessageWritable> msgIterator){
- if(msgIterator.hasNext()){
+ public void mergeChainVertex(
+ Iterator<LogAlgorithmMessageWritable> msgIterator) {
+ if (msgIterator.hasNext()) { // merge the incoming chain, then message self
msg = msgIterator.next();
setVertexValueAttributes();
sendMsgToSelf();
}
- if(vertexVal.getState() == State.END_VERTEX || vertexVal.getState() == State.FINAL_DELETE){
+ if (getVertexValue().getState() == State.END_VERTEX
+ || getVertexValue().getState() == State.FINAL_DELETE) { // these states halt here
voteToHalt();
}
- if(vertexVal.getState() == State.FINAL_VERTEX){
+ if (getVertexValue().getState() == State.FINAL_VERTEX) {
//String source = vertexVal.getMergeChain().toString();
voteToHalt();
}
}
+
@Override
public void compute(Iterator<LogAlgorithmMessageWritable> msgIterator) {
initVertex();
- if(vertexVal.getState() != State.NON_EXIST && vertexVal.getState() != State.KILL_SELF){
- if (getSuperstep() == 1)
+ if (getVertexValue().getState() != State.NON_EXIST
+ && getVertexValue().getState() != State.KILL_SELF) {
+ if (getSuperstep() == 1) // heads send START, rears send END
startSendMsg();
- else if(getSuperstep() == 2)
+ else if (getSuperstep() == 2)
initState(msgIterator);
- else if(getSuperstep()%3 == 0 && getSuperstep() <= maxIteration){
+ else if (getSuperstep() % 3 == 0 && getSuperstep() <= maxIteration) { // head -> path
sendMsgToPathVertex(msgIterator);
- }
- else if(getSuperstep()%3 == 1 && getSuperstep() <= maxIteration){
+ } else if (getSuperstep() % 3 == 1
+ && getSuperstep() <= maxIteration) { // path -> head
responseMsgToHeadVertex(msgIterator);
- }
- else if(getSuperstep()%3 == 2 && getSuperstep() <= maxIteration){
- if(vertexVal.getState() == State.TODELETE){ //|| vertexVal.getState() == State.KILL_SELF)
- //vertexVal.setState(State.NON_EXIST);
- //setVertexValue(vertexVal);
- //voteToHalt();
- deleteVertex(getVertexId()); //killSelf
- }
- else{
+ } else if (getSuperstep() % 3 == 2
+ && getSuperstep() <= maxIteration) {
+ if (getVertexValue().getState() == State.TODELETE) {
+ // merge phase: a vertex in TODELETE state removes itself
+ // from the graph; every other vertex merges the chain
+ // carried by its incoming message (mergeChainVertex).
+ deleteVertex(getVertexId()); // killSelf
+ } else {
mergeChainVertex(msgIterator);
}
}
}
}
+
/**
* @param args
*/
public static void main(String[] args) throws Exception {
- PregelixJob job = new PregelixJob(LogAlgorithmForPathMergeVertex.class.getSimpleName());
- job.setVertexClass(LogAlgorithmForPathMergeVertex.class);
- /**
- * BinaryInput and BinaryOutput~/
- */
- job.setVertexInputFormatClass(LogAlgorithmForPathMergeInputFormat.class);
- job.setVertexOutputFormatClass(LogAlgorithmForPathMergeOutputFormat.class);
- job.setOutputKeyClass(KmerBytesWritable.class);
- job.setOutputValueClass(ValueStateWritable.class);
- job.setDynamicVertexValueSize(true);
- Client.run(args, job);
+ PregelixJob job = new PregelixJob(
+ LogAlgorithmForPathMergeVertex.class.getSimpleName());
+ job.setVertexClass(LogAlgorithmForPathMergeVertex.class);
+ /*
+ * Binary input/output formats for the path-merge vertices.
+ */
+ job.setVertexInputFormatClass(LogAlgorithmForPathMergeInputFormat.class);
+ job.setVertexOutputFormatClass(LogAlgorithmForPathMergeOutputFormat.class);
+ job.setOutputKeyClass(KmerBytesWritable.class);
+ job.setOutputValueClass(ValueStateWritable.class);
+ job.setDynamicVertexValueSize(true);
+ Client.run(args, job);
}
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/NaiveAlgorithmForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/NaiveAlgorithmForPathMergeVertex.java
index 8a70bd5..6941865 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/NaiveAlgorithmForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/NaiveAlgorithmForPathMergeVertex.java
@@ -62,7 +62,6 @@
private NaiveAlgorithmMessageWritable msg = new NaiveAlgorithmMessageWritable();
private VKmerBytesWritableFactory kmerFactory = new VKmerBytesWritableFactory(1);
- private VKmerBytesWritable vertexId = new VKmerBytesWritable(1);
private VKmerBytesWritable destVertexId = new VKmerBytesWritable(1);
private VKmerBytesWritable chainVertexId = new VKmerBytesWritable(1);
private VKmerBytesWritable lastKmer = new VKmerBytesWritable(1);
@@ -75,7 +74,6 @@
kmerSize = getContext().getConfiguration().getInt(KMER_SIZE, 5);
if (maxIteration < 0)
maxIteration = getContext().getConfiguration().getInt(ITERATIONS, 100);
- vertexId.set(getVertexId());
vertexVal = getVertexValue();
}
public void findDestination(){
@@ -84,7 +82,7 @@
/**
* get destination vertex
*/
- public VKmerBytesWritable getDestVertexId(VKmerBytesWritable vertexId, byte geneCode){
+ public VKmerBytesWritable getDestVertexId(KmerBytesWritable vertexId, byte geneCode){
return kmerFactory.shiftKmerWithNextCode(vertexId, geneCode);
}
@@ -95,7 +93,7 @@
/**
* head send message to all next nodes
*/
- public void sendMsgToAllNextNodes(VKmerBytesWritable vertexId, byte adjMap){
+ public void sendMsgToAllNextNodes(KmerBytesWritable vertexId, byte adjMap){
for(byte x = GeneCode.A; x<= GeneCode.T ; x++){
if((adjMap & (1 << x)) != 0){
destVertexId.set(getDestVertexId(vertexId, x));
@@ -110,8 +108,8 @@
if(!msg.isRear()){
findDestination();
if(GraphVertexOperation.isPathVertex(vertexVal.getAdjMap())){
- chainVertexId.set(vertexId);
- msg.set(vertexId, chainVertexId, vertexId, vertexVal.getAdjMap(), false);
+ chainVertexId.set(getVertexId());
+ msg.set(getVertexId(), chainVertexId, getVertexId(), vertexVal.getAdjMap(), false);
sendMsg(destVertexId,msg);
}else if(GraphVertexOperation.isRearVertex(vertexVal.getAdjMap()))
voteToHalt();
@@ -126,7 +124,7 @@
}else{
destVertexId.set(msg.getHeadVertexId());
}
- msg.set(vertexId, msg.getChainVertexId(), msg.getHeadVertexId(), (byte)0, msg.isRear());
+ msg.set(getVertexId(), msg.getChainVertexId(), msg.getHeadVertexId(), (byte)0, msg.isRear());
sendMsg(destVertexId,msg);
}
/**
@@ -137,15 +135,13 @@
findDestination();
if(GraphVertexOperation.isPathVertex(vertexVal.getAdjMap())){
chainVertexId = kmerFactory.mergeKmerWithNextCode(msg.getChainVertexId(),
- vertexId.getGeneCodeAtPosition(kmerSize - 1));
+ getVertexId().getGeneCodeAtPosition(kmerSize - 1));
deleteVertex(getVertexId());
- //vertexVal.setState(State.NON_EXIST);
- //setVertexValue(vertexVal);
- msg.set(vertexId, chainVertexId, msg.getHeadVertexId(), vertexVal.getAdjMap(), false);
+ msg.set(getVertexId(), chainVertexId, msg.getHeadVertexId(), vertexVal.getAdjMap(), false);
sendMsg(destVertexId,msg);
}
else if(GraphVertexOperation.isRearVertex(vertexVal.getAdjMap())){
- msg.set(vertexId, msg.getChainVertexId(), msg.getHeadVertexId(), (byte)0, true);
+ msg.set(getVertexId(), msg.getChainVertexId(), msg.getHeadVertexId(), (byte)0, true);
sendMsg(destVertexId,msg);
}
}else{// is Rear
@@ -161,32 +157,30 @@
@Override
public void compute(Iterator<NaiveAlgorithmMessageWritable> msgIterator) {
initVertex();
- if(vertexVal.getState() != State.NON_EXIST){
- if (getSuperstep() == 1) {
- if(GraphVertexOperation.isHeadVertex(vertexVal.getAdjMap())){
- msg.set(vertexId, chainVertexId, vertexId, (byte)0, false);
- sendMsgToAllNextNodes(vertexId, vertexVal.getAdjMap());
- }
+ if (getSuperstep() == 1) { // NOTE(review): the former NON_EXIST state guard was dropped here — confirm intended
+ if(GraphVertexOperation.isHeadVertex(vertexVal.getAdjMap())){
+ msg.set(getVertexId(), chainVertexId, getVertexId(), (byte)0, false);
+ sendMsgToAllNextNodes(getVertexId(), vertexVal.getAdjMap());
}
- else if(getSuperstep() == 2){
- if(msgIterator.hasNext()){
- msg = msgIterator.next();
- initChainVertex();
- }
+ }
+ else if(getSuperstep() == 2){
+ if(msgIterator.hasNext()){
+ msg = msgIterator.next();
+ initChainVertex();
}
- //head node sends message to path node
- else if(getSuperstep()%2 == 1 && getSuperstep() <= maxIteration){
- while (msgIterator.hasNext()){
- msg = msgIterator.next();
- sendMsgToPathVertex();
- }
+ }
+ //odd supersteps >= 3: head node forwards each message to its path node
+ else if(getSuperstep()%2 == 1 && getSuperstep() <= maxIteration){
+ while (msgIterator.hasNext()){
+ msg = msgIterator.next();
+ sendMsgToPathVertex();
}
- //path node sends message back to head node
- else if(getSuperstep()%2 == 0 && getSuperstep() > 2 && getSuperstep() <= maxIteration){
- while(msgIterator.hasNext()){
- msg = msgIterator.next();
- responseMsgToHeadVertex();
- }
+ }
+ //even supersteps > 2: path node answers the head node for each message
+ else if(getSuperstep()%2 == 0 && getSuperstep() > 2 && getSuperstep() <= maxIteration){
+ while(msgIterator.hasNext()){
+ msg = msgIterator.next();
+ responseMsgToHeadVertex();
}
}
voteToHalt();
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/GenerateSmallFile.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/GenerateSmallFile.java
index d97a2fd..1f90dbd 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/GenerateSmallFile.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/GenerateSmallFile.java
@@ -40,9 +40,9 @@
public static void main(String[] args) throws IOException {
// TODO Auto-generated method stub
Path dir = new Path("data");
- Path inFile = new Path(dir, "part-0");
- Path outFile = new Path(dir, "part-0-out-5000000");
- generateNumOfLinesFromBigFile(inFile,outFile,5000000);
+ Path inFile = new Path(dir, args.length > 0 ? args[0] : "part-1"); // optional args: [inFile [outFile [numOfLines]]], relative to "data"
+ Path outFile = new Path(dir, args.length > 1 ? args[1] : "part-1-out-3000000");
+ generateNumOfLinesFromBigFile(inFile,outFile,args.length > 2 ? Integer.parseInt(args[2]) : 3000000);
}
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/GenerateTextFile.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/GenerateTextFile.java
index c759261..8d06e8e 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/GenerateTextFile.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/GenerateTextFile.java
@@ -11,10 +11,11 @@
import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.KmerCountValue;
public class GenerateTextFile {
- public static void generate() throws IOException{
+ public static void generateFromPathmergeResult() throws IOException{
BufferedWriter bw = new BufferedWriter(new FileWriter("text/log_TreePath"));
Configuration conf = new Configuration();
FileSystem fileSys = FileSystem.get(conf);
@@ -36,13 +37,40 @@
}
bw.close();
}
+ /**
+  * Dumps every k-mer key of a graph-build SequenceFile into the text file "textfile", one per line.
+  * Both the reader and the writer are closed even if reading or writing fails.
+  */
+ public static void generateFromGraphbuildResult() throws IOException{
+     BufferedWriter bw = new BufferedWriter(new FileWriter("textfile"));
+     try {
+         Configuration conf = new Configuration();
+         FileSystem fileSys = FileSystem.get(conf);
+         Path path = new Path("data/input/part-0-out-3000000");
+         SequenceFile.Reader reader = new SequenceFile.Reader(fileSys, path, conf);
+         try {
+             // NOTE(review): k = 55 is hard-coded; presumably must match the k of the writer — confirm
+             KmerBytesWritable key = new KmerBytesWritable(55);
+             KmerCountValue value = new KmerCountValue();
+             // next() refills key/value in place and returns false at end of file
+             while (reader.next(key, value)) {
+                 bw.write(key.toString());
+                 bw.newLine();
+             }
+         } finally {
+             reader.close();
+         }
+     } finally {
+         bw.close();
+     }
+ }
/**
* @param args
* @throws IOException
*/
public static void main(String[] args) throws IOException {
- // TODO Auto-generated method stub
- generate();
+ generateFromPathmergeResult();
+ generateFromGraphbuildResult();
}
}