Merge branch 'nanzhang/genomix' into genomix/fullstack_genomix
Conflicts:
genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/velvetgraphbuilding/GraphInvertedIndexBuildingMapper.java
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/data/KmerUtil.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/data/KmerUtil.java
index a030f0b..86b9117 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/data/KmerUtil.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/data/KmerUtil.java
@@ -49,4 +49,5 @@
return strKmer.toString();
}
+
}
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/data/Marshal.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/data/Marshal.java
index 219def6..ecbf1ec 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/data/Marshal.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/data/Marshal.java
@@ -12,4 +12,11 @@
bytes[offset + 2] = (byte)((val >>> 8) & 0xFF);
bytes[offset + 3] = (byte)((val >>> 0) & 0xFF);
}
+
+ public static int hashBytes(byte[] bytes, int offset, int length) {
+ int hash = 1;
+ for (int i = offset; i < offset + length; i++)
+ hash = (31 * hash) + (int) bytes[i];
+ return hash;
+ }
}
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/GeneCode.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/GeneCode.java
index 5be5f83..d9f5c48 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/GeneCode.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/GeneCode.java
@@ -48,98 +48,22 @@
}
return r;
}
+
+ public static byte getPairedGeneCode(byte genecode){
+ if ( genecode < 0 || genecode > 3){
+ throw new IllegalArgumentException("Invalid genecode");
+ }
+ return (byte) (3- genecode);
+ }
+
+ public static byte getPairedCodeFromSymbol(byte ch){
+ return getPairedGeneCode(getCodeFromSymbol(ch));
+ }
public static byte getSymbolFromCode(byte code) {
- if (code > 3) {
- return '!';
+ if (code > 3 || code < 0 ) {
+ throw new IllegalArgumentException("Invalid genecode");
}
return GENE_SYMBOL[code];
}
-
- public static byte getAdjBit(byte t) {
- byte r = 0;
- switch (t) {
- case 'A':
- case 'a':
- r = 1 << A;
- break;
- case 'C':
- case 'c':
- r = 1 << C;
- break;
- case 'G':
- case 'g':
- r = 1 << G;
- break;
- case 'T':
- case 't':
- r = 1 << T;
- break;
- }
- return r;
- }
-
- /**
- * It works for path merge. Merge the kmer by his next, we need to make sure
- * the @{t} is a single neighbor.
- *
- * @param t
- * the neighbor code in BitMap
- * @return the genecode
- */
- public static byte getGeneCodeFromBitMap(byte t) {
- switch (t) {
- case 1 << A:
- return A;
- case 1 << C:
- return C;
- case 1 << G:
- return G;
- case 1 << T:
- return T;
- }
- return -1;
- }
-
- public static byte getBitMapFromGeneCode(byte t) {
- return (byte) (1 << t);
- }
-
- public static int countNumberOfBitSet(int i) {
- int c = 0;
- for (; i != 0; c++) {
- i &= i - 1;
- }
- return c;
- }
-
- public static int inDegree(byte bitmap) {
- return countNumberOfBitSet((bitmap >> 4) & 0x0f);
- }
-
- public static int outDegree(byte bitmap) {
- return countNumberOfBitSet(bitmap & 0x0f);
- }
-
- public static byte mergePreNextAdj(byte pre, byte next) {
- return (byte) (pre << 4 | (next & 0x0f));
- }
-
- public static String getSymbolFromBitMap(byte code) {
- int left = (code >> 4) & 0x0F;
- int right = code & 0x0F;
- StringBuilder str = new StringBuilder();
- for (int i = A; i <= T; i++) {
- if ((left & (1 << i)) != 0) {
- str.append((char) GENE_SYMBOL[i]);
- }
- }
- str.append('|');
- for (int i = A; i <= T; i++) {
- if ((right & (1 << i)) != 0) {
- str.append((char) GENE_SYMBOL[i]);
- }
- }
- return str.toString();
- }
}
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerBytesWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerBytesWritable.java
index 4883176..40d8a23 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerBytesWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerBytesWritable.java
@@ -204,8 +204,9 @@
}
/**
- * Compress Reversed Kmer into bytes array AATAG will compress as
- * [0x000A,0xATAG]
+ * Compress Reversed read into bytes array
+ * e.g. AATAG will be paired to CTATT, and then compressed as
+ * [0x000T,0xTATC]
*
* @param input
* array
@@ -217,7 +218,7 @@
int bytecount = 0;
int bcount = size - 1;
for (int i = start + kmerlength - 1; i >= 0 && i < array.length; i--) {
- byte code = GeneCode.getCodeFromSymbol(array[i]);
+ byte code = GeneCode.getPairedCodeFromSymbol(array[i]);
l |= (byte) (code << bytecount);
bytecount += 2;
if (bytecount == 8) {
@@ -308,7 +309,7 @@
* : next neighbor in gene-code format
* @return the merged Kmer, this K of this Kmer is k+1
*/
- public void mergeKmerWithNextCode(byte nextCode) {
+ public void mergeNextCode(byte nextCode) {
this.kmerlength += 1;
setSize(KmerUtil.getByteNumFromK(kmerlength));
if (kmerlength % 4 == 1) {
@@ -317,10 +318,61 @@
}
bytes[offset] = (byte) (nextCode & 0x3);
} else {
- bytes[offset] = (byte) (bytes[offset] | ((nextCode & 0x3) << (((kmerlength-1) % 4) << 1)));
+ bytes[offset] = (byte) (bytes[offset] | ((nextCode & 0x3) << (((kmerlength - 1) % 4) << 1)));
}
clearLeadBit();
}
+
+ public void mergePreCode(byte preCode) {
+ //TODO
+ return;
+ }
+
+ /**
+ * Merge Kmer with the next connected Kmer
+ * e.g. AAGCTAA merge with AACAACC, if the initial kmerSize = 3
+ * then it will return AAGCTAACAACC
+ *
+ * @param initialKmerSize
+ * : the initial kmerSize
+ * @param kmer
+ */
+ @SuppressWarnings("unchecked")
+ public void mergeNextKmer(int initialKmerSize, KmerBytesWritable kmer) {
+ if (kmer.getKmerLength() == initialKmerSize){
+ mergeNextCode(kmer.getGeneCodeAtPosition(kmer.getKmerLength()-1));
+ return;
+ }
+ int preKmerLength = kmerlength;
+ int preSize = size;
+ this.kmerlength += kmer.kmerlength - initialKmerSize + 1;
+ setSize(KmerUtil.getByteNumFromK(kmerlength));
+ int i = 1;
+ for (; i <= preSize; i++) {
+ bytes[offset + size - i] = bytes[offset + preSize - i];
+ }
+ if (i > 1) {
+ i--;
+ }
+ if (preKmerLength % 4 == 0) {
+ for (int j = 1; j <= kmer.getLength() && offset + size >= i + j; j++) {
+ bytes[offset + size - i - j] = kmer.getBytes()[kmer.getOffset() + kmer.getLength() - j];
+ }
+ } else {
+ int posNeedToMove = ((preKmerLength % 4) << 1);
+ bytes[offset + size - i] |= kmer.getBytes()[kmer.getOffset() + kmer.getLength() - 1] << posNeedToMove;
+ for (int j = 1; j <= kmer.getLength() && offset + size - i - j >= 0; j++) {
+ bytes[offset + size - i - j] = (byte) (((kmer.getBytes()[kmer.getOffset() + kmer.getLength() - j] & 0xff) >> (8 - posNeedToMove)) | (kmer
+ .getBytes()[kmer.getOffset() + kmer.getLength() - j - 1] << posNeedToMove));
+ }
+ }
+ clearLeadBit();
+ }
+
+ public void mergePreKmer(int initialKmerSize, KmerBytesWritable kmer){
+ //TODO
+ return;
+ }
protected void clearLeadBit() {
if (kmerlength % 4 != 0) {
@@ -390,4 +442,5 @@
static { // register this comparator
WritableComparator.define(KmerBytesWritable.class, new Comparator());
}
+
}
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
index 599e51f..c0b00a7 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
@@ -16,8 +16,8 @@
private PositionListWritable incomingList;
private PositionListWritable outgoingList;
private KmerBytesWritable kmer;
-
- public NodeWritable(){
+
+ public NodeWritable() {
nodeID = new PositionWritable();
incomingList = new PositionListWritable();
outgoingList = new PositionListWritable();
@@ -31,10 +31,6 @@
kmer = new KmerBytesWritable(kmerSize);
}
- public int getCount() {
- return kmer.getKmerLength();
- }
-
public void setNodeID(PositionWritable ref) {
this.setNodeID(ref.getReadID(), ref.getPosInRead());
}
@@ -47,6 +43,7 @@
incomingList.set(incoming);
}
+
public void setOutgoingList(PositionListWritable outgoing) {
outgoingList.set(outgoing);
}
@@ -54,6 +51,7 @@
public void setKmer(KmerBytesWritable right) {
this.kmer.set(right);
}
+
public void reset(int kmerSize) {
nodeID.set(0, (byte) 0);
incomingList.reset();
@@ -77,9 +75,18 @@
return kmer;
}
- public void mergeNextWithinOneRead(NodeWritable nextNodeEntry) {
- this.outgoingList.set(nextNodeEntry.outgoingList);
- kmer.mergeKmerWithNextCode(nextNodeEntry.kmer.getGeneCodeAtPosition(nextNodeEntry.getCount() - 1));
+ public int getCount() {
+ return kmer.getKmerLength();
+ }
+
+ public void mergeNext(NodeWritable nextNode, int initialKmerSize) {
+ this.outgoingList.set(nextNode.outgoingList);
+ kmer.mergeNextKmer(initialKmerSize, nextNode.getKmer());
+ }
+
+ public void mergePre(NodeWritable preNode, int initialKmerSize){
+ this.incomingList.set(preNode.incomingList);
+ kmer.mergePreKmer(initialKmerSize, preNode.getKmer());
}
public void set(NodeWritable node) {
@@ -114,9 +121,9 @@
public int hashCode() {
return nodeID.hashCode();
}
-
+
@Override
- public String toString(){
+ public String toString() {
StringBuilder sbuilder = new StringBuilder();
sbuilder.append('(');
sbuilder.append(nodeID.toString()).append('\t');
@@ -126,10 +133,11 @@
return sbuilder.toString();
}
- public boolean existsInSinglePath(){
- if(this.incomingList.getCountOfPosition() == 1 & this.outgoingList.getCountOfPosition() == 1)
- return true;
- else
- return false;
+ /*
+ * Return if this node is a "path" compressible node, that is, it has an in-degree and out-degree of 1
+ */
+ public boolean isPathNode() {
+ return incomingList.getCountOfPosition() == 1 && outgoingList.getCountOfPosition() == 1;
}
+
}
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NoteWritableFactory.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NoteWritableFactory.java
new file mode 100644
index 0000000..55c6ca7
--- /dev/null
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NoteWritableFactory.java
@@ -0,0 +1,37 @@
+package edu.uci.ics.genomix.type;
+
+public class NoteWritableFactory {
+ private NodeWritable node;
+ private KmerBytesWritableFactory kmerBytesWritableFactory;
+ private int kmerSize = 55;
+
+ public NoteWritableFactory() {
+ node = new NodeWritable();
+ kmerBytesWritableFactory = new KmerBytesWritableFactory(kmerSize);
+ }
+
+ public NodeWritable append(final NodeWritable orignalNode, final KmerBytesWritable appendKmer){
+ KmerBytesWritable preKmer = orignalNode.getKmer();
+ node.setKmer(kmerBytesWritableFactory.mergeTwoKmer(preKmer,appendKmer));
+ return node;
+ }
+
+ public NodeWritable append(final NodeWritable orignalNode, final NodeWritable appendNode) {
+ KmerBytesWritable nextKmer = kmerBytesWritableFactory.getSubKmerFromChain(kmerSize - 2, appendNode.getKmer().kmerlength - kmerSize + 2,
+ appendNode.getKmer());
+ return append(orignalNode, nextKmer);
+ }
+
+ public NodeWritable prepend(final NodeWritable orignalNode, final KmerBytesWritable prependKmer){
+ KmerBytesWritable nextKmer = orignalNode.getKmer();
+ node.setKmer(kmerBytesWritableFactory.mergeTwoKmer(prependKmer,nextKmer));
+ return node;
+ }
+
+    public NodeWritable prepend(final NodeWritable orignalNode, final NodeWritable prependNode) {
+        KmerBytesWritable prependKmer = kmerBytesWritableFactory.getSubKmerFromChain(0, prependNode.getKmer().kmerlength - kmerSize + 2,
+                prependNode.getKmer()); // cut from prependNode (was orignalNode); NOTE(review): assumes args are (start, length, chain) as in append - confirm
+        return prepend(orignalNode, prependKmer);
+    }
+
+}
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionWritable.java
index 787afec..f77c844 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionWritable.java
@@ -10,7 +10,7 @@
import edu.uci.ics.genomix.data.Marshal;
-public class PositionWritable implements WritableComparable<PositionWritable> , Serializable{
+public class PositionWritable implements WritableComparable<PositionWritable>, Serializable {
/**
*
*/
@@ -33,20 +33,21 @@
public PositionWritable(byte[] storage, int offset) {
setNewReference(storage, offset);
}
-
+
public void setNewReference(byte[] storage, int offset) {
this.storage = storage;
this.offset = offset;
}
+
+ public void set(PositionWritable pos) {
+ set(pos.getReadID(), pos.getPosInRead());
+ }
public void set(int readID, byte posInRead) {
Marshal.putInt(readID, storage, offset);
storage[offset + INTBYTES] = posInRead;
}
- public void set(PositionWritable right) {
- this.set(right.getReadID(), right.getPosInRead());
- }
public int getReadID() {
return Marshal.getInt(storage, offset);
}
@@ -66,7 +67,11 @@
public int getLength() {
return LENGTH;
}
-
+
+ public boolean isSameReadID(PositionWritable other){
+ return getReadID() == other.getReadID();
+ }
+
@Override
public void readFields(DataInput in) throws IOException {
in.readFully(storage, offset, LENGTH);
@@ -76,10 +81,10 @@
public void write(DataOutput out) throws IOException {
out.write(storage, offset, LENGTH);
}
-
+
@Override
public int hashCode() {
- return this.getReadID();
+ return Marshal.hashBytes(getByteArray(), getStartOffset(), getLength());
}
@Override
diff --git a/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/KmerBytesWritableTest.java b/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/KmerBytesWritableTest.java
index b7dd8f1..04fc3bb 100644
--- a/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/KmerBytesWritableTest.java
+++ b/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/KmerBytesWritableTest.java
@@ -59,7 +59,7 @@
Assert.assertEquals(kmer.toString(), "AATAGAA");
kmer.setByReadReverse(array, 1);
- Assert.assertEquals(kmer.toString(), "GAAGATA");
+ Assert.assertEquals(kmer.toString(), "CTTCTAT");
}
@Test
@@ -100,11 +100,35 @@
String text = "AGCTGACCG";
for (int i = 0; i < 10; i++) {
for (byte x = GeneCode.A; x <= GeneCode.T; x++) {
- kmer.mergeKmerWithNextCode(x);
+ kmer.mergeNextCode(x);
text = text + (char) GeneCode.GENE_SYMBOL[x];
Assert.assertEquals(text, kmer.toString());
}
}
}
+
+    @Test
+    public void TestMergeNextKmer(){
+        byte[] array = { 'A', 'G', 'C', 'T', 'G', 'A', 'C', 'C', 'G', 'T' };
+        KmerBytesWritable kmer1 = new KmerBytesWritable(9);
+        kmer1.setByRead(array, 0);
+        String text1 = "AGCTGACCG";
+        KmerBytesWritable kmer2 = new KmerBytesWritable(9);
+        kmer2.setByRead(array, 1);
+        String text2 = "GCTGACCGT";
+        Assert.assertEquals(text1, kmer1.toString());
+        Assert.assertEquals(text2, kmer2.toString());
+
+        KmerBytesWritable merged = new KmerBytesWritable(kmer1);
+        merged.mergeNextKmer(9, kmer2); // kmer2 has exactly the initial size, so only its last base is appended
+        Assert.assertEquals("AGCTGACCGT", merged.toString()); // compare strings; a String never equals a KmerBytesWritable
+
+        KmerBytesWritable kmer3 = new KmerBytesWritable(3);
+        kmer3.setByRead(array, 1);
+        String text3 = "GCT";
+        Assert.assertEquals(text3, kmer3.toString());
+        merged.mergeNextKmer(1, kmer3); // initial size 1 means no overlap: all 3 bases are appended to the 10-base kmer
+        Assert.assertEquals("AGCTGACCGT" + text3, merged.toString());
+    }
}
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h3/MergePathsH3.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h3/MergePathsH3.java
new file mode 100644
index 0000000..ddb2a64
--- /dev/null
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h3/MergePathsH3.java
@@ -0,0 +1,246 @@
+package edu.uci.ics.genomix.hadoop.graphclean.mergepaths.h3;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+import edu.uci.ics.genomix.hadoop.pmcommon.MessageWritableNodeWithFlag;
+import edu.uci.ics.genomix.type.NodeWritable;
+import edu.uci.ics.genomix.type.PositionWritable;
+
+@SuppressWarnings("deprecation")
+public class MergePathsH3 extends Configured implements Tool {
+ /*
+ * Flags used when sending messages
+ */
+ public static class MessageFlag {
+ public static final byte EMPTY_MESSAGE = 0;
+ public static final byte FROM_SELF = 1;
+ public static final byte FROM_SUCCESSOR = 1 << 1;
+ public static final byte IS_HEAD = 1 << 2;
+ public static final byte FROM_PREDECESSOR = 1 << 3;
+
+        public static String getFlagAsString(byte code) {
+            // TODO: allow multiple flags to be set
+            // Names exact single-flag values only; any other value (including combinations) yields the error string.
+            switch (code) {
+                case EMPTY_MESSAGE: return "EMPTY_MESSAGE";
+                case FROM_SELF: return "FROM_SELF";
+                case FROM_SUCCESSOR: return "FROM_SUCCESSOR";
+                case IS_HEAD: return "IS_HEAD";
+                case FROM_PREDECESSOR: return "FROM_PREDECESSOR";
+            }
+            return "ERROR_BAD_MESSAGE";
+        }
+ }
+
+ /*
+ * Common functionality for the two mapper types needed. See javadoc for MergePathsH3MapperSubsequent.
+ */
+ private static class MergePathsH3MapperBase extends MapReduceBase {
+
+ protected static long randSeed;
+ protected Random randGenerator;
+ protected float probBeingRandomHead;
+
+ protected int KMER_SIZE;
+ protected PositionWritable outputKey;
+ protected MessageWritableNodeWithFlag outputValue;
+ protected NodeWritable curNode;
+
+ public void configure(JobConf conf) {
+ randSeed = conf.getLong("randomSeed", 0);
+ randGenerator = new Random(randSeed);
+ probBeingRandomHead = conf.getFloat("probBeingRandomHead", 0.5f);
+
+ KMER_SIZE = conf.getInt("sizeKmer", 0);
+ outputValue = new MessageWritableNodeWithFlag(KMER_SIZE);
+ outputKey = new PositionWritable();
+ curNode = new NodeWritable(KMER_SIZE);
+ }
+
+ protected boolean isNodeRandomHead(PositionWritable nodeID) {
+ // "deterministically random", based on node id
+ randGenerator.setSeed(randSeed ^ nodeID.hashCode());
+ return randGenerator.nextFloat() < probBeingRandomHead;
+ }
+ }
+
+ /*
+ * Mapper class: Partition the graph using random pseudoheads.
+ * Heads send themselves to their successors, and all others map themselves.
+ */
+ private static class MergePathsH3MapperSubsequent extends MergePathsH3MapperBase implements
+ Mapper<PositionWritable, MessageWritableNodeWithFlag, PositionWritable, MessageWritableNodeWithFlag> {
+ @Override
+ public void map(PositionWritable key, MessageWritableNodeWithFlag value,
+ OutputCollector<PositionWritable, MessageWritableNodeWithFlag> output, Reporter reporter)
+ throws IOException {
+ curNode = value.getNode();
+ // Map all path vertices; head nodes are sent to their successors
+ if (curNode.isPathNode()) {
+ boolean isHead = (value.getFlag() & MessageFlag.IS_HEAD) == MessageFlag.IS_HEAD;
+ if (isHead || isNodeRandomHead(curNode.getNodeID())) {
+ // head nodes send themselves to their successor
+ outputKey.set(curNode.getOutgoingList().getPosition(0));
+ outputValue.set((byte) (MessageFlag.FROM_PREDECESSOR | MessageFlag.IS_HEAD), curNode);
+ output.collect(outputKey, outputValue);
+ } else {
+ // tail nodes map themselves
+ outputValue.set(MessageFlag.FROM_SELF, curNode);
+ output.collect(key, outputValue);
+ }
+ }
+ }
+ }
+
+ /*
+ * Mapper used for the first iteration. See javadoc for MergePathsH3MapperSubsequent.
+ */
+ private static class MergePathsH3MapperInitial extends MergePathsH3MapperBase implements
+ Mapper<NodeWritable, NullWritable, PositionWritable, MessageWritableNodeWithFlag> {
+ @Override
+ public void map(NodeWritable key, NullWritable value,
+ OutputCollector<PositionWritable, MessageWritableNodeWithFlag> output, Reporter reporter)
+ throws IOException {
+ curNode = key;
+ // Map all path vertices; head nodes are sent to their successors
+ if (curNode.isPathNode()) {
+ if (isNodeRandomHead(curNode.getNodeID())) {
+ // head nodes send themselves to their successor
+ outputKey.set(curNode.getOutgoingList().getPosition(0));
+ outputValue.set((byte) (MessageFlag.FROM_PREDECESSOR | MessageFlag.IS_HEAD), curNode);
+ output.collect(outputKey, outputValue);
+ } else {
+ // tail nodes map themselves
+ outputValue.set(MessageFlag.FROM_SELF, curNode);
+ output.collect(key.getNodeID(), outputValue);
+ }
+ }
+ }
+ }
+
+ /*
+ * Reducer class: merge nodes that co-occur; for singletons, remap the original nodes
+ */
+ private static class MergePathsH3Reducer extends MapReduceBase implements
+ Reducer<PositionWritable, MessageWritableNodeWithFlag, PositionWritable, MessageWritableNodeWithFlag> {
+
+ private int KMER_SIZE;
+ private MessageWritableNodeWithFlag inputValue;
+ private MessageWritableNodeWithFlag outputValue;
+ private NodeWritable headNode;
+ private NodeWritable tailNode;
+ private int count;
+
+ public void configure(JobConf conf) {
+ KMER_SIZE = conf.getInt("sizeKmer", 0);
+ outputValue = new MessageWritableNodeWithFlag(KMER_SIZE);
+ headNode = new NodeWritable(KMER_SIZE);
+ tailNode = new NodeWritable(KMER_SIZE);
+ }
+
+ @Override
+ public void reduce(PositionWritable key, Iterator<MessageWritableNodeWithFlag> values,
+ OutputCollector<PositionWritable, MessageWritableNodeWithFlag> output, Reporter reporter)
+ throws IOException {
+
+ inputValue = values.next();
+ if (!values.hasNext()) {
+ // all single nodes must be remapped
+ if ((inputValue.getFlag() & MessageFlag.FROM_SELF) == MessageFlag.FROM_SELF) {
+ // FROM_SELF => remap self
+ output.collect(key, inputValue);
+ } else {
+ // FROM_PREDECESSOR => remap predecessor
+ output.collect(inputValue.getNode().getNodeID(), inputValue);
+ }
+ } else {
+ // multiple inputs => a merge will take place. Aggregate both, then collect the merged path
+ count = 0;
+ while (true) { // process values; break when no more
+ count++;
+ if ((inputValue.getFlag() & MessageFlag.FROM_PREDECESSOR) == MessageFlag.FROM_PREDECESSOR) {
+ headNode.set(inputValue.getNode());
+ } else {
+ tailNode.set(inputValue.getNode());
+ }
+ if (!values.hasNext()) {
+ break;
+ } else {
+ inputValue = values.next();
+ }
+ }
+ if (count != 2) {
+ throw new IOException("Expected two nodes in MergePathsH3 reduce; saw " + String.valueOf(count));
+ }
+ // merge the head and tail as saved output, this merged node is now a head
+ headNode.mergeNext(tailNode, KMER_SIZE);
+ outputValue.set(MessageFlag.IS_HEAD, headNode);
+ output.collect(key, outputValue);
+ }
+ }
+ }
+
+ /*
+ * Run one iteration of the mergePaths algorithm
+ */
+ public RunningJob run(String inputPath, String outputPath, JobConf baseConf) throws IOException {
+ JobConf conf = new JobConf(baseConf);
+ conf.setJarByClass(MergePathsH3.class);
+ conf.setJobName("MergePathsH3 " + inputPath);
+
+ FileInputFormat.addInputPath(conf, new Path(inputPath));
+ FileOutputFormat.setOutputPath(conf, new Path(outputPath));
+
+ conf.setInputFormat(SequenceFileInputFormat.class);
+
+ conf.setMapOutputKeyClass(PositionWritable.class);
+ conf.setMapOutputValueClass(MessageWritableNodeWithFlag.class);
+ conf.setOutputKeyClass(PositionWritable.class);
+ conf.setOutputValueClass(MessageWritableNodeWithFlag.class);
+
+ // on the first iteration, we have to transform from a node-oriented graph
+ // to a Position-oriented graph
+ if (conf.getInt("iMerge", 1) == 1) {
+ conf.setMapperClass(MergePathsH3MapperInitial.class);
+ } else {
+ conf.setMapperClass(MergePathsH3MapperSubsequent.class);
+ }
+ conf.setReducerClass(MergePathsH3Reducer.class);
+
+ FileSystem.get(conf).delete(new Path(outputPath), true);
+
+ return JobClient.runJob(conf);
+ }
+
+ @Override
+ public int run(String[] arg0) throws Exception {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
+ public static void main(String[] args) throws Exception {
+ int res = ToolRunner.run(new Configuration(), new MergePathsH3(), args);
+ System.exit(res);
+ }
+}
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h3/MergePathsH3Driver.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h3/MergePathsH3Driver.java
new file mode 100644
index 0000000..4982439
--- /dev/null
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h3/MergePathsH3Driver.java
@@ -0,0 +1,77 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.genomix.hadoop.graphclean.mergepaths.h3;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
+import org.kohsuke.args4j.CmdLineParser;
+import org.kohsuke.args4j.Option;
+
+@SuppressWarnings("deprecation")
+public class MergePathsH3Driver {
+
+ private static class Options {
+ @Option(name = "-inputpath", usage = "the input path", required = true)
+ public String inputPath;
+
+ @Option(name = "-outputpath", usage = "the output path", required = true)
+ public String outputPath;
+
+ @Option(name = "-mergeresultpath", usage = "the merging results path", required = true)
+ public String mergeResultPath;
+
+ @Option(name = "-num-reducers", usage = "the number of reducers", required = true)
+ public int numReducers;
+
+ @Option(name = "-kmer-size", usage = "the size of kmer", required = true)
+ public int sizeKmer;
+
+ @Option(name = "-merge-rounds", usage = "the while rounds of merging", required = true)
+ public int mergeRound;
+
+ }
+
+    public void run(String inputPath, String outputPath, int numReducers, int sizeKmer, int mergeRound)
+            throws IOException {
+        JobConf baseConf = new JobConf(); // I don't know the semantics here. do i use a base conf file or something?
+        baseConf.setNumReduceTasks(numReducers);
+        baseConf.setInt("sizeKmer", sizeKmer);
+
+        FileSystem dfs = FileSystem.get(baseConf);
+        String prevOutput = inputPath;
+        dfs.delete(new Path(outputPath), true); // clear any previous output
+
+        String tmpOutputPath = "NO_JOBS_DONE";
+        for (int iMerge = 1; iMerge <= mergeRound; iMerge++) {
+            baseConf.setInt("iMerge", iMerge); // MergePathsH3.run picks the initial mapper only when iMerge == 1
+            tmpOutputPath = inputPath + ".mergepathsH3." + String.valueOf(iMerge);
+            new MergePathsH3().run(prevOutput, tmpOutputPath, baseConf);
+            prevOutput = tmpOutputPath; // chain the rounds: the next round must read this round's output, not the original input
+        }
+        dfs.rename(new Path(tmpOutputPath), new Path(outputPath)); // save final results
+    }
+
+ public static void main(String[] args) throws Exception {
+ Options options = new Options();
+ CmdLineParser parser = new CmdLineParser(options);
+ parser.parseArgument(args);
+ MergePathsH3Driver driver = new MergePathsH3Driver();
+ driver.run(options.inputPath, options.outputPath, options.numReducers,
+ options.sizeKmer, options.mergeRound);
+ }
+}
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/MessageWritableNodeWithFlag.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/MessageWritableNodeWithFlag.java
new file mode 100644
index 0000000..5a3076c
--- /dev/null
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/MessageWritableNodeWithFlag.java
@@ -0,0 +1,74 @@
+package edu.uci.ics.genomix.hadoop.pmcommon;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.BinaryComparable;
+import org.apache.hadoop.io.WritableComparable;
+
+import edu.uci.ics.genomix.type.NodeWritable;
+
+public class MessageWritableNodeWithFlag extends BinaryComparable implements WritableComparable<BinaryComparable> {
+ private byte flag;
+ private NodeWritable node;
+
+ public MessageWritableNodeWithFlag(int k) {
+ this.flag = 0;
+ this.node = new NodeWritable(k);
+ }
+
+ public MessageWritableNodeWithFlag(byte flag, int kmerSize) {
+ this.flag = flag;
+ this.node = new NodeWritable(kmerSize);
+ }
+
+ public void set(MessageWritableNodeWithFlag right) {
+ set(right.getFlag(), right.getNode());
+ }
+
+ public void set(byte flag, NodeWritable node) {
+ this.node.set(node);
+ this.flag = flag;
+ }
+
+ @Override
+ public void readFields(DataInput arg0) throws IOException {
+ node.readFields(arg0);
+ flag = arg0.readByte();
+ }
+
+ @Override
+ public void write(DataOutput arg0) throws IOException {
+ node.write(arg0);
+ arg0.writeByte(flag);
+ }
+
+ public NodeWritable getNode() {
+ if (node.getCount() != 0) {
+ return node;
+ }
+ return null;
+ }
+
+ public byte getFlag() {
+ return this.flag;
+ }
+
+ public String toString() {
+ return node.toString() + '\t' + String.valueOf(flag);
+ }
+
+ @Override
+ public byte[] getBytes() {
+ if (node.getCount() != 0) {
+ return node.getKmer().getBytes();
+ } else
+ return null;
+ }
+
+    @Override
+    public int getLength() {
+        return node.getKmer().getLength(); // byte count of the kmer storage returned by getBytes(), not the kmer length in bases
+    }
+}
\ No newline at end of file
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/velvetgraphbuilding/DeepGraphBuildingReducer.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/velvetgraphbuilding/DeepGraphBuildingReducer.java
index 072a5c0..1da7d83 100644
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/velvetgraphbuilding/DeepGraphBuildingReducer.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/velvetgraphbuilding/DeepGraphBuildingReducer.java
@@ -3,10 +3,16 @@
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.lib.MultipleOutputs;
+
+import edu.uci.ics.genomix.hadoop.oldtype.VKmerBytesWritable;
+import edu.uci.ics.genomix.hadoop.oldtype.VKmerBytesWritableFactory;
+import edu.uci.ics.genomix.hadoop.pmcommon.MergePathValueWritable;
import edu.uci.ics.genomix.type.NodeWritable;
import edu.uci.ics.genomix.type.PositionListWritable;
import edu.uci.ics.genomix.type.PositionWritable;
@@ -23,12 +29,16 @@
private PositionListWritable incomingList = new PositionListWritable();
private PositionListWritable outgoingList = new PositionListWritable();
private NullWritable nullWritable = NullWritable.get();
-
+ private int KMER_SIZE;
+
+ public void configure(JobConf job) {
+ KMER_SIZE = job.getInt("sizeKmer", 0);
+ }
public enum nodeToMergeState {
SRC_UPDATE_FROM_VALUES,
SRC_NODE_NON_UPDATE,
SRC_ASSIGNED_BY_RIGHTNODE;
- };
+ }
@Override
public void reduce(PositionWritable key, Iterator<PositionListAndKmerWritable> values,
@@ -92,12 +102,12 @@
posInRead++;
break;
}
- if(nodeToMerge.existsInSinglePath() == true && nodeToBeMerged.existsInSinglePath() == true){
- nodeToMerge.mergeNextWithinOneRead(nodeToBeMerged);
+ if(nodeToMerge.isPathNode() == true && nodeToBeMerged.isPathNode() == true){
+ nodeToMerge.mergeNext(nodeToBeMerged, KMER_SIZE);
srcState = nodeToMergeState.SRC_NODE_NON_UPDATE;
}
else{
- if(nodeToMerge.existsInSinglePath() == false && nodeToBeMerged.existsInSinglePath() == true){
+ if(nodeToMerge.isPathNode() == false && nodeToBeMerged.isPathNode() == true){
srcState = nodeToMergeState.SRC_ASSIGNED_BY_RIGHTNODE;
output.collect(nodeToBeMerged, nullWritable);
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/velvetgraphbuilding/GraphInvertedIndexBuildingMapper.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/velvetgraphbuilding/GraphInvertedIndexBuildingMapper.java
index 3c207f4..d5924c8 100644
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/velvetgraphbuilding/GraphInvertedIndexBuildingMapper.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/velvetgraphbuilding/GraphInvertedIndexBuildingMapper.java
@@ -44,6 +44,5 @@
outputVertexID.set((int)key.get(), (byte)(i - KMER_SIZE + 1));
output.collect(outputKmer, outputVertexID);
}
- /** last kmer */
}
}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/PositionListReference.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/PositionListReference.java
index 98b6ca1..49dd263 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/PositionListReference.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/PositionListReference.java
@@ -5,7 +5,11 @@
public class PositionListReference extends PositionListWritable implements IValueReference {
- /**
+ public PositionListReference(int countByDataLength, byte[] byteArray, int startOffset) {
+ super(countByDataLength, byteArray, startOffset);
+ }
+
+ /**
*
*/
private static final long serialVersionUID = 1L;
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/MapKmerPositionToReadOperator.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/MapKmerPositionToReadOperator.java
index 17ebde5..53bd79d 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/MapKmerPositionToReadOperator.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/MapKmerPositionToReadOperator.java
@@ -1,8 +1,12 @@
package edu.uci.ics.genomix.hyracks.dataflow;
+import java.io.DataOutput;
+import java.io.IOException;
import java.nio.ByteBuffer;
import edu.uci.ics.genomix.hyracks.data.primitive.PositionReference;
+import edu.uci.ics.genomix.type.PositionListWritable;
+import edu.uci.ics.genomix.type.PositionWritable;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
@@ -99,7 +103,6 @@
for (int i = 0; i < accessor.getFieldLength(tIndex, InputPosListField); i += PositionReference.LENGTH) {
positionEntry.setNewReference(data, offsetPoslist + i);
if (positionEntry.getPosInRead() == 0) {
- //TODO remove the pos of the same readID
zeroPositionCollection2.append(positionEntry);
} else {
noneZeroPositionCollection2.append(positionEntry);
@@ -131,7 +134,8 @@
if (posList2 == null) {
builder2.addFieldEndOffset();
} else {
- builder2.addField(posList2.getByteArray(), posList2.getStartOffset(), posList2.getLength());
+ writePosToFieldAndSkipSameReadID(pos, builder2.getDataOutput(), posList2);
+ builder2.addFieldEndOffset();
}
// set kmer, may not useful
byte[] data = accessor.getBuffer().array();
@@ -153,6 +157,22 @@
}
}
+ private void writePosToFieldAndSkipSameReadID(PositionReference pos, DataOutput ds,
+ ArrayBackedValueStorage posList2) throws HyracksDataException {
+
+ PositionListWritable plist = new PositionListWritable(PositionListWritable.getCountByDataLength(posList2
+ .getLength()), posList2.getByteArray(), posList2.getStartOffset());
+ for (PositionWritable p : plist) {
+ if (!pos.isSameReadID(p)) {
+ try {
+ ds.write(p.getByteArray(), p.getStartOffset(), p.getLength());
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ }
+ }
+
@Override
public void fail() throws HyracksDataException {
writer.fail();
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/MapReadToNodeOperator.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/MapReadToNodeOperator.java
index d27c73d..d80ed13 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/MapReadToNodeOperator.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/MapReadToNodeOperator.java
@@ -6,6 +6,7 @@
import edu.uci.ics.genomix.hyracks.data.primitive.NodeReference;
import edu.uci.ics.genomix.type.KmerBytesWritable;
import edu.uci.ics.genomix.type.PositionListWritable;
+import edu.uci.ics.genomix.type.PositionWritable;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
@@ -63,6 +64,7 @@
private NodeReference curNodeEntry;
private NodeReference nextNodeEntry;
+ private NodeReference nextNextNodeEntry;
public MapReadToNodePushable(IHyracksTaskContext ctx, RecordDescriptor inputRecDesc,
RecordDescriptor outputRecDesc) {
@@ -71,6 +73,7 @@
this.outputRecDesc = outputRecDesc;
curNodeEntry = new NodeReference(kmerSize);
nextNodeEntry = new NodeReference(kmerSize);
+ nextNextNodeEntry = new NodeReference(0);
}
@Override
@@ -98,25 +101,58 @@
int readID = accessor.getBuffer().getInt(
offsetPoslist + accessor.getFieldStartOffset(tIndex, InputReadIDField));
resetNode(curNodeEntry, readID, (byte) 0,
- offsetPoslist + accessor.getFieldStartOffset(tIndex, InputInfoFieldStart), false);
+ offsetPoslist + accessor.getFieldStartOffset(tIndex, InputInfoFieldStart), true);
for (int i = InputInfoFieldStart + 1; i < accessor.getFieldCount(); i++) {
resetNode(nextNodeEntry, readID, (byte) (i - InputInfoFieldStart),
offsetPoslist + accessor.getFieldStartOffset(tIndex, i), true);
+ NodeReference pNextNext = null;
+ if (i + 1 < accessor.getFieldCount()) {
+ resetNode(nextNextNodeEntry, readID, (byte) (i - InputInfoFieldStart + 1),
+ offsetPoslist + accessor.getFieldStartOffset(tIndex, i + 1), false);
+ pNextNext = nextNextNodeEntry;
+ }
+
if (nextNodeEntry.getOutgoingList().getCountOfPosition() == 0) {
- curNodeEntry.mergeNextWithinOneRead(nextNodeEntry);
- } else {
- curNodeEntry.setOutgoingList(nextNodeEntry.getOutgoingList());
+ if (pNextNext == null || pNextNext.getOutgoingList().getCountOfPosition() == 0) {
+ curNodeEntry.mergeNext(nextNodeEntry, kmerSize);
+ } else {
+ curNodeEntry.getOutgoingList().reset();
+ curNodeEntry.getOutgoingList().append(nextNodeEntry.getNodeID());
+ outputNode(curNodeEntry);
+
+ nextNodeEntry.getIncomingList().append(curNodeEntry.getNodeID());
+ curNodeEntry.set(nextNodeEntry);
+ }
+ } else { // nextNode entry outgoing > 0
+ curNodeEntry.getOutgoingList().set(nextNodeEntry.getOutgoingList());
curNodeEntry.getOutgoingList().append(nextNodeEntry.getNodeID());
- outputNode(curNodeEntry);
nextNodeEntry.getIncomingList().append(curNodeEntry.getNodeID());
+ outputNode(curNodeEntry);
curNodeEntry.set(nextNodeEntry);
+ curNodeEntry.getOutgoingList().reset();
}
}
outputNode(curNodeEntry);
}
- private void resetNode(NodeReference node, int readID, byte posInRead, int offset, boolean byRef) {
+ /**
+ * The neighbor list is stored in the next next node
+ * (3,2) [(1,0),(2,0)] will become the outgoing list of node (3,1)
+ * (3,1) [(1,0),(2,0)]
+ *
+ * @param curNodeEntry2
+ * @param nextNodeEntry2
+ */
+ // private void setOutgoingByNext(NodeReference curNodeEntry2, NodeReference nextNodeEntry2) {
+ // if (nextNodeEntry2 == null) {
+ // curNodeEntry2.getOutgoingList().reset();
+ // } else {
+ // curNodeEntry2.setOutgoingList(nextNodeEntry2.getOutgoingList());
+ // }
+ // }
+
+ private void resetNode(NodeReference node, int readID, byte posInRead, int offset, boolean isInitial) {
node.reset(kmerSize);
node.setNodeID(readID, posInRead);
@@ -125,32 +161,39 @@
int countPosition = PositionListWritable.getCountByDataLength(lengthPos);
offset += INT_LENGTH;
if (posInRead == 0) {
- setPositionList(node.getIncomingList(), countPosition, buffer.array(), offset, byRef);
+ setPositionList(node.getIncomingList(), countPosition, buffer.array(), offset, true);
+ // subtract 1 from the posInRead of each entry in the incoming list to get the correct predecessor
+ for (PositionWritable pos : node.getIncomingList()) {
+ if (pos.getPosInRead() == 0) {
+ throw new IllegalArgumentException("The incoming position list contain invalid posInRead");
+ }
+ pos.set(pos.getReadID(), (byte) (pos.getPosInRead() - 1));
+ }
} else {
- setPositionList(node.getOutgoingList(), countPosition, buffer.array(), offset, byRef);
+ setPositionList(node.getOutgoingList(), countPosition, buffer.array(), offset, isInitial);
}
offset += lengthPos;
int lengthKmer = buffer.getInt(offset);
if (node.getKmer().getLength() != lengthKmer) {
throw new IllegalStateException("Size of Kmer is invalid ");
}
- setKmer(node.getKmer(), buffer.array(), offset + INT_LENGTH, byRef);
+ setKmer(node.getKmer(), buffer.array(), offset + INT_LENGTH, isInitial);
}
- private void setKmer(KmerBytesWritable kmer, byte[] array, int offset, boolean byRef) {
- if (byRef) {
- kmer.setNewReference(array, offset);
- } else {
+ private void setKmer(KmerBytesWritable kmer, byte[] array, int offset, boolean isInitial) {
+ if (isInitial) {
kmer.set(array, offset);
+ } else {
+ kmer.setNewReference(array, offset);
}
}
private void setPositionList(PositionListWritable positionListWritable, int count, byte[] array, int offset,
- boolean byRef) {
- if (byRef) {
- positionListWritable.setNewReference(count, array, offset);
- } else {
+ boolean isInitial) {
+ if (isInitial) {
positionListWritable.set(count, array, offset);
+ } else {
+ positionListWritable.setNewReference(count, array, offset);
}
}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/ReadsKeyValueParserFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/ReadsKeyValueParserFactory.java
index 37d5289..855be64 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/ReadsKeyValueParserFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/ReadsKeyValueParserFactory.java
@@ -25,6 +25,7 @@
import org.apache.hadoop.io.Text;
import edu.uci.ics.genomix.hyracks.data.primitive.PositionReference;
+import edu.uci.ics.genomix.type.GeneCode;
import edu.uci.ics.genomix.type.KmerBytesWritable;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
@@ -115,7 +116,7 @@
InsertToFrame(kmer, -readID, array.length - k, writer);
/** middle kmer */
for (int i = k; i < array.length; i++) {
- kmer.shiftKmerWithPreChar(array[i]);
+ kmer.shiftKmerWithPreCode(GeneCode.getPairedCodeFromSymbol(array[i]));
InsertToFrame(kmer, -readID, array.length - i - 1, writer);
}
}
diff --git a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobKmerGroupbyTest.java b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobKmerGroupbyTest.java
deleted file mode 100644
index 216f631..0000000
--- a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobKmerGroupbyTest.java
+++ /dev/null
@@ -1,244 +0,0 @@
-/*
- * Copyright 2009-2012 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package edu.uci.ics.genomix.hyracks.test;
-
-import java.io.BufferedWriter;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.FileWriter;
-import java.io.IOException;
-
-import junit.framework.Assert;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.FileOutputFormat;
-import org.apache.hadoop.mapred.JobConf;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import edu.uci.ics.genomix.hyracks.driver.Driver;
-import edu.uci.ics.genomix.hyracks.driver.Driver.Plan;
-import edu.uci.ics.genomix.hyracks.job.GenomixJobConf;
-import edu.uci.ics.genomix.type.KmerBytesWritable;
-
-public class JobKmerGroupbyTest {
- private static final String ACTUAL_RESULT_DIR = "actual";
- private static final String PATH_TO_HADOOP_CONF = "src/test/resources/hadoop/conf";
-
- private static final String DATA_PATH = "src/test/resources/data/webmap/text.txt";
- private static final String HDFS_INPUT_PATH = "/webmap";
- private static final String HDFS_OUTPUT_PATH = "/webmap_result";
-
- private static final String DUMPED_RESULT = ACTUAL_RESULT_DIR + HDFS_OUTPUT_PATH + "/merged.txt";
- private static final String CONVERT_RESULT = DUMPED_RESULT + ".txt";
- private static final String EXPECTED_PATH = "src/test/resources/expected/result2";
- private static final String EXPECTED_REVERSE_PATH = "src/test/resources/expected/result_reverse";
-
- private static final String HADOOP_CONF_PATH = ACTUAL_RESULT_DIR + File.separator + "conf.xml";
- private MiniDFSCluster dfsCluster;
-
- private JobConf conf = new JobConf();
- private int numberOfNC = 2;
- private int numPartitionPerMachine = 1;
-
- private Driver driver;
-
- @Before
- public void setUp() throws Exception {
- cleanupStores();
- edu.uci.ics.hyracks.hdfs.utils.HyracksUtils.init();
- FileUtils.forceMkdir(new File(ACTUAL_RESULT_DIR));
- FileUtils.cleanDirectory(new File(ACTUAL_RESULT_DIR));
- startHDFS();
-
- FileInputFormat.setInputPaths(conf, HDFS_INPUT_PATH);
- FileOutputFormat.setOutputPath(conf, new Path(HDFS_OUTPUT_PATH));
-
- conf.setInt(GenomixJobConf.KMER_LENGTH, 5);
- driver = new Driver(edu.uci.ics.hyracks.hdfs.utils.HyracksUtils.CC_HOST,
- edu.uci.ics.hyracks.hdfs.utils.HyracksUtils.TEST_HYRACKS_CC_CLIENT_PORT, numPartitionPerMachine);
- }
-
- private void cleanupStores() throws IOException {
- FileUtils.forceMkdir(new File("teststore"));
- FileUtils.forceMkdir(new File("build"));
- FileUtils.cleanDirectory(new File("teststore"));
- FileUtils.cleanDirectory(new File("build"));
- }
-
- private void startHDFS() throws IOException {
- conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/core-site.xml"));
- conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/mapred-site.xml"));
- conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/hdfs-site.xml"));
-
- FileSystem lfs = FileSystem.getLocal(new Configuration());
- lfs.delete(new Path("build"), true);
- System.setProperty("hadoop.log.dir", "logs");
- dfsCluster = new MiniDFSCluster(conf, numberOfNC, true, null);
- FileSystem dfs = FileSystem.get(conf);
- Path src = new Path(DATA_PATH);
- Path dest = new Path(HDFS_INPUT_PATH);
- dfs.mkdirs(dest);
- //dfs.mkdirs(result);
- dfs.copyFromLocalFile(src, dest);
-
- DataOutputStream confOutput = new DataOutputStream(new FileOutputStream(new File(HADOOP_CONF_PATH)));
- conf.writeXml(confOutput);
- confOutput.flush();
- confOutput.close();
- }
-
- private void cleanUpReEntry() throws IOException {
- FileSystem lfs = FileSystem.getLocal(new Configuration());
- if (lfs.exists(new Path(DUMPED_RESULT))) {
- lfs.delete(new Path(DUMPED_RESULT), true);
- }
- FileSystem dfs = FileSystem.get(conf);
- if (dfs.exists(new Path(HDFS_OUTPUT_PATH))) {
- dfs.delete(new Path(HDFS_OUTPUT_PATH), true);
- }
- }
-
- @Test
- public void TestAll() throws Exception {
- cleanUpReEntry();
- TestExternalGroupby();
- cleanUpReEntry();
- TestPreClusterGroupby();
- cleanUpReEntry();
- TestHybridGroupby();
- cleanUpReEntry();
- conf.setBoolean(GenomixJobConf.REVERSED_KMER, true);
- TestExternalReversedGroupby();
- cleanUpReEntry();
- TestPreClusterReversedGroupby();
- cleanUpReEntry();
- TestHybridReversedGroupby();
- }
-
- public void TestExternalGroupby() throws Exception {
- conf.set(GenomixJobConf.GROUPBY_TYPE, "external");
- System.err.println("Testing ExternalGroupBy");
- driver.runJob(new GenomixJobConf(conf), Plan.BUILD_DEBRUJIN_GRAPH, true);
- Assert.assertEquals(true, checkResults(EXPECTED_PATH));
- }
-
- public void TestPreClusterGroupby() throws Exception {
- conf.set(GenomixJobConf.GROUPBY_TYPE, "precluster");
- System.err.println("Testing PreClusterGroupBy");
- driver.runJob(new GenomixJobConf(conf), Plan.BUILD_DEBRUJIN_GRAPH, true);
- Assert.assertEquals(true, checkResults(EXPECTED_PATH));
- }
-
- public void TestHybridGroupby() throws Exception {
- conf.set(GenomixJobConf.GROUPBY_TYPE, "hybrid");
- System.err.println("Testing HybridGroupBy");
- driver.runJob(new GenomixJobConf(conf), Plan.BUILD_DEBRUJIN_GRAPH, true);
- Assert.assertEquals(true, checkResults(EXPECTED_PATH));
- }
-
- public void TestExternalReversedGroupby() throws Exception {
- conf.set(GenomixJobConf.GROUPBY_TYPE, "external");
- conf.setBoolean(GenomixJobConf.REVERSED_KMER, true);
- System.err.println("Testing ExternalGroupBy + Reversed");
- driver.runJob(new GenomixJobConf(conf), Plan.BUILD_DEBRUJIN_GRAPH, true);
- Assert.assertEquals(true, checkResults(EXPECTED_REVERSE_PATH));
- }
-
- public void TestPreClusterReversedGroupby() throws Exception {
- conf.set(GenomixJobConf.GROUPBY_TYPE, "precluster");
- conf.setBoolean(GenomixJobConf.REVERSED_KMER, true);
- System.err.println("Testing PreclusterGroupBy + Reversed");
- driver.runJob(new GenomixJobConf(conf), Plan.BUILD_DEBRUJIN_GRAPH, true);
- Assert.assertEquals(true, checkResults(EXPECTED_REVERSE_PATH));
- }
-
- public void TestHybridReversedGroupby() throws Exception {
- conf.set(GenomixJobConf.GROUPBY_TYPE, "hybrid");
- conf.setBoolean(GenomixJobConf.REVERSED_KMER, true);
- System.err.println("Testing HybridGroupBy + Reversed");
- driver.runJob(new GenomixJobConf(conf), Plan.BUILD_DEBRUJIN_GRAPH, true);
- Assert.assertEquals(true, checkResults(EXPECTED_REVERSE_PATH));
- }
-
- private boolean checkResults(String expectedPath) throws Exception {
- File dumped = null;
- String format = conf.get(GenomixJobConf.OUTPUT_FORMAT);
- if ("text".equalsIgnoreCase(format)) {
- FileUtil.copyMerge(FileSystem.get(conf), new Path(HDFS_OUTPUT_PATH),
- FileSystem.getLocal(new Configuration()), new Path(DUMPED_RESULT), false, conf, null);
- dumped = new File(DUMPED_RESULT);
- } else {
-
- FileSystem.getLocal(new Configuration()).mkdirs(new Path(ACTUAL_RESULT_DIR + HDFS_OUTPUT_PATH));
- File filePathTo = new File(CONVERT_RESULT);
- BufferedWriter bw = new BufferedWriter(new FileWriter(filePathTo));
- for (int i = 0; i < numPartitionPerMachine * numberOfNC; i++) {
- String partname = "/part-" + i;
- // FileUtil.copy(FileSystem.get(conf), new Path(HDFS_OUTPUT_PATH
- // + partname), FileSystem.getLocal(new Configuration()),
- // new Path(ACTUAL_RESULT_DIR + HDFS_OUTPUT_PATH + partname), false, conf);
-
- Path path = new Path(HDFS_OUTPUT_PATH + partname);
- FileSystem dfs = FileSystem.get(conf);
- if (dfs.getFileStatus(path).getLen() == 0) {
- continue;
- }
- SequenceFile.Reader reader = new SequenceFile.Reader(dfs, path, conf);
-
- // KmerBytesWritable key = (KmerBytesWritable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
- KmerBytesWritable key = new KmerBytesWritable(conf.getInt(GenomixJobConf.KMER_LENGTH,
- GenomixJobConf.DEFAULT_KMERLEN));
-// KmerCountValue value = (KmerCountValue) ReflectionUtils.newInstance(reader.getValueClass(), conf);
- KmerBytesWritable value = null;
- while (reader.next(key, value)) {
- if (key == null || value == null) {
- break;
- }
- bw.write(key.toString() + "\t" + value.toString());
- System.out.println(key.toString() + "\t" + value.toString());
- bw.newLine();
- }
- reader.close();
- }
- bw.close();
- dumped = new File(CONVERT_RESULT);
- }
-
- TestUtils.compareWithSortedResult(new File(expectedPath), dumped);
- return true;
- }
-
- @After
- public void tearDown() throws Exception {
- edu.uci.ics.hyracks.hdfs.utils.HyracksUtils.deinit();
- cleanupHDFS();
- }
-
- private void cleanupHDFS() throws Exception {
- dfsCluster.shutdown();
- }
-
-}
diff --git a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunStepByStepTest.java b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunStepByStepTest.java
index ca9ada0..8451567 100644
--- a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunStepByStepTest.java
+++ b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunStepByStepTest.java
@@ -93,7 +93,7 @@
conf.set(GenomixJobConf.OUTPUT_FORMAT, GenomixJobConf.OUTPUT_FORMAT_TEXT);
conf.set(GenomixJobConf.GROUPBY_TYPE, GenomixJobConf.GROUPBY_TYPE_PRECLUSTER);
driver.runJob(new GenomixJobConf(conf), Plan.OUTPUT_GROUPBY_READID, true);
- Assert.assertEquals(true, checkResults(EXPECTED_GROUPBYREADID, null));
+ Assert.assertEquals(true, checkResults(EXPECTED_GROUPBYREADID, new int [] {2}));
}
public void TestEndToEnd() throws Exception {
diff --git a/genomix/genomix-hyracks/src/test/resources/expected/result_after_generateNode b/genomix/genomix-hyracks/src/test/resources/expected/result_after_generateNode
index 01d4a27..2988303 100644
--- a/genomix/genomix-hyracks/src/test/resources/expected/result_after_generateNode
+++ b/genomix/genomix-hyracks/src/test/resources/expected/result_after_generateNode
@@ -1,12 +1,16 @@
-((1,0) [] [(6,0),(1,3)] AATAGAA)
-((1,3) [(1,0)] [(6,0)] AGAAG)
-((2,0) [] [(6,0),(2,3)] AATAGAA)
-((2,3) [(2,0)] [(6,0)] AGAAG)
-((3,0) [] [(6,0),(3,3)] AATAGAA)
-((3,3) [(3,0)] [(6,0)] AGAAG)
-((4,0) [] [(6,0),(4,3)] AATAGAA)
-((4,3) [(4,0)] [(6,0)] AGAAG)
-((5,0) [] [(6,0),(5,3)] AATAGAA)
-((5,3) [(5,0)] [(6,0)] AGAAG)
-((6,0) [(1,3),(2,3),(3,3),(5,3),(6,3),(4,3)] [(6,0),(6,3)] AGAAGAA)
-((6,3) [(6,0)] [(6,0)] AGAAG)
+((1,0) [] [(1,2)] AATAGA)
+((1,2) [(1,0)] [(6,0),(1,3)] TAGAA)
+((1,3) [(1,2)] [] AGAAG)
+((2,0) [] [(2,2)] AATAGA)
+((2,2) [(2,0)] [(6,0),(2,3)] TAGAA)
+((2,3) [(2,2)] [] AGAAG)
+((3,0) [] [(3,2)] AATAGA)
+((3,2) [(3,0)] [(6,0),(3,3)] TAGAA)
+((3,3) [(3,2)] [] AGAAG)
+((4,0) [] [(4,2)] AATAGA)
+((4,2) [(4,0)] [(6,0),(4,3)] TAGAA)
+((4,3) [(4,2)] [] AGAAG)
+((5,0) [] [(5,2)] AATAGA)
+((5,2) [(5,0)] [(6,0),(5,3)] TAGAA)
+((5,3) [(5,2)] [] AGAAG)
+((6,0) [(1,2),(2,2),(3,2),(5,2),(4,2)] [] AGAAGAAG)
diff --git a/genomix/genomix-hyracks/src/test/resources/expected/result_after_kmer2readId b/genomix/genomix-hyracks/src/test/resources/expected/result_after_kmer2readId
index 0ca9de6..2585102 100644
--- a/genomix/genomix-hyracks/src/test/resources/expected/result_after_kmer2readId
+++ b/genomix/genomix-hyracks/src/test/resources/expected/result_after_kmer2readId
@@ -18,7 +18,7 @@
5 1 [] ATAGA
5 2 [] TAGAA
5 3 [(6,0)] AGAAG
-6 0 [(1,3),(2,3),(3,3),(4,3),(5,3),(6,3)] AGAAG
+6 0 [(1,3),(2,3),(3,3),(5,3),(4,3)] AGAAG
6 1 [] GAAGA
6 2 [] AAGAA
-6 3 [(6,0)] AGAAG
\ No newline at end of file
+6 3 [] AGAAG
diff --git a/genomix/genomix-hyracks/src/test/resources/expected/result_after_readIDAggreage b/genomix/genomix-hyracks/src/test/resources/expected/result_after_readIDAggreage
index 82d4692..1cd4274 100644
--- a/genomix/genomix-hyracks/src/test/resources/expected/result_after_readIDAggreage
+++ b/genomix/genomix-hyracks/src/test/resources/expected/result_after_readIDAggreage
@@ -3,4 +3,4 @@
3 0 [] AATAG 1 [] ATAGA 2 [] TAGAA 3 [(6,0)] AGAAG
4 0 [] AATAG 1 [] ATAGA 2 [] TAGAA 3 [(6,0)] AGAAG
5 0 [] AATAG 1 [] ATAGA 2 [] TAGAA 3 [(6,0)] AGAAG
-6 0 [(1,3),(2,3),(3,3),(5,3),(6,3),(4,3)] AGAAG 1 [] GAAGA 2 [] AAGAA 3 [(6,0)] AGAAG
+6 0 [(1,3),(2,3),(3,3),(5,3),(4,3)] AGAAG 1 [] GAAGA 2 [] AAGAA 3 [] AGAAG
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/api/io/binary/BinaryVertexInputFormat.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/api/io/binary/BinaryVertexInputFormat.java
index a4134af..636cf86 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/api/io/binary/BinaryVertexInputFormat.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/api/io/binary/BinaryVertexInputFormat.java
@@ -3,6 +3,7 @@
import java.io.IOException;
import java.util.List;
+import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.InputSplit;
@@ -13,8 +14,7 @@
import edu.uci.ics.pregelix.api.io.VertexInputFormat;
import edu.uci.ics.pregelix.api.io.VertexReader;
-import edu.uci.ics.genomix.type.KmerBytesWritable;
-import edu.uci.ics.genomix.type.KmerCountValue;
+import edu.uci.ics.genomix.type.NodeWritable;
public class BinaryVertexInputFormat<I extends WritableComparable<?>, V extends Writable, E extends Writable, M extends Writable>
extends VertexInputFormat<I, V, E, M> {
@@ -38,7 +38,7 @@
public static abstract class BinaryVertexReader<I extends WritableComparable<?>, V extends Writable, E extends Writable, M extends Writable>
implements VertexReader<I, V, E, M> {
/** Internal line record reader */
- private final RecordReader<KmerBytesWritable, KmerCountValue> lineRecordReader;
+ private final RecordReader<NodeWritable, NullWritable> lineRecordReader;
/** Context passed to initialize */
private TaskAttemptContext context;
@@ -48,7 +48,7 @@
* @param recordReader
* Line record reader from SequenceFileInputFormat
*/
- public BinaryVertexReader(RecordReader<KmerBytesWritable, KmerCountValue> recordReader) {
+ public BinaryVertexReader(RecordReader<NodeWritable, NullWritable> recordReader) {
this.lineRecordReader = recordReader;
}
@@ -74,7 +74,7 @@
*
* @return Record reader to be used for reading.
*/
- protected RecordReader<KmerBytesWritable, KmerCountValue> getRecordReader() {
+ protected RecordReader<NodeWritable, NullWritable> getRecordReader() {
return lineRecordReader;
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/api/io/binary/BinaryVertexOutputFormat.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/api/io/binary/BinaryVertexOutputFormat.java
index d921b5e..8fbd1ce 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/api/io/binary/BinaryVertexOutputFormat.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/api/io/binary/BinaryVertexOutputFormat.java
@@ -11,7 +11,7 @@
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
-import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.PositionWritable;
import edu.uci.ics.pregelix.api.io.VertexOutputFormat;
import edu.uci.ics.pregelix.api.io.VertexWriter;
@@ -49,7 +49,7 @@
/** Context passed to initialize */
private TaskAttemptContext context;
/** Internal line record writer */
- private final RecordWriter<KmerBytesWritable, ValueStateWritable> lineRecordWriter;
+ private final RecordWriter<PositionWritable, ValueStateWritable> lineRecordWriter;
/**
* Initialize with the LineRecordWriter.
@@ -57,7 +57,7 @@
* @param lineRecordWriter
* Line record writer from SequenceFileOutputFormat
*/
- public BinaryVertexWriter(RecordWriter<KmerBytesWritable, ValueStateWritable> lineRecordWriter) {
+ public BinaryVertexWriter(RecordWriter<PositionWritable, ValueStateWritable> lineRecordWriter) {
this.lineRecordWriter = lineRecordWriter;
}
@@ -76,7 +76,7 @@
*
* @return Record writer to be used for writing.
*/
- public RecordWriter<KmerBytesWritable, ValueStateWritable> getRecordWriter() {
+ public RecordWriter<PositionWritable, ValueStateWritable> getRecordWriter() {
return lineRecordWriter;
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/client/Client.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/client/Client.java
index 4014377..94b0c51 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/client/Client.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/client/Client.java
@@ -17,7 +17,7 @@
import edu.uci.ics.pregelix.core.driver.Driver;
public class Client {
-
+ //test rebase
private static class Options {
@Option(name = "-inputpaths", usage = "comma seprated input paths", required = true)
public String inputPaths;
@@ -76,7 +76,7 @@
}
if (options.pseudoRate > 0 && options.pseudoRate <= 1)
- job.getConfiguration().setFloat(P3ForPathMergeVertex.PSEUDORATE, options.pseudoRate);
+ job.getConfiguration().setFloat(P3ForPathMergeVertex.PSEUDORATE, options.pseudoRate);
if (options.maxRound > 0)
job.getConfiguration().setInt(P3ForPathMergeVertex.MAXROUND, options.maxRound);
return options;
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/LogAlgorithmForPathMergeInputFormat.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/LogAlgorithmForPathMergeInputFormat.java
index 4a76ff6..3fe04ff 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/LogAlgorithmForPathMergeInputFormat.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/LogAlgorithmForPathMergeInputFormat.java
@@ -8,35 +8,36 @@
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import edu.uci.ics.genomix.pregelix.api.io.binary.BinaryVertexInputFormat;
-import edu.uci.ics.genomix.pregelix.io.LogAlgorithmMessageWritable;
+import edu.uci.ics.genomix.pregelix.io.MessageWritable;
import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
import edu.uci.ics.genomix.pregelix.type.State;
-import edu.uci.ics.genomix.type.KmerBytesWritable;
-import edu.uci.ics.genomix.type.KmerCountValue;
+import edu.uci.ics.genomix.type.NodeWritable;
+import edu.uci.ics.genomix.type.PositionWritable;
import edu.uci.ics.pregelix.api.graph.Vertex;
import edu.uci.ics.pregelix.api.io.VertexReader;
import edu.uci.ics.pregelix.api.util.BspUtils;
public class LogAlgorithmForPathMergeInputFormat extends
- BinaryVertexInputFormat<KmerBytesWritable, ValueStateWritable, NullWritable, LogAlgorithmMessageWritable> {
+ BinaryVertexInputFormat<PositionWritable, ValueStateWritable, NullWritable, MessageWritable> {
/**
* Format INPUT
*/
@SuppressWarnings("unchecked")
@Override
- public VertexReader<KmerBytesWritable, ValueStateWritable, NullWritable, LogAlgorithmMessageWritable> createVertexReader(
+ public VertexReader<PositionWritable, ValueStateWritable, NullWritable, MessageWritable> createVertexReader(
InputSplit split, TaskAttemptContext context) throws IOException {
return new BinaryLoadGraphReader(binaryInputFormat.createRecordReader(split, context));
}
@SuppressWarnings("rawtypes")
class BinaryLoadGraphReader extends
- BinaryVertexReader<KmerBytesWritable, ValueStateWritable, NullWritable, LogAlgorithmMessageWritable> {
+ BinaryVertexReader<PositionWritable, ValueStateWritable, NullWritable, MessageWritable> {
private Vertex vertex = null;
- private KmerBytesWritable vertexId = null;
+ private NodeWritable node = new NodeWritable();
+ private PositionWritable vertexId = new PositionWritable();
private ValueStateWritable vertexValue = new ValueStateWritable();
- public BinaryLoadGraphReader(RecordReader<KmerBytesWritable, KmerCountValue> recordReader) {
+ public BinaryLoadGraphReader(RecordReader<NodeWritable, NullWritable> recordReader) {
super(recordReader);
}
@@ -47,7 +48,7 @@
@SuppressWarnings("unchecked")
@Override
- public Vertex<KmerBytesWritable, ValueStateWritable, NullWritable, LogAlgorithmMessageWritable> getCurrentVertex()
+ public Vertex<PositionWritable, ValueStateWritable, NullWritable, MessageWritable> getCurrentVertex()
throws IOException, InterruptedException {
if (vertex == null)
vertex = (Vertex) BspUtils.createVertex(getContext().getConfiguration());
@@ -59,15 +60,15 @@
/**
* set the src vertex id
*/
- if (vertexId == null)
- vertexId = new KmerBytesWritable(getRecordReader().getCurrentKey().getKmerLength());
- vertexId.set(getRecordReader().getCurrentKey());
+ node = getRecordReader().getCurrentKey();
+ vertexId.set(node.getNodeID());
vertex.setVertexId(vertexId);
/**
* set the vertex value
*/
- KmerCountValue kmerCountValue = getRecordReader().getCurrentValue();
- vertexValue.setAdjMap(kmerCountValue.getAdjBitMap());
+ vertexValue.setIncomingList(node.getIncomingList());
+ vertexValue.setOutgoingList(node.getOutgoingList());
+ vertexValue.setMergeChain(node.getKmer());
vertexValue.setState(State.NON_VERTEX);
vertex.setVertexValue(vertexValue);
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/LogAlgorithmForPathMergeOutputFormat.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/LogAlgorithmForPathMergeOutputFormat.java
index 110247e..8a04292 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/LogAlgorithmForPathMergeOutputFormat.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/LogAlgorithmForPathMergeOutputFormat.java
@@ -11,16 +11,16 @@
import edu.uci.ics.pregelix.api.io.VertexWriter;
import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
import edu.uci.ics.genomix.pregelix.type.State;
-import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.PositionWritable;
public class LogAlgorithmForPathMergeOutputFormat extends
- BinaryVertexOutputFormat<KmerBytesWritable, ValueStateWritable, NullWritable> {
+ BinaryVertexOutputFormat<PositionWritable, ValueStateWritable, NullWritable> {
@Override
- public VertexWriter<KmerBytesWritable, ValueStateWritable, NullWritable> createVertexWriter(
+ public VertexWriter<PositionWritable, ValueStateWritable, NullWritable> createVertexWriter(
TaskAttemptContext context) throws IOException, InterruptedException {
@SuppressWarnings("unchecked")
- RecordWriter<KmerBytesWritable, ValueStateWritable> recordWriter = binaryOutputFormat.getRecordWriter(context);
+ RecordWriter<PositionWritable, ValueStateWritable> recordWriter = binaryOutputFormat.getRecordWriter(context);
return new BinaryLoadGraphVertexWriter(recordWriter);
}
@@ -28,20 +28,18 @@
* Simple VertexWriter that supports {@link BinaryLoadGraphVertex}
*/
public static class BinaryLoadGraphVertexWriter extends
- BinaryVertexWriter<KmerBytesWritable, ValueStateWritable, NullWritable> {
+ BinaryVertexWriter<PositionWritable, ValueStateWritable, NullWritable> {
- public BinaryLoadGraphVertexWriter(RecordWriter<KmerBytesWritable, ValueStateWritable> lineRecordWriter) {
+ public BinaryLoadGraphVertexWriter(RecordWriter<PositionWritable, ValueStateWritable> lineRecordWriter) {
super(lineRecordWriter);
}
@Override
- public void writeVertex(Vertex<KmerBytesWritable, ValueStateWritable, NullWritable, ?> vertex)
+ public void writeVertex(Vertex<PositionWritable, ValueStateWritable, NullWritable, ?> vertex)
throws IOException, InterruptedException {
- //&& vertex.getVertexValue().getState() != State.MID_VERTEX
if (vertex.getVertexValue().getState() != State.END_VERTEX) {
getRecordWriter().write(vertex.getVertexId(), vertex.getVertexValue());
}
-
}
}
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/NaiveAlgorithmForPathMergeInputFormat.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/NaiveAlgorithmForPathMergeInputFormat.java
index 8abfcd0..4011838 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/NaiveAlgorithmForPathMergeInputFormat.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/NaiveAlgorithmForPathMergeInputFormat.java
@@ -7,24 +7,25 @@
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import edu.uci.ics.genomix.type.KmerBytesWritable;
-import edu.uci.ics.genomix.type.KmerCountValue;
+import edu.uci.ics.genomix.type.NodeWritable;
+import edu.uci.ics.genomix.type.PositionWritable;
import edu.uci.ics.pregelix.api.graph.Vertex;
import edu.uci.ics.pregelix.api.io.VertexReader;
import edu.uci.ics.pregelix.api.util.BspUtils;
-import edu.uci.ics.genomix.pregelix.io.NaiveAlgorithmMessageWritable;
+import edu.uci.ics.genomix.pregelix.io.MessageWritable;
import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
+import edu.uci.ics.genomix.pregelix.type.State;
import edu.uci.ics.genomix.pregelix.api.io.binary.BinaryVertexInputFormat;
import edu.uci.ics.genomix.pregelix.api.io.binary.BinaryVertexInputFormat.BinaryVertexReader;
public class NaiveAlgorithmForPathMergeInputFormat extends
- BinaryVertexInputFormat<KmerBytesWritable, ValueStateWritable, NullWritable, NaiveAlgorithmMessageWritable> {
+ BinaryVertexInputFormat<PositionWritable, ValueStateWritable, NullWritable, MessageWritable> {
/**
* Format INPUT
*/
@SuppressWarnings("unchecked")
@Override
- public VertexReader<KmerBytesWritable, ValueStateWritable, NullWritable, NaiveAlgorithmMessageWritable> createVertexReader(
+ public VertexReader<PositionWritable, ValueStateWritable, NullWritable, MessageWritable> createVertexReader(
InputSplit split, TaskAttemptContext context) throws IOException {
return new BinaryLoadGraphReader(binaryInputFormat.createRecordReader(split, context));
}
@@ -32,12 +33,13 @@
@SuppressWarnings("rawtypes")
class BinaryLoadGraphReader extends
- BinaryVertexReader<KmerBytesWritable, ValueStateWritable, NullWritable, NaiveAlgorithmMessageWritable> {
+ BinaryVertexReader<PositionWritable, ValueStateWritable, NullWritable, MessageWritable> {
private Vertex vertex;
- private KmerBytesWritable vertexId = null;
+ private NodeWritable node = new NodeWritable();
+ private PositionWritable vertexId = new PositionWritable();
private ValueStateWritable vertexValue = new ValueStateWritable();
- public BinaryLoadGraphReader(RecordReader<KmerBytesWritable, KmerCountValue> recordReader) {
+ public BinaryLoadGraphReader(RecordReader<NodeWritable, NullWritable> recordReader) {
super(recordReader);
}
@@ -48,7 +50,7 @@
@SuppressWarnings("unchecked")
@Override
- public Vertex<KmerBytesWritable, ValueStateWritable, NullWritable, NaiveAlgorithmMessageWritable> getCurrentVertex()
+ public Vertex<PositionWritable, ValueStateWritable, NullWritable, MessageWritable> getCurrentVertex()
throws IOException, InterruptedException {
if (vertex == null)
vertex = (Vertex) BspUtils.createVertex(getContext().getConfiguration());
@@ -61,15 +63,16 @@
/**
* set the src vertex id
*/
- if (vertexId == null)
- vertexId = new KmerBytesWritable(getRecordReader().getCurrentKey().getKmerLength());
- vertexId.set(getRecordReader().getCurrentKey());
+ node = getRecordReader().getCurrentKey();
+ vertexId.set(node.getNodeID());
vertex.setVertexId(vertexId);
/**
* set the vertex value
*/
- KmerCountValue kmerCountValue = getRecordReader().getCurrentValue();
- vertexValue.setAdjMap(kmerCountValue.getAdjBitMap());
+ vertexValue.setIncomingList(node.getIncomingList());
+ vertexValue.setOutgoingList(node.getOutgoingList());
+ vertexValue.setMergeChain(node.getKmer());
+ vertexValue.setState(State.NON_VERTEX);
vertex.setVertexValue(vertexValue);
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/NaiveAlgorithmForPathMergeOutputFormat.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/NaiveAlgorithmForPathMergeOutputFormat.java
index 311283d..fe3b12d 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/NaiveAlgorithmForPathMergeOutputFormat.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/NaiveAlgorithmForPathMergeOutputFormat.java
@@ -8,18 +8,18 @@
import edu.uci.ics.genomix.pregelix.api.io.binary.BinaryVertexOutputFormat;
import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
-import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.PositionWritable;
import edu.uci.ics.pregelix.api.graph.Vertex;
import edu.uci.ics.pregelix.api.io.VertexWriter;
public class NaiveAlgorithmForPathMergeOutputFormat extends
- BinaryVertexOutputFormat<KmerBytesWritable, ValueStateWritable, NullWritable> {
+ BinaryVertexOutputFormat<PositionWritable, ValueStateWritable, NullWritable> {
@Override
- public VertexWriter<KmerBytesWritable, ValueStateWritable, NullWritable> createVertexWriter(
+ public VertexWriter<PositionWritable, ValueStateWritable, NullWritable> createVertexWriter(
TaskAttemptContext context) throws IOException, InterruptedException {
@SuppressWarnings("unchecked")
- RecordWriter<KmerBytesWritable, ValueStateWritable> recordWriter = binaryOutputFormat.getRecordWriter(context);
+ RecordWriter<PositionWritable, ValueStateWritable> recordWriter = binaryOutputFormat.getRecordWriter(context);
return new BinaryLoadGraphVertexWriter(recordWriter);
}
@@ -27,16 +27,14 @@
* Simple VertexWriter that supports {@link BinaryLoadGraphVertex}
*/
public static class BinaryLoadGraphVertexWriter extends
- BinaryVertexWriter<KmerBytesWritable, ValueStateWritable, NullWritable> {
- public BinaryLoadGraphVertexWriter(RecordWriter<KmerBytesWritable, ValueStateWritable> lineRecordWriter) {
+ BinaryVertexWriter<PositionWritable, ValueStateWritable, NullWritable> {
+ public BinaryLoadGraphVertexWriter(RecordWriter<PositionWritable, ValueStateWritable> lineRecordWriter) {
super(lineRecordWriter);
}
@Override
- public void writeVertex(Vertex<KmerBytesWritable, ValueStateWritable, NullWritable, ?> vertex)
+ public void writeVertex(Vertex<PositionWritable, ValueStateWritable, NullWritable, ?> vertex)
throws IOException, InterruptedException {
- //if(vertex.getVertexValue().getState() == State.FILTER
- // || vertex.getVertexValue().getState() == State.FINAL_VERTEX)
getRecordWriter().write(vertex.getVertexId(), vertex.getVertexValue());
}
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/LogAlgorithmMessageWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/LogAlgorithmMessageWritable.java
deleted file mode 100644
index fc57b74..0000000
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/LogAlgorithmMessageWritable.java
+++ /dev/null
@@ -1,153 +0,0 @@
-package edu.uci.ics.genomix.pregelix.io;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.hadoop.io.WritableComparable;
-
-import edu.uci.ics.genomix.pregelix.operator.LogAlgorithmForPathMergeVertex;
-import edu.uci.ics.genomix.pregelix.type.CheckMessage;
-import edu.uci.ics.genomix.type.KmerBytesWritable;
-import edu.uci.ics.genomix.type.VKmerBytesWritable;
-
-public class LogAlgorithmMessageWritable implements WritableComparable<LogAlgorithmMessageWritable> {
- /**
- * sourceVertexId stores source vertexId when headVertex sends the message
- * stores neighber vertexValue when pathVertex sends the message
- * chainVertexId stores the chains of connected DNA
- * file stores the point to the file that stores the chains of connected DNA
- */
- private KmerBytesWritable sourceVertexId;
- private VKmerBytesWritable chainVertexId;
- private byte adjMap;
- private byte message;
-
- private byte checkMessage;
-
- public LogAlgorithmMessageWritable() {
- sourceVertexId = new VKmerBytesWritable(LogAlgorithmForPathMergeVertex.kmerSize);
- chainVertexId = new VKmerBytesWritable(LogAlgorithmForPathMergeVertex.kmerSize);
- adjMap = 0;
- message = 0;
- checkMessage = 0;
- }
-
- public void set(KmerBytesWritable sourceVertexId, VKmerBytesWritable chainVertexId, byte adjMap, byte message) {
- checkMessage = 0;
- if (sourceVertexId != null) {
- checkMessage |= CheckMessage.SOURCE;
- this.sourceVertexId.set(sourceVertexId);
- }
- if (chainVertexId != null) {
- checkMessage |= CheckMessage.CHAIN;
- this.chainVertexId.set(chainVertexId);
- }
- if (adjMap != 0) {
- checkMessage |= CheckMessage.ADJMAP;
- this.adjMap = adjMap;
- }
- this.message = message;
- }
-
- public void reset() {
- checkMessage = 0;
- chainVertexId.reset(LogAlgorithmForPathMergeVertex.kmerSize);
- adjMap = (byte) 0;
- message = 0;
- }
-
- public KmerBytesWritable getSourceVertexId() {
- return sourceVertexId;
- }
-
- public void setSourceVertexId(KmerBytesWritable sourceVertexId) {
- if (sourceVertexId != null) {
- checkMessage |= CheckMessage.SOURCE;
- this.sourceVertexId.set(sourceVertexId);
- }
- }
-
- public byte getAdjMap() {
- return adjMap;
- }
-
- public void setAdjMap(byte adjMap) {
- if (adjMap != 0) {
- checkMessage |= CheckMessage.ADJMAP;
- this.adjMap = adjMap;
- }
- }
-
- public VKmerBytesWritable getChainVertexId() {
- return chainVertexId;
- }
-
- public void setChainVertexId(VKmerBytesWritable chainVertexId) {
- if (chainVertexId != null) {
- checkMessage |= CheckMessage.CHAIN;
- this.chainVertexId.set(chainVertexId);
- }
- }
-
- public byte getMessage() {
- return message;
- }
-
- public void setMessage(byte message) {
- this.message = message;
- }
-
- public int getLengthOfChain() {
- return chainVertexId.getKmerLength();
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- out.writeByte(checkMessage);
- if ((checkMessage & CheckMessage.SOURCE) != 0)
- sourceVertexId.write(out);
- if ((checkMessage & CheckMessage.CHAIN) != 0)
- chainVertexId.write(out);
- if ((checkMessage & CheckMessage.ADJMAP) != 0)
- out.write(adjMap);
- out.writeByte(message);
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- this.reset();
- checkMessage = in.readByte();
- if ((checkMessage & CheckMessage.SOURCE) != 0)
- sourceVertexId.readFields(in);
- if ((checkMessage & CheckMessage.CHAIN) != 0)
- chainVertexId.readFields(in);
- if ((checkMessage & CheckMessage.ADJMAP) != 0)
- adjMap = in.readByte();
- message = in.readByte();
- }
-
- @Override
- public int hashCode() {
- return chainVertexId.hashCode();
- }
-
- @Override
- public boolean equals(Object o) {
- if (o instanceof LogAlgorithmMessageWritable) {
- LogAlgorithmMessageWritable tp = (LogAlgorithmMessageWritable) o;
- return chainVertexId.equals(tp.chainVertexId);
- }
- return false;
- }
-
- @Override
- public String toString() {
- return chainVertexId.toString();
- }
-
- @Override
- public int compareTo(LogAlgorithmMessageWritable tp) {
- return chainVertexId.compareTo(tp.chainVertexId);
- }
-}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MessageWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MessageWritable.java
new file mode 100644
index 0000000..31d313d
--- /dev/null
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MessageWritable.java
@@ -0,0 +1,153 @@
+package edu.uci.ics.genomix.pregelix.io;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.WritableComparable;
+
+import edu.uci.ics.genomix.pregelix.type.CheckMessage;
+import edu.uci.ics.genomix.pregelix.type.Message;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.PositionListWritable;
+import edu.uci.ics.genomix.type.PositionWritable;
+
+/**
+ * Writable message carrying an optional source vertex id, an optional k-mer
+ * chain and an optional neighbor position list, followed by a one-byte
+ * message code.
+ * <p>
+ * {@code checkMessage} is a bit mask of {@link CheckMessage} flags; only the
+ * fields whose flag is set are serialized. Equality, hashing, ordering and
+ * {@link #toString()} look at {@code sourceVertexId} only.
+ */
+public class MessageWritable implements WritableComparable<MessageWritable> {
+ /**
+ * sourceVertexId stores the source vertexId when a headVertex sends the message,
+ * and stores the neighbor vertexValue when a pathVertex sends the message.
+ * NOTE(review): this comment also listed a "file" entry holding a pointer to the
+ * file that stores the chains of connected DNA; no such field exists in this class.
+ */
+ private PositionWritable sourceVertexId;
+ /** K-mer payload; its length is what {@link #getLengthOfChain()} reports. */
+ private KmerBytesWritable chainVertexId;
+ private PositionListWritable neighberNode; //incoming or outgoing
+ /** Message code; starts as {@link Message#NON}. */
+ private byte message;
+
+ /** Bit mask of {@link CheckMessage} flags recording which optional fields are set. */
+ private byte checkMessage;
+
+ /**
+ * Creates an empty message: freshly allocated fields, message code
+ * {@link Message#NON} and no presence flags set.
+ */
+ public MessageWritable() {
+ sourceVertexId = new PositionWritable();
+ chainVertexId = new KmerBytesWritable(0);
+ neighberNode = new PositionListWritable();
+ message = Message.NON;
+ checkMessage = (byte) 0;
+ }
+
+ /**
+ * Replaces the contents of this message.
+ * <p>
+ * All presence flags are cleared first. Each non-null argument is handed to the
+ * matching field's own {@code set(...)} and its flag is raised; a null argument
+ * leaves that field's previous content in place with its flag cleared.
+ *
+ * @param sourceVertexId source position, or null to omit it
+ * @param chainVertexId k-mer chain, or null to omit it
+ * @param neighberNode neighbor list, or null to omit it
+ * @param message message code, always stored
+ */
+ public void set(PositionWritable sourceVertexId, KmerBytesWritable chainVertexId, PositionListWritable neighberNode, byte message) {
+ checkMessage = 0;
+ if (sourceVertexId != null) {
+ checkMessage |= CheckMessage.SOURCE;
+ this.sourceVertexId.set(sourceVertexId.getReadID(),sourceVertexId.getPosInRead());
+ }
+ if (chainVertexId != null) {
+ checkMessage |= CheckMessage.CHAIN;
+ this.chainVertexId.set(chainVertexId);
+ }
+ if (neighberNode != null) {
+ checkMessage |= CheckMessage.NEIGHBER;
+ this.neighberNode.set(neighberNode);
+ }
+ this.message = message;
+ }
+
+ /**
+ * Clears the presence flags, resets the chain and the neighbor list, and sets
+ * the message code back to {@link Message#NON}.
+ * <p>
+ * NOTE(review): {@code sourceVertexId} is not touched here, although
+ * equals/hashCode/compareTo/toString depend on it. An instance reused by
+ * {@link #readFields(DataInput)} for a message without the SOURCE flag keeps the
+ * previous id - confirm this is intended.
+ */
+ public void reset() {
+ checkMessage = 0;
+ // presumably the argument is the k-mer length to reset to - TODO confirm
+ chainVertexId.reset(1);
+ neighberNode.reset();
+ message = Message.NON;
+ }
+
+ /** @return the internal source vertex id object (not a copy) */
+ public PositionWritable getSourceVertexId() {
+ return sourceVertexId;
+ }
+
+ /**
+ * Stores the read id and position of the given source id and raises the SOURCE
+ * flag; a null argument is ignored.
+ */
+ public void setSourceVertexId(PositionWritable sourceVertexId) {
+ if (sourceVertexId != null) {
+ checkMessage |= CheckMessage.SOURCE;
+ this.sourceVertexId.set(sourceVertexId.getReadID(),sourceVertexId.getPosInRead());
+ }
+ }
+
+ /** @return the internal k-mer chain object (not a copy) */
+ public KmerBytesWritable getChainVertexId() {
+ return chainVertexId;
+ }
+
+ /**
+ * Passes the given chain to the internal chain's {@code set(...)} and raises the
+ * CHAIN flag; a null argument is ignored.
+ */
+ public void setChainVertexId(KmerBytesWritable chainVertexId) {
+ if (chainVertexId != null) {
+ checkMessage |= CheckMessage.CHAIN;
+ this.chainVertexId.set(chainVertexId);
+ }
+ }
+
+ /** @return the internal neighbor list object (not a copy) */
+ public PositionListWritable getNeighberNode() {
+ return neighberNode;
+ }
+
+ /**
+ * Passes the given list to the internal list's {@code set(...)} and raises the
+ * NEIGHBER flag; a null argument is ignored.
+ */
+ public void setNeighberNode(PositionListWritable neighberNode) {
+ if(neighberNode != null){
+ checkMessage |= CheckMessage.NEIGHBER;
+ this.neighberNode.set(neighberNode);
+ }
+ }
+
+ /** @return the k-mer length reported by the chain */
+ public int getLengthOfChain() {
+ return chainVertexId.getKmerLength();
+ }
+
+ /** @return the message code */
+ public byte getMessage() {
+ return message;
+ }
+
+ /** Sets the message code; no presence flag is involved. */
+ public void setMessage(byte message) {
+ this.message = message;
+ }
+
+ /**
+ * Serializes the flag byte, then each optional field whose flag is set (source,
+ * chain, neighbor list - in that order), then the message code.
+ */
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeByte(checkMessage);
+ if ((checkMessage & CheckMessage.SOURCE) != 0)
+ sourceVertexId.write(out);
+ if ((checkMessage & CheckMessage.CHAIN) != 0)
+ chainVertexId.write(out);
+ if ((checkMessage & CheckMessage.NEIGHBER) != 0)
+ neighberNode.write(out);
+ // DataOutput.write(int) emits the low-order byte, matching readByte() in readFields
+ out.write(message);
+ }
+
+ /**
+ * Resets this instance, then reads the flag byte, the flagged optional fields in
+ * the order used by {@link #write(DataOutput)}, and finally the message code.
+ */
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ this.reset();
+ checkMessage = in.readByte();
+ if ((checkMessage & CheckMessage.SOURCE) != 0)
+ sourceVertexId.readFields(in);
+ if ((checkMessage & CheckMessage.CHAIN) != 0)
+ chainVertexId.readFields(in);
+ if ((checkMessage & CheckMessage.NEIGHBER) != 0)
+ neighberNode.readFields(in);
+ message = in.readByte();
+ }
+
+ /** Hash of the source vertex id only. */
+ @Override
+ public int hashCode() {
+ return sourceVertexId.hashCode();
+ }
+
+ /** Two messages are equal when their source vertex ids are equal; other fields are ignored. */
+ @Override
+ public boolean equals(Object o) {
+ if (o instanceof MessageWritable) {
+ MessageWritable tp = (MessageWritable) o;
+ return sourceVertexId.equals(tp.sourceVertexId);
+ }
+ return false;
+ }
+
+ /** String form of the source vertex id only. */
+ @Override
+ public String toString() {
+ return sourceVertexId.toString();
+ }
+
+ /** Orders messages by source vertex id only. */
+ @Override
+ public int compareTo(MessageWritable tp) {
+ return sourceVertexId.compareTo(tp.sourceVertexId);
+ }
+}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/NaiveAlgorithmMessageWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/NaiveAlgorithmMessageWritable.java
deleted file mode 100644
index c5378a2..0000000
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/NaiveAlgorithmMessageWritable.java
+++ /dev/null
@@ -1,179 +0,0 @@
-package edu.uci.ics.genomix.pregelix.io;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.hadoop.io.WritableComparable;
-
-import edu.uci.ics.genomix.pregelix.operator.NaiveAlgorithmForPathMergeVertex;
-import edu.uci.ics.genomix.pregelix.type.CheckMessage;
-import edu.uci.ics.genomix.pregelix.type.Message;
-import edu.uci.ics.genomix.type.KmerBytesWritable;
-import edu.uci.ics.genomix.type.VKmerBytesWritable;
-
-public class NaiveAlgorithmMessageWritable implements WritableComparable<NaiveAlgorithmMessageWritable> {
- /**
- * sourceVertexId stores source vertexId when headVertex sends the message
- * stores neighber vertexValue when pathVertex sends the message
- * file stores the point to the file that stores the chains of connected DNA
- */
- private KmerBytesWritable sourceVertexId;
- private byte adjMap;
- private byte lastGeneCode;
- private VKmerBytesWritable chainVertexId;
- private byte message;
-
- private byte checkMessage;
-
- public NaiveAlgorithmMessageWritable() {
- sourceVertexId = new VKmerBytesWritable(NaiveAlgorithmForPathMergeVertex.kmerSize);
- chainVertexId = new VKmerBytesWritable(1);
- adjMap = (byte) 0;
- lastGeneCode = (byte) -1;
- message = Message.NON;
- checkMessage = (byte) 0;
- }
-
- public void set(KmerBytesWritable sourceVertex, byte adjMap, byte lastGeneCode, VKmerBytesWritable chainVertexId, byte message) {
- checkMessage = 0;
- if (sourceVertexId != null) {
- checkMessage |= CheckMessage.SOURCE;
- this.sourceVertexId.set(sourceVertexId);
- }
- if (adjMap != 0) {
- checkMessage |= CheckMessage.ADJMAP;
- this.adjMap = adjMap;
- }
- if (lastGeneCode != 0) {
- checkMessage |= CheckMessage.LASTGENECODE;
- this.lastGeneCode = lastGeneCode;
- }
- if (chainVertexId != null) {
- checkMessage |= CheckMessage.CHAIN;
- this.chainVertexId.set(chainVertexId);
- }
- this.message = message;
- }
-
- public void reset() {
- checkMessage = 0;
- adjMap = (byte) 0;
- lastGeneCode = (byte) -1;
- chainVertexId.reset(1);
- message = Message.NON;
- }
-
- public KmerBytesWritable getSourceVertexId() {
- return sourceVertexId;
- }
-
- public void setSourceVertexId(KmerBytesWritable sourceVertexId) {
- if (sourceVertexId != null) {
- checkMessage |= CheckMessage.SOURCE;
- this.sourceVertexId.set(sourceVertexId);
- }
- }
-
- public byte getAdjMap() {
- return adjMap;
- }
-
- public void setAdjMap(byte adjMap) {
- if (adjMap != 0) {
- checkMessage |= CheckMessage.ADJMAP;
- this.adjMap = adjMap;
- }
- }
-
- public byte getLastGeneCode() {
- return lastGeneCode;
- }
-
- public void setLastGeneCode(byte lastGeneCode) {
- if (lastGeneCode != -1) {
- checkMessage |= CheckMessage.LASTGENECODE;
- this.lastGeneCode = lastGeneCode;
- }
- }
-
- public VKmerBytesWritable getChainVertexId() {
- return chainVertexId;
- }
-
- public void setChainVertexId(VKmerBytesWritable chainVertexId) {
- if (chainVertexId != null) {
- checkMessage |= CheckMessage.CHAIN;
- this.chainVertexId.set(chainVertexId);
- }
- }
-
- public int getLengthOfChain() {
- return chainVertexId.getKmerLength();
- }
-
- public byte getMessage() {
- return message;
- }
-
- public void setMessage(byte message) {
- this.message = message;
- }
-
- public boolean isGeneCode(){
- return ((checkMessage & CheckMessage.LASTGENECODE) != 0);
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- out.writeByte(checkMessage);
- if ((checkMessage & CheckMessage.SOURCE) != 0)
- sourceVertexId.write(out);
- if ((checkMessage & CheckMessage.ADJMAP) != 0)
- out.write(adjMap);
- if ((checkMessage & CheckMessage.LASTGENECODE) != 0)
- out.write(lastGeneCode);
- if ((checkMessage & CheckMessage.CHAIN) != 0)
- chainVertexId.write(out);
- out.write(message);
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- this.reset();
- checkMessage = in.readByte();
- if ((checkMessage & CheckMessage.SOURCE) != 0)
- sourceVertexId.readFields(in);
- if ((checkMessage & CheckMessage.ADJMAP) != 0)
- adjMap = in.readByte();
- if ((checkMessage & CheckMessage.LASTGENECODE) != 0)
- lastGeneCode = in.readByte();
- if ((checkMessage & CheckMessage.CHAIN) != 0)
- chainVertexId.readFields(in);
- message = in.readByte();
- }
-
- @Override
- public int hashCode() {
- return sourceVertexId.hashCode();
- }
-
- @Override
- public boolean equals(Object o) {
- if (o instanceof NaiveAlgorithmMessageWritable) {
- NaiveAlgorithmMessageWritable tp = (NaiveAlgorithmMessageWritable) o;
- return sourceVertexId.equals(tp.sourceVertexId);
- }
- return false;
- }
-
- @Override
- public String toString() {
- return sourceVertexId.toString();
- }
-
- @Override
- public int compareTo(NaiveAlgorithmMessageWritable tp) {
- return sourceVertexId.compareTo(tp.sourceVertexId);
- }
-}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/ValueStateWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/ValueStateWritable.java
index 9a9e30f..2554c8e 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/ValueStateWritable.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/ValueStateWritable.java
@@ -5,40 +5,50 @@
import org.apache.hadoop.io.WritableComparable;
import edu.uci.ics.genomix.pregelix.type.State;
-import edu.uci.ics.genomix.type.GeneCode;
import edu.uci.ics.genomix.type.KmerBytesWritable;
-import edu.uci.ics.genomix.type.VKmerBytesWritable;
+import edu.uci.ics.genomix.type.PositionListWritable;
public class ValueStateWritable implements WritableComparable<ValueStateWritable> {
- private byte adjMap;
+ private PositionListWritable incomingList;
+ private PositionListWritable outgoingList;
private byte state;
- private VKmerBytesWritable mergeChain;
+ private KmerBytesWritable mergeChain;
public ValueStateWritable() {
+ incomingList = new PositionListWritable();
+ outgoingList = new PositionListWritable();
state = State.NON_VERTEX;
- mergeChain = new VKmerBytesWritable(0);
- //isOp = false;
+ mergeChain = new KmerBytesWritable(0);
}
- public ValueStateWritable(byte adjMap, byte state, VKmerBytesWritable mergeChain) {
- this.adjMap = adjMap;
+ /**
+ * Creates a value initialised from the given lists, state and merge chain.
+ * <p>
+ * The fields of this class have no initializers and {@code set(...)} copies
+ * into the existing field objects, so the no-arg constructor has to run first
+ * to allocate them; without it this constructor dereferences null.
+ *
+ * @param incomingList list copied into the incoming list
+ * @param outgoingList list copied into the outgoing list
+ * @param state vertex state byte
+ * @param mergeChain k-mer copied into the merge chain
+ */
+ public ValueStateWritable(PositionListWritable incomingList, PositionListWritable outgoingList,
+ byte state, KmerBytesWritable mergeChain) {
+ this();
+ set(incomingList, outgoingList, state, mergeChain);
+ }
+
+ public void set(PositionListWritable incomingList, PositionListWritable outgoingList,
+ byte state, KmerBytesWritable mergeChain) {
+ this.incomingList.set(incomingList);
+ this.outgoingList.set(outgoingList);
this.state = state;
this.mergeChain.set(mergeChain);
}
-
- public void set(byte adjMap, byte state, VKmerBytesWritable mergeChain) {
- this.adjMap = adjMap;
- this.state = state;
- this.mergeChain.set(mergeChain);
+
+ public PositionListWritable getIncomingList() {
+ return incomingList;
}
- public byte getAdjMap() {
- return adjMap;
+ /**
+ * Copies the given list into this value's incoming list, the same way
+ * {@code set(...)} does, so the caller's (possibly reused) list object is not
+ * retained by this value.
+ *
+ * @param incomingList list to copy; must not be null
+ */
+ public void setIncomingList(PositionListWritable incomingList) {
+ this.incomingList.set(incomingList);
}
- public void setAdjMap(byte adjMap) {
- this.adjMap = adjMap;
+ public PositionListWritable getOutgoingList() {
+ return outgoingList;
+ }
+
+ /**
+ * Copies the given list into this value's outgoing list, the same way
+ * {@code set(...)} does, so the caller's (possibly reused) list object is not
+ * retained by this value.
+ *
+ * @param outgoingList list to copy; must not be null
+ */
+ public void setOutgoingList(PositionListWritable outgoingList) {
+ this.outgoingList.set(outgoingList);
}
public byte getState() {
@@ -53,7 +63,7 @@
return mergeChain.getKmerLength();
}
- public VKmerBytesWritable getMergeChain() {
+ public KmerBytesWritable getMergeChain() {
return mergeChain;
}
@@ -61,20 +71,18 @@
this.mergeChain.set(mergeChain);
}
- public void setMergeChain(VKmerBytesWritable mergeChain) {
- this.mergeChain.set(mergeChain);
- }
-
@Override
public void readFields(DataInput in) throws IOException {
- adjMap = in.readByte();
+ incomingList.readFields(in);
+ outgoingList.readFields(in);
state = in.readByte();
mergeChain.readFields(in);
}
@Override
public void write(DataOutput out) throws IOException {
- out.writeByte(adjMap);
+ incomingList.write(out);
+ outgoingList.write(out);
out.writeByte(state);
mergeChain.write(out);
}
@@ -86,7 +94,14 @@
@Override
public String toString() {
- return GeneCode.getSymbolFromBitMap(adjMap) + "\t" + getLengthOfMergeChain() + "\t" + mergeChain.toString();
+ return state + "\t" + getLengthOfMergeChain() + "\t" + mergeChain.toString();
+ }
+
+ public int inDegree() {
+ return incomingList.getCountOfPosition();
}
+ public int outDegree() {
+ return outgoingList.getCountOfPosition();
+ }
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/log/DataLoadLogFormatter.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/log/DataLoadLogFormatter.java
index ae950f4..95e070f 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/log/DataLoadLogFormatter.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/log/DataLoadLogFormatter.java
@@ -4,22 +4,19 @@
import java.util.logging.Handler;
import java.util.logging.LogRecord;
-import edu.uci.ics.genomix.type.KmerCountValue;
-import edu.uci.ics.genomix.type.VKmerBytesWritable;
+import edu.uci.ics.genomix.type.NodeWritable;
public class DataLoadLogFormatter extends Formatter {
- private VKmerBytesWritable key;
- private KmerCountValue value;
+ // Allocated eagerly: set(...) copies into this instance and format(...) calls
+ // toString() on it, so a null field would throw NullPointerException.
+ private NodeWritable key = new NodeWritable();
- public void set(VKmerBytesWritable key, KmerCountValue value) {
+ public void set(NodeWritable key) {
this.key.set(key);
- this.value = value;
}
public String format(LogRecord record) {
StringBuilder builder = new StringBuilder(1000);
- builder.append(key.toString() + "\t" + value.toString() + "\r\n");
+ builder.append(key.toString() + "\r\n");
if (!formatMessage(record).equals(""))
builder.append(formatMessage(record) + "\r\n");
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/log/LogAlgorithmLogFormatter.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/log/LogAlgorithmLogFormatter.java
index 9eba176..dca2cb8 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/log/LogAlgorithmLogFormatter.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/log/LogAlgorithmLogFormatter.java
@@ -2,22 +2,21 @@
import java.util.logging.*;
-import edu.uci.ics.genomix.pregelix.io.LogAlgorithmMessageWritable;
+import edu.uci.ics.genomix.pregelix.io.MessageWritable;
import edu.uci.ics.genomix.pregelix.type.Message;
import edu.uci.ics.genomix.pregelix.type.State;
-import edu.uci.ics.genomix.type.VKmerBytesWritable;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
public class LogAlgorithmLogFormatter extends Formatter {
//
// Create a DateFormat to format the logger timestamp.
//
- //private static final DateFormat df = new SimpleDateFormat("dd/MM/yyyy hh:mm:ss.SSS");
private long step;
- private VKmerBytesWritable sourceVertexId = new VKmerBytesWritable(1);
- private VKmerBytesWritable destVertexId = new VKmerBytesWritable(1);
- private LogAlgorithmMessageWritable msg = new LogAlgorithmMessageWritable();
+ private KmerBytesWritable sourceVertexId = new KmerBytesWritable(1);
+ private KmerBytesWritable destVertexId = new KmerBytesWritable(1);
+ private MessageWritable msg = new MessageWritable();
private byte state;
- private VKmerBytesWritable mergeChain = new VKmerBytesWritable(1);;
+ private KmerBytesWritable mergeChain = new KmerBytesWritable(1);;
//private boolean testDelete = false;
/**
* 0: general operation
@@ -30,8 +29,8 @@
public LogAlgorithmLogFormatter() {
}
- public void set(long step, VKmerBytesWritable sourceVertexId, VKmerBytesWritable destVertexId,
- LogAlgorithmMessageWritable msg, byte state) {
+ public void set(long step, KmerBytesWritable sourceVertexId, KmerBytesWritable destVertexId,
+ MessageWritable msg, byte state) {
this.step = step;
this.sourceVertexId.set(sourceVertexId);
this.destVertexId.set(destVertexId);
@@ -40,7 +39,7 @@
this.operation = 0;
}
- public void setMergeChain(long step, VKmerBytesWritable sourceVertexId, VKmerBytesWritable mergeChain) {
+ public void setMergeChain(long step, KmerBytesWritable sourceVertexId, KmerBytesWritable mergeChain) {
this.reset();
this.step = step;
this.sourceVertexId.set(sourceVertexId);
@@ -48,7 +47,7 @@
this.operation = 2;
}
- public void setVotoToHalt(long step, VKmerBytesWritable sourceVertexId) {
+ public void setVotoToHalt(long step, KmerBytesWritable sourceVertexId) {
this.reset();
this.step = step;
this.sourceVertexId.set(sourceVertexId);
@@ -56,11 +55,11 @@
}
public void reset() {
- this.sourceVertexId = new VKmerBytesWritable(1);
- this.destVertexId = new VKmerBytesWritable(1);
- this.msg = new LogAlgorithmMessageWritable();
+ this.sourceVertexId = new KmerBytesWritable(1);
+ this.destVertexId = new KmerBytesWritable(1);
+ this.msg = new MessageWritable();
this.state = 0;
- this.mergeChain = new VKmerBytesWritable(1);
+ this.mergeChain = new KmerBytesWritable(1);
}
public String format(LogRecord record) {
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/log/NaiveAlgorithmLogFormatter.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/log/NaiveAlgorithmLogFormatter.java
index 39b0bc1..4a5850a 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/log/NaiveAlgorithmLogFormatter.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/log/NaiveAlgorithmLogFormatter.java
@@ -2,7 +2,7 @@
import java.util.logging.*;
-import edu.uci.ics.genomix.type.VKmerBytesWritable;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
public class NaiveAlgorithmLogFormatter extends Formatter {
//
@@ -10,10 +10,10 @@
//
//private static final DateFormat df = new SimpleDateFormat("dd/MM/yyyy hh:mm:ss.SSS");
private long step;
- private VKmerBytesWritable sourceVertexId;
- private VKmerBytesWritable destVertexId;
+ private KmerBytesWritable sourceVertexId;
+ private KmerBytesWritable destVertexId;
- public void set(long step, VKmerBytesWritable sourceVertexId, VKmerBytesWritable destVertexId) {
+ public void set(long step, KmerBytesWritable sourceVertexId, KmerBytesWritable destVertexId) {
this.step = step;
this.sourceVertexId.set(sourceVertexId);
this.destVertexId.set(destVertexId);
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/LogAlgorithmForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/LogAlgorithmForPathMergeVertex.java
index b033c28..6be0eee 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/LogAlgorithmForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/LogAlgorithmForPathMergeVertex.java
@@ -9,21 +9,19 @@
import edu.uci.ics.genomix.pregelix.client.Client;
import edu.uci.ics.genomix.pregelix.format.LogAlgorithmForPathMergeInputFormat;
import edu.uci.ics.genomix.pregelix.format.LogAlgorithmForPathMergeOutputFormat;
-import edu.uci.ics.genomix.pregelix.io.LogAlgorithmMessageWritable;
+import edu.uci.ics.genomix.pregelix.io.MessageWritable;
import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
import edu.uci.ics.genomix.pregelix.type.Message;
import edu.uci.ics.genomix.pregelix.type.State;
import edu.uci.ics.genomix.pregelix.util.VertexUtil;
-import edu.uci.ics.genomix.type.GeneCode;
import edu.uci.ics.genomix.type.KmerBytesWritable;
-import edu.uci.ics.genomix.type.VKmerBytesWritable;
-import edu.uci.ics.genomix.type.VKmerBytesWritableFactory;
-
+import edu.uci.ics.genomix.type.KmerBytesWritableFactory;
+import edu.uci.ics.genomix.type.PositionWritable;
/*
* vertexId: BytesWritable
* vertexValue: ValueStateWritable
* edgeValue: NullWritable
- * message: LogAlgorithmMessageWritable
+ * message: MessageWritable
*
* DNA:
* A: 00
@@ -48,19 +46,20 @@
* The details about message are in edu.uci.ics.pregelix.example.io.MessageWritable.
*/
public class LogAlgorithmForPathMergeVertex extends
- Vertex<KmerBytesWritable, ValueStateWritable, NullWritable, LogAlgorithmMessageWritable> {
+ Vertex<PositionWritable, ValueStateWritable, NullWritable, MessageWritable> {
public static final String KMER_SIZE = "LogAlgorithmForPathMergeVertex.kmerSize";
public static final String ITERATIONS = "LogAlgorithmForPathMergeVertex.iteration";
public static int kmerSize = -1;
private int maxIteration = -1;
- private LogAlgorithmMessageWritable incomingMsg = new LogAlgorithmMessageWritable();
- private LogAlgorithmMessageWritable outgoingMsg = new LogAlgorithmMessageWritable();
+ private MessageWritable incomingMsg = new MessageWritable();
+ private MessageWritable outgoingMsg = new MessageWritable();
- private VKmerBytesWritableFactory kmerFactory = new VKmerBytesWritableFactory(1);
- private VKmerBytesWritable chainVertexId = new VKmerBytesWritable(1);
- private VKmerBytesWritable lastKmer = new VKmerBytesWritable(1);
+ private KmerBytesWritableFactory kmerFactory = new KmerBytesWritableFactory(1);
+ private KmerBytesWritable lastKmer = new KmerBytesWritable(1);
+ private PositionWritable destVertexId = new PositionWritable();
+ private Iterator<PositionWritable> posIterator;
/**
* initiate kmerSize, maxIteration
*/
@@ -75,42 +74,35 @@
/**
* get destination vertex
*/
- public VKmerBytesWritable getNextDestVertexId(KmerBytesWritable vertexId, byte geneCode) {
- return kmerFactory.shiftKmerWithNextCode(vertexId, geneCode);
+ public PositionWritable getNextDestVertexId(ValueStateWritable value) {
+ posIterator = value.getOutgoingList().iterator();
+ return posIterator.next();
}
- public VKmerBytesWritable getPreDestVertexId(KmerBytesWritable vertexId, byte geneCode) {
- return kmerFactory.shiftKmerWithPreCode(vertexId, geneCode);
- }
-
- public VKmerBytesWritable getNextDestVertexIdFromBitmap(KmerBytesWritable chainVertexId, byte adjMap) {
- return getDestVertexIdFromChain(chainVertexId, adjMap);
- }
-
- public VKmerBytesWritable getDestVertexIdFromChain(KmerBytesWritable chainVertexId, byte adjMap) {
- VKmerBytesWritable lastKmer = kmerFactory.getLastKmerFromChain(kmerSize, chainVertexId);
- return getNextDestVertexId(lastKmer, GeneCode.getGeneCodeFromBitMap((byte) (adjMap & 0x0F)));
+ public PositionWritable getPreDestVertexId(ValueStateWritable value) {
+ posIterator = value.getIncomingList().iterator();
+ return posIterator.next();
}
/**
* head send message to all next nodes
*/
- public void sendMsgToAllNextNodes(KmerBytesWritable vertexId, byte adjMap) {
- for (byte x = GeneCode.A; x <= GeneCode.T; x++) {
- if ((adjMap & (1 << x)) != 0) {
- sendMsg(getNextDestVertexId(vertexId, x), outgoingMsg);
- }
+ public void sendMsgToAllNextNodes(ValueStateWritable value) {
+ posIterator = value.getOutgoingList().iterator();
+ while(posIterator.hasNext()){
+ destVertexId.set(posIterator.next());
+ sendMsg(destVertexId, outgoingMsg);
}
}
/**
* head send message to all previous nodes
*/
- public void sendMsgToAllPreviousNodes(KmerBytesWritable vertexId, byte adjMap) {
- for (byte x = GeneCode.A; x <= GeneCode.T; x++) {
- if (((adjMap >> 4) & (1 << x)) != 0) {
- sendMsg(getPreDestVertexId(vertexId, x), outgoingMsg);
- }
+ public void sendMsgToAllPreviousNodes(ValueStateWritable value) {
+ posIterator = value.getIncomingList().iterator();
+ while(posIterator.hasNext()){
+ destVertexId.set(posIterator.next());
+ sendMsg(destVertexId, outgoingMsg);
}
}
@@ -118,14 +110,14 @@
* start sending message
*/
public void startSendMsg() {
- if (VertexUtil.isHeadVertex(getVertexValue().getAdjMap())) {
+ if (VertexUtil.isHeadVertex(getVertexValue())) {
outgoingMsg.setMessage(Message.START);
- sendMsgToAllNextNodes(getVertexId(), getVertexValue().getAdjMap());
+ sendMsgToAllNextNodes(getVertexValue());
voteToHalt();
}
- if (VertexUtil.isRearVertex(getVertexValue().getAdjMap())) {
+ if (VertexUtil.isRearVertex(getVertexValue())) {
outgoingMsg.setMessage(Message.END);
- sendMsgToAllPreviousNodes(getVertexId(), getVertexValue().getAdjMap());
+ sendMsgToAllPreviousNodes(getVertexValue());
voteToHalt();
}
}
@@ -133,9 +125,9 @@
/**
* initiate head, rear and path node
*/
- public void initState(Iterator<LogAlgorithmMessageWritable> msgIterator) {
+ public void initState(Iterator<MessageWritable> msgIterator) {
while (msgIterator.hasNext()) {
- if (!VertexUtil.isPathVertex(getVertexValue().getAdjMap())) {
+ if (!VertexUtil.isPathVertex(getVertexValue())) {
msgIterator.next();
voteToHalt();
} else {
@@ -154,7 +146,7 @@
getVertexValue().setMergeChain(null);
} else if (incomingMsg.getMessage() == Message.END && getVertexValue().getState() != State.START_VERTEX) {
getVertexValue().setState(State.END_VERTEX);
- getVertexValue().setMergeChain(getVertexId());
+ getVertexValue().setMergeChain(getVertexValue().getMergeChain());
voteToHalt();
} else
voteToHalt();
@@ -163,29 +155,29 @@
/**
* head send message to path
*/
- public void sendOutMsg(KmerBytesWritable chainVertexId, byte adjMap) {
+ public void sendOutMsg() {
if (getVertexValue().getState() == State.START_VERTEX) {
outgoingMsg.setMessage(Message.START);
outgoingMsg.setSourceVertexId(getVertexId());
- sendMsg(getNextDestVertexIdFromBitmap(chainVertexId, adjMap), outgoingMsg);
+ sendMsg(getNextDestVertexId(getVertexValue()), outgoingMsg);
} else if (getVertexValue().getState() != State.END_VERTEX) {
outgoingMsg.setMessage(Message.NON);
outgoingMsg.setSourceVertexId(getVertexId());
- sendMsg(getNextDestVertexIdFromBitmap(chainVertexId, adjMap), outgoingMsg);
+ sendMsg(getNextDestVertexId(getVertexValue()), outgoingMsg);
}
}
/**
* head send message to path
*/
- public void sendMsgToPathVertex(Iterator<LogAlgorithmMessageWritable> msgIterator) {
+ public void sendMsgToPathVertex(Iterator<MessageWritable> msgIterator) {
if (getSuperstep() == 3) {
- getVertexValue().setMergeChain(getVertexId());
- sendOutMsg(getVertexId(), getVertexValue().getAdjMap());
+ getVertexValue().setMergeChain(getVertexValue().getMergeChain());
+ sendOutMsg();
} else {
if (msgIterator.hasNext()) {
incomingMsg = msgIterator.next();
- if (mergeChainVertex(msgIterator)) {
+ if (mergeChainVertex()) {
if (incomingMsg.getMessage() == Message.END) {
if (getVertexValue().getState() == State.START_VERTEX) {
getVertexValue().setState(State.FINAL_VERTEX);
@@ -194,7 +186,7 @@
} else
getVertexValue().setState(State.END_VERTEX);
} else
- sendOutMsg(getVertexValue().getMergeChain(), getVertexValue().getAdjMap());
+ sendOutMsg();
}
}
}
@@ -203,11 +195,11 @@
/**
* path response message to head
*/
- public void responseMsgToHeadVertex(Iterator<LogAlgorithmMessageWritable> msgIterator) {
+ public void responseMsgToHeadVertex(Iterator<MessageWritable> msgIterator) {
if (msgIterator.hasNext()) {
incomingMsg = msgIterator.next();
outgoingMsg.setChainVertexId(getVertexValue().getMergeChain());
- outgoingMsg.setAdjMap(getVertexValue().getAdjMap());
+ outgoingMsg.setNeighberNode(getVertexValue().getOutgoingList());
if (getVertexValue().getState() == State.END_VERTEX)
outgoingMsg.setMessage(Message.END);
sendMsg(incomingMsg.getSourceVertexId(), outgoingMsg);
@@ -223,28 +215,23 @@
/**
* merge chainVertex and store in vertexVal.chainVertexId
*/
- public boolean mergeChainVertex(Iterator<LogAlgorithmMessageWritable> msgIterator) {
+ public boolean mergeChainVertex() {
//merge chain
lastKmer.set(kmerFactory.getLastKmerFromChain(incomingMsg.getLengthOfChain() - kmerSize + 1,
incomingMsg.getChainVertexId()));
- chainVertexId.set(kmerFactory.mergeTwoKmer(getVertexValue().getMergeChain(), lastKmer));
- if (VertexUtil.isCycle(getVertexId(), chainVertexId, kmerSize)) {
- getVertexValue().setMergeChain(null);
- getVertexValue().setAdjMap(
- VertexUtil.reverseAdjMap(getVertexValue().getAdjMap(),
- chainVertexId.getGeneCodeAtPosition(kmerSize)));
+ KmerBytesWritable chainVertexId = kmerFactory.mergeTwoKmer(getVertexValue().getMergeChain(), lastKmer);
+ getVertexValue().setMergeChain(chainVertexId);
+ getVertexValue().setOutgoingList(incomingMsg.getNeighberNode());
+ if (VertexUtil.isCycle(kmerFactory.getFirstKmerFromChain(kmerSize, getVertexValue().getMergeChain()),
+ getVertexValue().getMergeChain(), kmerSize)) { // NOTE(review): chainVertexId may alias kmerFactory's reused result - confirm
getVertexValue().setState(State.CYCLE);
return false;
- } else
- getVertexValue().setMergeChain(chainVertexId);
-
- byte tmpVertexValue = VertexUtil.updateRightNeighber(getVertexValue().getAdjMap(), incomingMsg.getAdjMap());
- getVertexValue().setAdjMap(tmpVertexValue);
+ }
return true;
}
@Override
- public void compute(Iterator<LogAlgorithmMessageWritable> msgIterator) {
+ public void compute(Iterator<MessageWritable> msgIterator) {
initVertex();
if (getSuperstep() == 1)
startSendMsg();
@@ -268,7 +255,7 @@
*/
job.setVertexInputFormatClass(LogAlgorithmForPathMergeInputFormat.class);
job.setVertexOutputFormatClass(LogAlgorithmForPathMergeOutputFormat.class);
- job.setOutputKeyClass(KmerBytesWritable.class);
+ job.setOutputKeyClass(PositionWritable.class);
job.setOutputValueClass(ValueStateWritable.class);
job.setDynamicVertexValueSize(true);
Client.run(args, job);
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/NaiveAlgorithmForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/NaiveAlgorithmForPathMergeVertex.java
index b637f84..af38072 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/NaiveAlgorithmForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/NaiveAlgorithmForPathMergeVertex.java
@@ -3,17 +3,16 @@
import java.util.Iterator;
import org.apache.hadoop.io.NullWritable;
-import edu.uci.ics.genomix.type.GeneCode;
import edu.uci.ics.genomix.type.KmerBytesWritable;
-import edu.uci.ics.genomix.type.VKmerBytesWritable;
-import edu.uci.ics.genomix.type.VKmerBytesWritableFactory;
+import edu.uci.ics.genomix.type.KmerBytesWritableFactory;
+import edu.uci.ics.genomix.type.PositionWritable;
import edu.uci.ics.pregelix.api.graph.Vertex;
import edu.uci.ics.pregelix.api.job.PregelixJob;
import edu.uci.ics.genomix.pregelix.client.Client;
import edu.uci.ics.genomix.pregelix.format.NaiveAlgorithmForPathMergeInputFormat;
import edu.uci.ics.genomix.pregelix.format.NaiveAlgorithmForPathMergeOutputFormat;
-import edu.uci.ics.genomix.pregelix.io.NaiveAlgorithmMessageWritable;
+import edu.uci.ics.genomix.pregelix.io.MessageWritable;
import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
import edu.uci.ics.genomix.pregelix.type.Message;
import edu.uci.ics.genomix.pregelix.type.State;
@@ -23,7 +22,7 @@
* vertexId: BytesWritable
* vertexValue: ByteWritable
* edgeValue: NullWritable
- * message: NaiveAlgorithmMessageWritable
+ * message: MessageWritable
*
* DNA:
* A: 00
@@ -51,18 +50,21 @@
* Naive Algorithm for path merge graph
*/
public class NaiveAlgorithmForPathMergeVertex extends
- Vertex<KmerBytesWritable, ValueStateWritable, NullWritable, NaiveAlgorithmMessageWritable> {
+ Vertex<PositionWritable, ValueStateWritable, NullWritable, MessageWritable> {
public static final String KMER_SIZE = "NaiveAlgorithmForPathMergeVertex.kmerSize";
public static final String ITERATIONS = "NaiveAlgorithmForPathMergeVertex.iteration";
public static int kmerSize = -1;
private int maxIteration = -1;
- private NaiveAlgorithmMessageWritable incomingMsg = new NaiveAlgorithmMessageWritable();
- private NaiveAlgorithmMessageWritable outgoingMsg = new NaiveAlgorithmMessageWritable();
-
- private VKmerBytesWritableFactory kmerFactory = new VKmerBytesWritableFactory(1);
- private VKmerBytesWritable destVertexId = new VKmerBytesWritable(1);
-
+ private MessageWritable incomingMsg = new MessageWritable();
+ private MessageWritable outgoingMsg = new MessageWritable();
+
+ private KmerBytesWritableFactory kmerFactory = new KmerBytesWritableFactory(1);
+ private KmerBytesWritable lastKmer = new KmerBytesWritable(1);
+
+ private PositionWritable destVertexId = new PositionWritable();
+ private Iterator<PositionWritable> posIterator;
+
/**
* initiate kmerSize, maxIteration
*/
@@ -77,40 +79,35 @@
/**
* get destination vertex
*/
- public VKmerBytesWritable getDestVertexId(KmerBytesWritable vertexId, byte geneCode) {
- return kmerFactory.shiftKmerWithNextCode(vertexId, geneCode);
+ public PositionWritable getNextDestVertexId(ValueStateWritable value) {
+ posIterator = value.getOutgoingList().iterator();
+ return posIterator.next();
}
- public VKmerBytesWritable getPreDestVertexId(KmerBytesWritable vertexId, byte geneCode) {
- return kmerFactory.shiftKmerWithPreCode(vertexId, geneCode);
- }
-
- public VKmerBytesWritable getDestVertexIdFromChain(VKmerBytesWritable chainVertexId, byte adjMap) {
- VKmerBytesWritable lastKmer = kmerFactory.getLastKmerFromChain(kmerSize, chainVertexId);
- return getDestVertexId(lastKmer, GeneCode.getGeneCodeFromBitMap((byte) (adjMap & 0x0F)));
+ public PositionWritable getPreDestVertexId(ValueStateWritable value) {
+ posIterator = value.getIncomingList().iterator();
+ return posIterator.next();
}
/**
* head send message to all next nodes
*/
- public void sendMsgToAllNextNodes(KmerBytesWritable vertexId, byte adjMap) {
- for (byte x = GeneCode.A; x <= GeneCode.T; x++) {
- if ((adjMap & (1 << x)) != 0) {
- destVertexId.set(getDestVertexId(vertexId, x));
- sendMsg(destVertexId, outgoingMsg);
- }
+ public void sendMsgToAllNextNodes(ValueStateWritable value) {
+ posIterator = value.getOutgoingList().iterator();
+ while(posIterator.hasNext()){
+ destVertexId.set(posIterator.next());
+ sendMsg(destVertexId, outgoingMsg);
}
}
/**
* head send message to all previous nodes
*/
- public void sendMsgToAllPreviousNodes(KmerBytesWritable vertexId, byte adjMap) {
- for (byte x = GeneCode.A; x <= GeneCode.T; x++) {
- if (((adjMap >> 4) & (1 << x)) != 0) {
- destVertexId.set(getPreDestVertexId(vertexId, x));
- sendMsg(destVertexId, outgoingMsg);
- }
+ public void sendMsgToAllPreviousNodes(ValueStateWritable value) {
+ posIterator = value.getIncomingList().iterator();
+ while(posIterator.hasNext()){
+ destVertexId.set(posIterator.next());
+ sendMsg(destVertexId, outgoingMsg);
}
}
@@ -118,22 +115,22 @@
* start sending message
*/
public void startSendMsg() {
- if (VertexUtil.isHeadVertex(getVertexValue().getAdjMap())) {
+ if (VertexUtil.isHeadVertex(getVertexValue())) {
outgoingMsg.setMessage(Message.START);
- sendMsgToAllNextNodes(getVertexId(), getVertexValue().getAdjMap());
+ sendMsgToAllNextNodes(getVertexValue());
}
- if (VertexUtil.isRearVertex(getVertexValue().getAdjMap())) {
+ if (VertexUtil.isRearVertex(getVertexValue())) {
outgoingMsg.setMessage(Message.END);
- sendMsgToAllPreviousNodes(getVertexId(), getVertexValue().getAdjMap());
+ sendMsgToAllPreviousNodes(getVertexValue());
}
}
/**
* initiate head, rear and path node
*/
- public void initState(Iterator<NaiveAlgorithmMessageWritable> msgIterator) {
+ public void initState(Iterator<MessageWritable> msgIterator) {
while (msgIterator.hasNext()) {
- if (!VertexUtil.isPathVertex(getVertexValue().getAdjMap())) {
+ if (!VertexUtil.isPathVertex(getVertexValue())) {
msgIterator.next();
voteToHalt();
} else {
@@ -155,33 +152,36 @@
} else
voteToHalt();
}
+
+ /**
+ * merge chainVertex and store in vertexVal.chainVertexId
+ */
+ public void mergeChainVertex() {
+ //merge chain
+ lastKmer.set(kmerFactory.getLastKmerFromChain(incomingMsg.getLengthOfChain() - kmerSize + 1,
+ incomingMsg.getChainVertexId()));
+ getVertexValue().setMergeChain(kmerFactory.mergeTwoKmer(getVertexValue().getMergeChain(), lastKmer));
+ getVertexValue().setOutgoingList(incomingMsg.getNeighberNode());
+ }
/**
* head node sends message to path node
*/
- public void sendMsgToPathVertex(Iterator<NaiveAlgorithmMessageWritable> msgIterator) {
+ public void sendMsgToPathVertex(Iterator<MessageWritable> msgIterator) {
if (getSuperstep() == 3) {
- getVertexValue().setMergeChain(getVertexId());
outgoingMsg.setSourceVertexId(getVertexId());
- destVertexId.set(getDestVertexIdFromChain(getVertexValue().getMergeChain(), getVertexValue().getAdjMap()));
+ destVertexId.set(getNextDestVertexId(getVertexValue()));
sendMsg(destVertexId, outgoingMsg);
} else {
while (msgIterator.hasNext()) {
incomingMsg = msgIterator.next();
if (incomingMsg.getMessage() != Message.STOP) {
- getVertexValue().setMergeChain(
- kmerFactory.mergeKmerWithNextCode(getVertexValue().getMergeChain(),
- incomingMsg.getLastGeneCode()));
+ mergeChainVertex();
outgoingMsg.setSourceVertexId(getVertexId());
- destVertexId
- .set(getDestVertexIdFromChain(getVertexValue().getMergeChain(), incomingMsg.getAdjMap()));
+ destVertexId.set(getNextDestVertexId(getVertexValue()));
sendMsg(destVertexId, outgoingMsg);
} else {
- getVertexValue().setMergeChain(
- kmerFactory.mergeKmerWithNextCode(getVertexValue().getMergeChain(),
- incomingMsg.getLastGeneCode()));
- byte adjMap = VertexUtil.updateRightNeighber(getVertexValue().getAdjMap(), incomingMsg.getAdjMap());
- getVertexValue().setAdjMap(adjMap);
+ mergeChainVertex();
getVertexValue().setState(State.FINAL_VERTEX);
//String source = getVertexValue().getMergeChain().toString();
//System.out.println();
@@ -195,15 +195,16 @@
*/
public void responseMsgToHeadVertex() {
deleteVertex(getVertexId());
- outgoingMsg.setAdjMap(getVertexValue().getAdjMap());
- outgoingMsg.setLastGeneCode(getVertexId().getGeneCodeAtPosition(kmerSize - 1));
+ outgoingMsg.setNeighberNode(getVertexValue().getOutgoingList());
+ outgoingMsg.setChainVertexId(getVertexValue().getMergeChain());
if (getVertexValue().getState() == State.END_VERTEX)
outgoingMsg.setMessage(Message.STOP);
- sendMsg(incomingMsg.getSourceVertexId(), outgoingMsg);
+ destVertexId.set(incomingMsg.getSourceVertexId());
+ sendMsg(destVertexId, outgoingMsg);
}
@Override
- public void compute(Iterator<NaiveAlgorithmMessageWritable> msgIterator) {
+ public void compute(Iterator<MessageWritable> msgIterator) {
initVertex();
if (getSuperstep() == 1) {
startSendMsg();
@@ -232,7 +233,7 @@
job.setVertexInputFormatClass(NaiveAlgorithmForPathMergeInputFormat.class);
job.setVertexOutputFormatClass(NaiveAlgorithmForPathMergeOutputFormat.class);
job.setDynamicVertexValueSize(true);
- job.setOutputKeyClass(KmerBytesWritable.class);
+ job.setOutputKeyClass(PositionWritable.class);
job.setOutputValueClass(ValueStateWritable.class);
Client.run(args, job);
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/P3ForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/P3ForPathMergeVertex.java
index 44d47a0..6f4354a 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/P3ForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/P3ForPathMergeVertex.java
@@ -3,17 +3,16 @@
import java.util.Iterator;
import org.apache.hadoop.io.NullWritable;
-import edu.uci.ics.genomix.type.GeneCode;
import edu.uci.ics.genomix.type.KmerBytesWritable;
-import edu.uci.ics.genomix.type.VKmerBytesWritable;
-import edu.uci.ics.genomix.type.VKmerBytesWritableFactory;
+import edu.uci.ics.genomix.type.KmerBytesWritableFactory;
+import edu.uci.ics.genomix.type.PositionWritable;
import edu.uci.ics.pregelix.api.graph.Vertex;
import edu.uci.ics.pregelix.api.job.PregelixJob;
import edu.uci.ics.genomix.pregelix.client.Client;
import edu.uci.ics.genomix.pregelix.format.NaiveAlgorithmForPathMergeInputFormat;
import edu.uci.ics.genomix.pregelix.format.NaiveAlgorithmForPathMergeOutputFormat;
-import edu.uci.ics.genomix.pregelix.io.NaiveAlgorithmMessageWritable;
+import edu.uci.ics.genomix.pregelix.io.MessageWritable;
import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
import edu.uci.ics.genomix.pregelix.type.Message;
import edu.uci.ics.genomix.pregelix.type.State;
@@ -23,7 +22,7 @@
* vertexId: BytesWritable
* vertexValue: ByteWritable
* edgeValue: NullWritable
- * message: NaiveAlgorithmMessageWritable
+ * message: MessageWritable
*
* DNA:
* A: 00
@@ -51,7 +50,7 @@
* Naive Algorithm for path merge graph
*/
public class P3ForPathMergeVertex extends
- Vertex<KmerBytesWritable, ValueStateWritable, NullWritable, NaiveAlgorithmMessageWritable> {
+ Vertex<PositionWritable, ValueStateWritable, NullWritable, MessageWritable> {
public static final String KMER_SIZE = "P3ForPathMergeVertex.kmerSize";
public static final String ITERATIONS = "P3ForPathMergeVertex.iteration";
public static final String PSEUDORATE = "P3ForPathMergeVertex.pseudoRate";
@@ -61,13 +60,14 @@
public static float pseudoRate = -1;
public static int maxRound = -1;
- private NaiveAlgorithmMessageWritable incomingMsg = new NaiveAlgorithmMessageWritable();
- private NaiveAlgorithmMessageWritable outgoingMsg = new NaiveAlgorithmMessageWritable();
+ private MessageWritable incomingMsg = new MessageWritable();
+ private MessageWritable outgoingMsg = new MessageWritable();
- private VKmerBytesWritableFactory kmerFactory = new VKmerBytesWritableFactory(1);
- private VKmerBytesWritable lastKmer = new VKmerBytesWritable(1);
- private VKmerBytesWritable destVertexId = new VKmerBytesWritable(1);
+ private KmerBytesWritableFactory kmerFactory = new KmerBytesWritableFactory(1);
+ private KmerBytesWritable lastKmer = new KmerBytesWritable(1);
+ private PositionWritable destVertexId = new PositionWritable();
+ private Iterator<PositionWritable> posIterator;
/**
* initiate kmerSize, maxIteration
*/
@@ -86,40 +86,35 @@
/**
* get destination vertex
*/
- public VKmerBytesWritable getDestVertexId(KmerBytesWritable vertexId, byte geneCode) {
- return kmerFactory.shiftKmerWithNextCode(vertexId, geneCode);
+ public PositionWritable getNextDestVertexId(ValueStateWritable value) {
+ posIterator = value.getOutgoingList().iterator();
+ return posIterator.next();
}
- public VKmerBytesWritable getPreDestVertexId(KmerBytesWritable vertexId, byte geneCode) {
- return kmerFactory.shiftKmerWithPreCode(vertexId, geneCode);
- }
-
- public VKmerBytesWritable getDestVertexIdFromChain(VKmerBytesWritable chainVertexId, byte adjMap) {
- VKmerBytesWritable lastKmer = kmerFactory.getLastKmerFromChain(kmerSize, chainVertexId);
- return getDestVertexId(lastKmer, GeneCode.getGeneCodeFromBitMap((byte) (adjMap & 0x0F)));
+ public PositionWritable getPreDestVertexId(ValueStateWritable value) {
+ posIterator = value.getIncomingList().iterator();
+ return posIterator.next();
}
/**
* head send message to all next nodes
*/
- public void sendMsgToAllNextNodes(KmerBytesWritable vertexId, byte adjMap) {
- for (byte x = GeneCode.A; x <= GeneCode.T; x++) {
- if ((adjMap & (1 << x)) != 0) {
- destVertexId.set(getDestVertexId(vertexId, x));
- sendMsg(destVertexId, outgoingMsg);
- }
+ public void sendMsgToAllNextNodes(ValueStateWritable value) {
+ posIterator = value.getOutgoingList().iterator();
+ while(posIterator.hasNext()){
+ destVertexId.set(posIterator.next());
+ sendMsg(destVertexId, outgoingMsg);
}
}
/**
* head send message to all previous nodes
*/
- public void sendMsgToAllPreviousNodes(KmerBytesWritable vertexId, byte adjMap) {
- for (byte x = GeneCode.A; x <= GeneCode.T; x++) {
- if (((adjMap >> 4) & (1 << x)) != 0) {
- destVertexId.set(getPreDestVertexId(vertexId, x));
- sendMsg(destVertexId, outgoingMsg);
- }
+ public void sendMsgToAllPreviousNodes(ValueStateWritable value) {
+ posIterator = value.getIncomingList().iterator();
+ while(posIterator.hasNext()){
+ destVertexId.set(posIterator.next());
+ sendMsg(destVertexId, outgoingMsg);
}
}
@@ -127,25 +122,23 @@
* start sending message
*/
public void startSendMsg() {
- if (VertexUtil.isHeadVertex(getVertexValue().getAdjMap())) {
+ if (VertexUtil.isHeadVertex(getVertexValue())) {
outgoingMsg.setMessage(Message.START);
- sendMsgToAllNextNodes(getVertexId(), getVertexValue().getAdjMap());
- voteToHalt();
+ sendMsgToAllNextNodes(getVertexValue());
}
- if (VertexUtil.isRearVertex(getVertexValue().getAdjMap())) {
+ if (VertexUtil.isRearVertex(getVertexValue())) {
outgoingMsg.setMessage(Message.END);
- sendMsgToAllPreviousNodes(getVertexId(), getVertexValue().getAdjMap());
- voteToHalt();
+ sendMsgToAllPreviousNodes(getVertexValue());
}
}
-
+
/**
* initiate head, rear and path node
*/
- public void initState(Iterator<NaiveAlgorithmMessageWritable> msgIterator) {
+ public void initState(Iterator<MessageWritable> msgIterator) {
if (msgIterator.hasNext()) {
do {
- if (!VertexUtil.isPathVertex(getVertexValue().getAdjMap())) {
+ if (!VertexUtil.isPathVertex(getVertexValue())) {
msgIterator.next();
voteToHalt();
} else {
@@ -167,7 +160,20 @@
voteToHalt();*/
}
}
-
+
+ /**
+ * set vertex state
+ */
+ public void setState() {
+ if (incomingMsg.getMessage() == Message.START) {
+ getVertexValue().setState(State.START_VERTEX);
+ } else if (incomingMsg.getMessage() == Message.END && getVertexValue().getState() != State.START_VERTEX) {
+ getVertexValue().setState(State.END_VERTEX);
+ voteToHalt();
+ } else
+ voteToHalt();
+ }
+
/**
* mark the pseudoHead
*/
@@ -175,8 +181,7 @@
getVertexValue().setState(State.PSEUDOHEAD);
outgoingMsg.setMessage(Message.FROMPSEUDOHEAD);
destVertexId
- .set(getPreDestVertexId(getVertexId(),
- GeneCode.getGeneCodeFromBitMap((byte) ((getVertexValue().getAdjMap() >> 4) & 0x0F))));
+ .set(getPreDestVertexId(getVertexValue()));
sendMsg(destVertexId, outgoingMsg);
}
@@ -194,45 +199,26 @@
getVertexValue().setState(State.START_HALT);
}
}
-
- /**
- * set vertex state
- */
- public void setState() {
- if (incomingMsg.getMessage() == Message.START) {
- getVertexValue().setState(State.START_VERTEX);
- } else if (incomingMsg.getMessage() == Message.END && getVertexValue().getState() != State.START_VERTEX) {
- getVertexValue().setState(State.END_VERTEX);
- voteToHalt();
- } else
- voteToHalt();
- }
-
+
/**
* merge chain vertex
*/
public void mergeChainVertex(){
- if(incomingMsg.isGeneCode() == true){
- getVertexValue().setMergeChain(
- kmerFactory.mergeKmerWithNextCode(getVertexValue().getMergeChain(),
- incomingMsg.getLastGeneCode()));
- }
- else{
- lastKmer.set(kmerFactory.getLastKmerFromChain(incomingMsg.getLengthOfChain() - kmerSize + 1,
- incomingMsg.getChainVertexId()));
- getVertexValue().setMergeChain(
- kmerFactory.mergeTwoKmer(getVertexValue().getMergeChain(),
- lastKmer));
- }
+ lastKmer.set(kmerFactory.getLastKmerFromChain(incomingMsg.getLengthOfChain() - kmerSize + 1,
+ incomingMsg.getChainVertexId()));
+ getVertexValue().setMergeChain(
+ kmerFactory.mergeTwoKmer(getVertexValue().getMergeChain(),
+ lastKmer));
+ getVertexValue().setOutgoingList(incomingMsg.getNeighberNode());
}
/**
* head node sends message to path node
*/
- public void sendMsgToPathVertexMergePhase(Iterator<NaiveAlgorithmMessageWritable> msgIterator) {
+ public void sendMsgToPathVertexMergePhase(Iterator<MessageWritable> msgIterator) {
if (getSuperstep() == 3 + 2 * maxRound + 2) {
outgoingMsg.setSourceVertexId(getVertexId());
- destVertexId.set(getDestVertexIdFromChain(getVertexValue().getMergeChain(), getVertexValue().getAdjMap()));
+ destVertexId.set(getNextDestVertexId(getVertexValue()));
sendMsg(destVertexId, outgoingMsg);
} else {
while (msgIterator.hasNext()) {
@@ -241,12 +227,10 @@
mergeChainVertex();
outgoingMsg.setSourceVertexId(getVertexId());
destVertexId
- .set(getDestVertexIdFromChain(getVertexValue().getMergeChain(), incomingMsg.getAdjMap()));
+ .set(getNextDestVertexId(getVertexValue()));
sendMsg(destVertexId, outgoingMsg);
} else {
mergeChainVertex();
- byte adjMap = VertexUtil.updateRightNeighber(getVertexValue().getAdjMap(), incomingMsg.getAdjMap());
- getVertexValue().setAdjMap(adjMap);
getVertexValue().setState(State.FINAL_VERTEX);
//String source = getVertexValue().getMergeChain().toString();
//System.out.println();
@@ -260,11 +244,8 @@
*/
public void responseMsgToHeadVertexMergePhase() {
deleteVertex(getVertexId());
- outgoingMsg.setAdjMap(getVertexValue().getAdjMap());
- if(getVertexValue().getLengthOfMergeChain() == 0)
- outgoingMsg.setLastGeneCode(getVertexId().getGeneCodeAtPosition(kmerSize - 1));
- else
- outgoingMsg.setChainVertexId(getVertexValue().getMergeChain());
+ outgoingMsg.setNeighberNode(getVertexValue().getOutgoingList());
+ outgoingMsg.setChainVertexId(getVertexValue().getMergeChain());
if (getVertexValue().getState() == State.END_VERTEX)
outgoingMsg.setMessage(Message.STOP);
sendMsg(incomingMsg.getSourceVertexId(), outgoingMsg);
@@ -273,12 +254,11 @@
/**
* head node sends message to path node in partition phase
*/
- public void sendMsgToPathVertexPartitionPhase(Iterator<NaiveAlgorithmMessageWritable> msgIterator) {
+ public void sendMsgToPathVertexPartitionPhase(Iterator<MessageWritable> msgIterator) {
if (getSuperstep() == 4) {
- getVertexValue().setMergeChain(getVertexId());
if(getVertexValue().getState() != State.START_HALT){
outgoingMsg.setSourceVertexId(getVertexId());
- destVertexId.set(getDestVertexIdFromChain(getVertexValue().getMergeChain(), getVertexValue().getAdjMap()));
+ destVertexId.set(getNextDestVertexId(getVertexValue()));
sendMsg(destVertexId, outgoingMsg);
voteToHalt();
}
@@ -288,13 +268,10 @@
//if from pseudoHead, voteToHalt(), otherwise ...
if (incomingMsg.getMessage() != Message.FROMPSEUDOHEAD){
mergeChainVertex();
- byte adjMap = VertexUtil.updateRightNeighber(getVertexValue().getAdjMap(), incomingMsg.getAdjMap());
- getVertexValue().setAdjMap(adjMap);
if (incomingMsg.getMessage() != Message.STOP
&& incomingMsg.getMessage() != Message.FROMPSEUDOREAR) {
outgoingMsg.setSourceVertexId(getVertexId());
- destVertexId.set(getDestVertexIdFromChain(getVertexValue().getMergeChain(),
- incomingMsg.getAdjMap()));
+ destVertexId.set(getNextDestVertexId(getVertexValue()));
sendMsg(destVertexId, outgoingMsg);
voteToHalt();
} else {
@@ -321,11 +298,8 @@
outgoingMsg.setMessage(Message.FROMPSEUDOHEAD);
else {
deleteVertex(getVertexId());
- outgoingMsg.setAdjMap(getVertexValue().getAdjMap());
- if(getVertexValue().getLengthOfMergeChain() == 0)
- outgoingMsg.setLastGeneCode(getVertexId().getGeneCodeAtPosition(kmerSize - 1));
- else
- outgoingMsg.setChainVertexId(getVertexValue().getMergeChain());
+ outgoingMsg.setNeighberNode(incomingMsg.getNeighberNode());
+ outgoingMsg.setChainVertexId(getVertexValue().getMergeChain());
if (getVertexValue().getState() == State.PSEUDOREAR)
outgoingMsg.setMessage(Message.FROMPSEUDOREAR);
else if (getVertexValue().getState() == State.END_VERTEX)
@@ -338,12 +312,11 @@
/**
* final process the result of partition phase
*/
- public void finalProcessPartitionPhase(Iterator<NaiveAlgorithmMessageWritable> msgIterator){
+ public void finalProcessPartitionPhase(Iterator<MessageWritable> msgIterator){
while (msgIterator.hasNext()) {
incomingMsg = msgIterator.next();
mergeChainVertex();
- byte adjMap = VertexUtil.updateRightNeighber(getVertexValue().getAdjMap(), incomingMsg.getAdjMap());
- getVertexValue().setAdjMap(adjMap);
+ getVertexValue().setOutgoingList(incomingMsg.getNeighberNode());
//check head or pseudoHead
if (getVertexValue().getState() == State.START_VERTEX
&& incomingMsg.getMessage() == Message.STOP) {
@@ -367,7 +340,7 @@
}
@Override
- public void compute(Iterator<NaiveAlgorithmMessageWritable> msgIterator) {
+ public void compute(Iterator<MessageWritable> msgIterator) {
initVertex();
if (getSuperstep() == 1)
startSendMsg();
@@ -411,7 +384,7 @@
job.setVertexInputFormatClass(NaiveAlgorithmForPathMergeInputFormat.class);
job.setVertexOutputFormatClass(NaiveAlgorithmForPathMergeOutputFormat.class);
job.setDynamicVertexValueSize(true);
- job.setOutputKeyClass(KmerBytesWritable.class);
+ job.setOutputKeyClass(PositionWritable.class);
job.setOutputValueClass(ValueStateWritable.class);
Client.run(args, job);
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/CombineSequenceFile.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/CombineSequenceFile.java
deleted file mode 100644
index 7a50537..0000000
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/CombineSequenceFile.java
+++ /dev/null
@@ -1,56 +0,0 @@
-package edu.uci.ics.genomix.pregelix.sequencefile;
-
-import java.io.File;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.SequenceFile.CompressionType;
-
-import edu.uci.ics.genomix.type.KmerBytesWritable;
-import edu.uci.ics.genomix.type.KmerCountValue;
-
-public class CombineSequenceFile {
-
- /**
- * @param args
- * @throws Exception
- */
- public static void main(String[] args) throws Exception {
- // TODO Auto-generated method stub
- int kmerSize = 5;
- Configuration conf = new Configuration();
- FileSystem fileSys = FileSystem.get(conf);
-
- Path p = new Path("graphbuildresult/CyclePath2_result");
- //Path p2 = new Path("data/result");
- Path outFile = new Path("here");
- SequenceFile.Reader reader;
- SequenceFile.Writer writer = SequenceFile.createWriter(fileSys, conf, outFile, KmerBytesWritable.class,
- KmerCountValue.class, CompressionType.NONE);
- KmerBytesWritable key = new KmerBytesWritable(kmerSize);
- KmerCountValue value = new KmerCountValue();
-
- File dir = new File("graphbuildresult/CyclePath2_result");
- for (File child : dir.listFiles()) {
- String name = child.getAbsolutePath();
- Path inFile = new Path(p, name);
- reader = new SequenceFile.Reader(fileSys, inFile, conf);
- while (reader.next(key, value)) {
- System.out.println(key.toString() + "\t" + value.toString());
- writer.append(key, value);
- }
- reader.close();
- }
- writer.close();
- System.out.println();
-
- reader = new SequenceFile.Reader(fileSys, outFile, conf);
- while (reader.next(key, value)) {
- System.err.println(key.toString() + "\t" + value.toString());
- }
- reader.close();
- }
-
-}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/GenerateSmallFile.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/GenerateSmallFile.java
index bb288ff..6b9eb4e 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/GenerateSmallFile.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/GenerateSmallFile.java
@@ -12,11 +12,11 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import edu.uci.ics.genomix.type.KmerBytesWritable;
-import edu.uci.ics.genomix.type.KmerCountValue;
public class GenerateSmallFile {
@@ -27,15 +27,14 @@
SequenceFile.Reader reader = new SequenceFile.Reader(fileSys, inFile, conf);
SequenceFile.Writer writer = SequenceFile.createWriter(fileSys, conf, outFile, KmerBytesWritable.class,
- KmerCountValue.class, CompressionType.NONE);
+ NullWritable.class, CompressionType.NONE);
KmerBytesWritable outKey = new KmerBytesWritable(55);
- KmerCountValue outValue = new KmerCountValue();
int i = 0;
for (i = 0; i < numOfLines; i++) {
// System.out.println(i);
- reader.next(outKey, outValue);
- writer.append(outKey, outValue);
+ if (!reader.next(outKey)) { break; } // key-only read: next(key, null) NPEs on val.getClass(); also stop at end of input
+ writer.append(outKey, NullWritable.get()); // append() dereferences the value; pass the NullWritable singleton, never null
}
writer.close();
reader.close();
@@ -47,18 +46,6 @@
writeTextFile(outFile, lines);
}
- public static void main(String[] args) throws IOException {
- Path dir = new Path("data/split.aa");
- Path outDir = new Path("data/input");
- FileUtils.cleanDirectory(new File("data/input"));
- Path inFile = new Path(dir, "part-0");
- Path outFile = new Path(outDir, "part-0-out-1000");
- generateNumOfLinesFromGraphBuildResuiltBigFile(inFile, outFile, 1000);
- /* String inFile = "data/shortjump_1.head8M.fastq";
- String outFile = "data/testGeneFile";
- generateNumOfLinesFromGraphBuildResuiltBigFile(inFile, outFile, 100000);*/
- }
-
public static String readTextFile(String fileName, int numOfLines) {
String returnValue = "";
FileReader file;
@@ -93,6 +80,17 @@
} catch (IOException e) {
e.printStackTrace();
}
-
+ }
+
+ public static void main(String[] args) throws IOException {
+ Path dir = new Path("data/split.aa");
+ Path outDir = new Path("data/input");
+ FileUtils.cleanDirectory(new File("data/input"));
+ Path inFile = new Path(dir, "part-0");
+ Path outFile = new Path(outDir, "part-0-out-1000");
+ generateNumOfLinesFromGraphBuildResuiltBigFile(inFile, outFile, 1000);
+ /* String inFile = "data/shortjump_1.head8M.fastq";
+ String outFile = "data/testGeneFile";
+ generateNumOfLinesFromGraphBuildResuiltBigFile(inFile, outFile, 100000);*/
}
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/GenerateTextFile.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/GenerateTextFile.java
index 517b9c3..45609c6 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/GenerateTextFile.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/GenerateTextFile.java
@@ -15,7 +15,6 @@
import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
import edu.uci.ics.genomix.pregelix.type.State;
import edu.uci.ics.genomix.type.KmerBytesWritable;
-import edu.uci.ics.genomix.type.KmerCountValue;
public class GenerateTextFile {
@@ -99,10 +98,9 @@
Path path = new Path("data/input/part-0-out-3000000");
SequenceFile.Reader reader = new SequenceFile.Reader(fileSys, path, conf);
KmerBytesWritable key = new KmerBytesWritable(55);
- KmerCountValue value = new KmerCountValue();
- while (reader.next(key, value)) {
- if (key == null || value == null) {
+ while (reader.next(key)) { // key-only overload skips the value; next(key, null) would throw NullPointerException
+ if (key == null) {
break;
}
bw.write(key.toString());
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/CheckMessage.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/CheckMessage.java
index 61d2256..6e6a97a 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/CheckMessage.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/CheckMessage.java
@@ -4,7 +4,7 @@
public static final byte SOURCE = 1 << 0;
public static final byte CHAIN = 1 << 1;
- public static final byte ADJMAP = 1 << 2;
+ public static final byte NEIGHBER = 1 << 2;
public static final byte MESSAGE = 1 << 3;
public static final byte STATE = 1 << 4;
public static final byte LASTGENECODE = 1 << 5;
@@ -20,8 +20,8 @@
case CHAIN:
r = "CHAIN";
break;
- case ADJMAP:
- r = "ADJMAP";
+ case NEIGHBER:
+ r = "NEIGHBER";
break;
case MESSAGE:
r = "MESSAGE";
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/util/VertexUtil.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/util/VertexUtil.java
index 50ff400..f2a61be 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/util/VertexUtil.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/util/VertexUtil.java
@@ -1,19 +1,16 @@
package edu.uci.ics.genomix.pregelix.util;
-import edu.uci.ics.genomix.type.GeneCode;
+import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
import edu.uci.ics.genomix.type.KmerBytesWritable;
-import edu.uci.ics.genomix.type.VKmerBytesWritable;
public class VertexUtil {
- public static VKmerBytesWritable subKmer = new VKmerBytesWritable(0);
-
/**
* Single Vertex: in-degree = out-degree = 1
*
* @param vertexValue
*/
- public static boolean isPathVertex(byte value) {
- if (GeneCode.inDegree(value) == 1 && GeneCode.outDegree(value) == 1)
+ public static boolean isPathVertex(ValueStateWritable value) {
+ if (value.inDegree() == 1 && value.outDegree() == 1)
return true;
return false;
}
@@ -23,8 +20,8 @@
*
* @param vertexValue
*/
- public static boolean isHeadVertex(byte value) {
- if (GeneCode.outDegree(value) > 0 && !isPathVertex(value))
+ public static boolean isHeadVertex(ValueStateWritable value) {
+ if (value.outDegree() > 0 && !isPathVertex(value))
return true;
return false;
}
@@ -34,35 +31,18 @@
*
* @param vertexValue
*/
- public static boolean isRearVertex(byte value) {
- if (GeneCode.inDegree(value) > 0 && !isPathVertex(value))
+ public static boolean isRearVertex(ValueStateWritable value) {
+ if (value.inDegree() > 0 && !isPathVertex(value))
return true;
return false;
}
/**
- * update right neighber based on next vertexId
- */
- public static byte updateRightNeighberByVertexId(byte oldVertexValue, KmerBytesWritable neighberVertex, int k) {
- byte geneCode = neighberVertex.getGeneCodeAtPosition(k - 1);
-
- byte newBit = GeneCode.getBitMapFromGeneCode(geneCode); //getAdjBit
- return (byte) ((byte) (oldVertexValue & 0xF0) | (byte) (newBit & 0x0F));
- }
-
- /**
- * update right neighber
- */
- public static byte updateRightNeighber(byte oldVertexValue, byte newVertexValue) {
- return (byte) ((byte) (oldVertexValue & 0xF0) | (byte) (newVertexValue & 0x0F));
- }
-
- /**
* check if mergeChain is cycle
*/
- public static boolean isCycle(KmerBytesWritable vertexId, VKmerBytesWritable mergeChain, int kmerSize) {
+ public static boolean isCycle(KmerBytesWritable kmer, KmerBytesWritable mergeChain, int kmerSize) {
String chain = mergeChain.toString().substring(1);
- if (chain.contains(vertexId.toString()))
+ if (chain.contains(kmer.toString()))
return true;
return false;
@@ -75,11 +55,4 @@
}
return false;*/
}
-
- /**
- * reverse neighber
- */
- public static byte reverseAdjMap(byte oldAdjMap, byte geneCode) {
- return (byte) ((oldAdjMap & 0xF0) | (GeneCode.getBitMapFromGeneCode(geneCode) & 0x0F));
- }
}
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
index 809ea34..f1fcdac 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
@@ -4,15 +4,15 @@
import java.io.FileOutputStream;
import java.io.IOException;
+import edu.uci.ics.genomix.pregelix.format.LogAlgorithmForPathMergeInputFormat;
import edu.uci.ics.genomix.pregelix.format.LogAlgorithmForPathMergeOutputFormat;
import edu.uci.ics.genomix.pregelix.format.NaiveAlgorithmForPathMergeInputFormat;
import edu.uci.ics.genomix.pregelix.format.NaiveAlgorithmForPathMergeOutputFormat;
-import edu.uci.ics.genomix.pregelix.format.LogAlgorithmForPathMergeInputFormat;
import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
-import edu.uci.ics.genomix.pregelix.operator.NaiveAlgorithmForPathMergeVertex;
import edu.uci.ics.genomix.pregelix.operator.LogAlgorithmForPathMergeVertex;
+import edu.uci.ics.genomix.pregelix.operator.NaiveAlgorithmForPathMergeVertex;
import edu.uci.ics.genomix.pregelix.operator.P3ForPathMergeVertex;
-import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.PositionWritable;
import edu.uci.ics.pregelix.api.job.PregelixJob;
public class JobGenerator {
@@ -25,7 +25,7 @@
job.setVertexInputFormatClass(NaiveAlgorithmForPathMergeInputFormat.class);
job.setVertexOutputFormatClass(NaiveAlgorithmForPathMergeOutputFormat.class);
job.setDynamicVertexValueSize(true);
- job.setOutputKeyClass(KmerBytesWritable.class);
+ job.setOutputKeyClass(PositionWritable.class);
job.setOutputValueClass(ValueStateWritable.class);
job.getConfiguration().setInt(NaiveAlgorithmForPathMergeVertex.KMER_SIZE, 5);
job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
@@ -42,7 +42,7 @@
job.setVertexInputFormatClass(LogAlgorithmForPathMergeInputFormat.class);
job.setVertexOutputFormatClass(LogAlgorithmForPathMergeOutputFormat.class);
job.setDynamicVertexValueSize(true);
- job.setOutputKeyClass(KmerBytesWritable.class);
+ job.setOutputKeyClass(PositionWritable.class);
job.setOutputValueClass(ValueStateWritable.class);
job.getConfiguration().setInt(LogAlgorithmForPathMergeVertex.KMER_SIZE, 5);
job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
@@ -58,7 +58,7 @@
job.setVertexInputFormatClass(NaiveAlgorithmForPathMergeInputFormat.class);
job.setVertexOutputFormatClass(NaiveAlgorithmForPathMergeOutputFormat.class);
job.setDynamicVertexValueSize(true);
- job.setOutputKeyClass(KmerBytesWritable.class);
+ job.setOutputKeyClass(PositionWritable.class);
job.setOutputValueClass(ValueStateWritable.class);
job.getConfiguration().setInt(P3ForPathMergeVertex.KMER_SIZE, 5);
job.getConfiguration().setFloat(P3ForPathMergeVertex.PSEUDORATE, 0.4f);
@@ -72,8 +72,8 @@
}
public static void main(String[] args) throws IOException {
- //genNaiveAlgorithmForMergeGraph();
- //genLogAlgorithmForMergeGraph();
+ genNaiveAlgorithmForMergeGraph();
+ genLogAlgorithmForMergeGraph();
genP3ForMergeGraph();
}
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/pathmerge/GraphBuildTest.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/pathmerge/GraphBuildTest.java
deleted file mode 100644
index 66ee26d..0000000
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/pathmerge/GraphBuildTest.java
+++ /dev/null
@@ -1,176 +0,0 @@
-package edu.uci.ics.genomix.pregelix.pathmerge;
-
-import java.io.BufferedWriter;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.FileWriter;
-import java.io.IOException;
-
-import junit.framework.Assert;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.FileOutputFormat;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import edu.uci.ics.genomix.driver.Driver;
-import edu.uci.ics.genomix.driver.Driver.Plan;
-import edu.uci.ics.genomix.job.GenomixJob;
-import edu.uci.ics.genomix.type.KmerBytesWritable;
-import edu.uci.ics.genomix.type.KmerCountValue;
-
-@SuppressWarnings("deprecation")
-public class GraphBuildTest {
- private static final String ACTUAL_RESULT_DIR = "graphbuildresult";
- private static final String PATH_TO_HADOOP_CONF = "src/test/resources/hadoop/conf";
-
- private static final String DATA_PATH = "data/testGeneFile";
- private static final String HDFS_INPUT_PATH = "/test";
- private static final String HDFS_OUTPUT_PATH = "/result";
-
- private static final String DUMPED_RESULT = ACTUAL_RESULT_DIR + HDFS_OUTPUT_PATH + "/result.txt";
- private static final String CONVERT_RESULT = ACTUAL_RESULT_DIR + "/graph_build_result.txt";
- private static final String EXPECTED_PATH = "src/test/resources/expected/result2";
-
- private static final String HADOOP_CONF_PATH = ACTUAL_RESULT_DIR + File.separator + "conf.xml";
- private MiniDFSCluster dfsCluster;
-
- private JobConf conf = new JobConf();
- private int numberOfNC = 2;
- private int numPartitionPerMachine = 1;
-
- private Driver driver;
-
- @Before
- public void setUp() throws Exception {
- cleanupStores();
- edu.uci.ics.hyracks.hdfs.utils.HyracksUtils.init();
- FileUtils.forceMkdir(new File(ACTUAL_RESULT_DIR));
- FileUtils.cleanDirectory(new File(ACTUAL_RESULT_DIR));
- startHDFS();
-
- FileInputFormat.setInputPaths(conf, HDFS_INPUT_PATH);
- FileOutputFormat.setOutputPath(conf, new Path(HDFS_OUTPUT_PATH));
-
- conf.setInt(GenomixJob.KMER_LENGTH, 55);
- driver = new Driver(edu.uci.ics.hyracks.hdfs.utils.HyracksUtils.CC_HOST,
- edu.uci.ics.hyracks.hdfs.utils.HyracksUtils.TEST_HYRACKS_CC_CLIENT_PORT, numPartitionPerMachine);
- }
-
- private void cleanupStores() throws IOException {
- FileUtils.forceMkdir(new File("teststore"));
- FileUtils.forceMkdir(new File("build"));
- FileUtils.cleanDirectory(new File("teststore"));
- FileUtils.cleanDirectory(new File("build"));
- }
-
- private void startHDFS() throws IOException {
- conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/core-site.xml"));
- conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/mapred-site.xml"));
- conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/hdfs-site.xml"));
-
- FileSystem lfs = FileSystem.getLocal(new Configuration());
- lfs.delete(new Path("build"), true);
- System.setProperty("hadoop.log.dir", "logs");
- dfsCluster = new MiniDFSCluster(conf, numberOfNC, true, null);
- FileSystem dfs = FileSystem.get(conf);
- Path src = new Path(DATA_PATH);
- Path dest = new Path(HDFS_INPUT_PATH);
- dfs.mkdirs(dest);
- dfs.copyFromLocalFile(src, dest);
-
- DataOutputStream confOutput = new DataOutputStream(new FileOutputStream(new File(HADOOP_CONF_PATH)));
- conf.writeXml(confOutput);
- confOutput.flush();
- confOutput.close();
- }
-
- private void cleanUpReEntry() throws IOException {
- FileSystem lfs = FileSystem.getLocal(new Configuration());
- if (lfs.exists(new Path(DUMPED_RESULT))) {
- lfs.delete(new Path(DUMPED_RESULT), true);
- }
- FileSystem dfs = FileSystem.get(conf);
- if (dfs.exists(new Path(HDFS_OUTPUT_PATH))) {
- dfs.delete(new Path(HDFS_OUTPUT_PATH), true);
- }
- }
-
- @Test
- public void TestAll() throws Exception {
- cleanUpReEntry();
- TestPreClusterGroupby();
- }
-
- public void TestPreClusterGroupby() throws Exception {
- conf.set(GenomixJob.GROUPBY_TYPE, "precluster");
- //conf.set(GenomixJob.OUTPUT_FORMAT, "text");
- System.err.println("Testing PreClusterGroupBy");
- driver.runJob(new GenomixJob(conf), Plan.BUILD_DEBRUJIN_GRAPH, true);
- Assert.assertEquals(true, checkResults(EXPECTED_PATH));
- }
-
- private boolean checkResults(String expectedPath) throws Exception {
- String format = conf.get(GenomixJob.OUTPUT_FORMAT);
- if ("text".equalsIgnoreCase(format)) {
- FileUtil.copyMerge(FileSystem.get(conf), new Path(HDFS_OUTPUT_PATH),
- FileSystem.getLocal(new Configuration()), new Path(DUMPED_RESULT), false, conf, null);
- } else {
-
- FileSystem.getLocal(new Configuration()).mkdirs(new Path(ACTUAL_RESULT_DIR + HDFS_OUTPUT_PATH));
- File filePathTo = new File(CONVERT_RESULT);
- BufferedWriter bw = new BufferedWriter(new FileWriter(filePathTo));
- for (int i = 0; i < numPartitionPerMachine * numberOfNC; i++) {
- String partname = "/part-" + i;
- FileUtil.copy(FileSystem.get(conf), new Path(HDFS_OUTPUT_PATH + partname),
- FileSystem.getLocal(new Configuration()), new Path(ACTUAL_RESULT_DIR + HDFS_OUTPUT_PATH
- + partname), false, conf);
-
- Path path = new Path(HDFS_OUTPUT_PATH + partname);
- FileSystem dfs = FileSystem.get(conf);
- if (dfs.getFileStatus(path).getLen() == 0) {
- continue;
- }
- SequenceFile.Reader reader = new SequenceFile.Reader(dfs, path, conf);
- KmerBytesWritable key = new KmerBytesWritable(conf.getInt(GenomixJob.KMER_LENGTH,
- GenomixJob.DEFAULT_KMER));
- KmerCountValue value = (KmerCountValue) ReflectionUtils.newInstance(reader.getValueClass(), conf);
-
- while (reader.next(key, value)) {
- if (key == null || value == null) {
- break;
- }
- bw.write(key.toString() + "\t" + value.toString());
- System.out.println(key.toString() + "\t" + value.toString());
- bw.newLine();
- }
- reader.close();
- }
- bw.close();
- }
-
- // TestUtils.compareWithSortedResult(new File(expectedPath), dumped);
- return true;
- }
-
- @After
- public void tearDown() throws Exception {
- edu.uci.ics.hyracks.hdfs.utils.HyracksUtils.deinit();
- cleanupHDFS();
- }
-
- private void cleanupHDFS() throws Exception {
- dfsCluster.shutdown();
- }
-}
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/pathmerge/GraphBuildTestCase.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/pathmerge/GraphBuildTestCase.java
deleted file mode 100644
index daa7e39..0000000
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/pathmerge/GraphBuildTestCase.java
+++ /dev/null
@@ -1,111 +0,0 @@
-package edu.uci.ics.genomix.pregelix.pathmerge;
-
-import java.io.BufferedWriter;
-import java.io.File;
-import java.io.FileWriter;
-import java.io.IOException;
-
-import junit.framework.Assert;
-import junit.framework.TestCase;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.junit.Test;
-
-import edu.uci.ics.genomix.driver.Driver;
-import edu.uci.ics.genomix.driver.Driver.Plan;
-import edu.uci.ics.genomix.job.GenomixJob;
-import edu.uci.ics.genomix.type.KmerBytesWritable;
-import edu.uci.ics.genomix.type.KmerCountValue;
-
-public class GraphBuildTestCase extends TestCase {
- private final JobConf conf;
- private Driver driver;
- private int numberOfNC = 2;
- private int numPartitionPerMachine = 1;
-
- private static final String ACTUAL_RESULT_DIR = "graphbuildresult";
- private static final String HDFS_OUTPUT_PATH = "/result";
- private static final String DUMPED_RESULT = ACTUAL_RESULT_DIR + HDFS_OUTPUT_PATH + "/result.txt";
- private static final String CONVERT_RESULT = ACTUAL_RESULT_DIR + HDFS_OUTPUT_PATH + "/result.txt.txt";
-
- public GraphBuildTestCase(JobConf conf, Driver driver) {
- this.conf = conf;
- this.driver = driver;
- }
-
- private void cleanUpReEntry() throws IOException {
- FileSystem lfs = FileSystem.getLocal(new Configuration());
- if (lfs.exists(new Path(DUMPED_RESULT))) {
- lfs.delete(new Path(DUMPED_RESULT), true);
- }
- FileSystem dfs = FileSystem.get(conf);
- if (dfs.exists(new Path(HDFS_OUTPUT_PATH))) {
- dfs.delete(new Path(HDFS_OUTPUT_PATH), true);
- }
- }
-
- @Test
- public void Test() throws Exception {
- cleanUpReEntry();
- TestPreClusterGroupby();
- }
-
- public void TestPreClusterGroupby() throws Exception {
- conf.set(GenomixJob.GROUPBY_TYPE, "precluster");
- System.err.println("Testing PreClusterGroupBy");
- driver.runJob(new GenomixJob(conf), Plan.BUILD_DEBRUJIN_GRAPH, true);
- Assert.assertEquals(true, checkResults());
- }
-
- private boolean checkResults() throws Exception {
- File dumped = null;
- String format = conf.get(GenomixJob.OUTPUT_FORMAT);
- if ("text".equalsIgnoreCase(format)) {
- FileUtil.copyMerge(FileSystem.get(conf), new Path(HDFS_OUTPUT_PATH),
- FileSystem.getLocal(new Configuration()), new Path(DUMPED_RESULT), false, conf, null);
- dumped = new File(DUMPED_RESULT);
- } else {
-
- FileSystem.getLocal(new Configuration()).mkdirs(new Path(ACTUAL_RESULT_DIR + HDFS_OUTPUT_PATH));
- File filePathTo = new File(CONVERT_RESULT);
- BufferedWriter bw = new BufferedWriter(new FileWriter(filePathTo));
- for (int i = 0; i < numPartitionPerMachine * numberOfNC; i++) {
- String partname = "/part-" + i;
- FileUtil.copy(FileSystem.get(conf), new Path(HDFS_OUTPUT_PATH + partname),
- FileSystem.getLocal(new Configuration()), new Path(ACTUAL_RESULT_DIR + HDFS_OUTPUT_PATH
- + partname), false, conf);
-
- Path path = new Path(HDFS_OUTPUT_PATH + partname);
- FileSystem dfs = FileSystem.get(conf);
- if (dfs.getFileStatus(path).getLen() == 0) {
- continue;
- }
- SequenceFile.Reader reader = new SequenceFile.Reader(dfs, path, conf);
- KmerBytesWritable key = new KmerBytesWritable(conf.getInt(GenomixJob.KMER_LENGTH,
- GenomixJob.DEFAULT_KMER));
- KmerCountValue value = (KmerCountValue) ReflectionUtils.newInstance(reader.getValueClass(), conf);
-
- while (reader.next(key, value)) {
- if (key == null || value == null) {
- break;
- }
- bw.write(key.toString() + "\t" + value.toString());
- System.out.println(key.toString() + "\t" + value.toString());
- bw.newLine();
- }
- reader.close();
- }
- bw.close();
- dumped = new File(CONVERT_RESULT);
- }
-
- // TestUtils.compareWithSortedResult(new File(expectedPath), dumped);
- return true;
- }
-}
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/pathmerge/GraphBuildTestSuite.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/pathmerge/GraphBuildTestSuite.java
deleted file mode 100644
index fdc3785..0000000
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/pathmerge/GraphBuildTestSuite.java
+++ /dev/null
@@ -1,127 +0,0 @@
-package edu.uci.ics.genomix.pregelix.pathmerge;
-
-import java.io.BufferedWriter;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.FileWriter;
-import java.io.IOException;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.FileOutputFormat;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.util.ReflectionUtils;
-
-import edu.uci.ics.genomix.driver.Driver;
-import edu.uci.ics.genomix.driver.Driver.Plan;
-import edu.uci.ics.genomix.job.GenomixJob;
-import edu.uci.ics.genomix.type.KmerBytesWritable;
-import edu.uci.ics.genomix.type.KmerCountValue;
-import junit.framework.Assert;
-import junit.framework.Test;
-import junit.framework.TestResult;
-import junit.framework.TestSuite;
-
-public class GraphBuildTestSuite extends TestSuite {
- private static final String ACTUAL_RESULT_DIR = "graphbuildresult";
- private static final String PATH_TO_HADOOP_CONF = "src/test/resources/hadoop/conf";
-
- private static final String DATA_PATH = "graph/7/TreePath";
- private static final String HDFS_INPUT_PATH = "/test";
- private static final String HDFS_OUTPUT_PATH = "/result";
-
- private static final String HADOOP_CONF_PATH = ACTUAL_RESULT_DIR + File.separator + "conf.xml";
- private MiniDFSCluster dfsCluster;
-
- private static JobConf conf = new JobConf();
- private int numberOfNC = 2;
- private int numPartitionPerMachine = 1;
-
- private static Driver driver;
-
- public void setUp() throws Exception {
- cleanupStores();
- edu.uci.ics.hyracks.hdfs.utils.HyracksUtils.init();
- FileUtils.forceMkdir(new File(ACTUAL_RESULT_DIR));
- FileUtils.cleanDirectory(new File(ACTUAL_RESULT_DIR));
- startHDFS();
-
- FileInputFormat.setInputPaths(conf, HDFS_INPUT_PATH);
- FileOutputFormat.setOutputPath(conf, new Path(HDFS_OUTPUT_PATH));
-
- conf.setInt(GenomixJob.KMER_LENGTH, 7);
- driver = new Driver(edu.uci.ics.hyracks.hdfs.utils.HyracksUtils.CC_HOST,
- edu.uci.ics.hyracks.hdfs.utils.HyracksUtils.TEST_HYRACKS_CC_CLIENT_PORT, numPartitionPerMachine);
- }
-
- private void cleanupStores() throws IOException {
- FileUtils.forceMkdir(new File("teststore"));
- FileUtils.forceMkdir(new File("build"));
- FileUtils.cleanDirectory(new File("teststore"));
- FileUtils.cleanDirectory(new File("build"));
- }
-
- private void startHDFS() throws IOException {
- conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/core-site.xml"));
- conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/mapred-site.xml"));
- conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/hdfs-site.xml"));
-
- FileSystem lfs = FileSystem.getLocal(new Configuration());
- lfs.delete(new Path("build"), true);
- System.setProperty("hadoop.log.dir", "logs");
- dfsCluster = new MiniDFSCluster(conf, numberOfNC, true, null);
- FileSystem dfs = FileSystem.get(conf);
- Path src = new Path(DATA_PATH);
- Path dest = new Path(HDFS_INPUT_PATH);
- dfs.mkdirs(dest);
- dfs.copyFromLocalFile(src, dest);
-
- DataOutputStream confOutput = new DataOutputStream(new FileOutputStream(new File(HADOOP_CONF_PATH)));
- conf.writeXml(confOutput);
- confOutput.flush();
- confOutput.close();
- }
-
- public static Test suite() throws Exception {
- GraphBuildTestSuite testSuite = new GraphBuildTestSuite();
- testSuite.setUp();
- testSuite.addTest(new GraphBuildTestCase(conf, driver));
- return testSuite;
- }
-
- /**
- * Runs the tests and collects their result in a TestResult.
- */
- @Override
- public void run(TestResult result) {
- try {
- int testCount = countTestCases();
- for (int i = 0; i < testCount; i++) {
- // cleanupStores();
- Test each = this.testAt(i);
- if (result.shouldStop())
- break;
- runTest(each, result);
- }
- tearDown();
- } catch (Exception e) {
- throw new IllegalStateException(e);
- }
- }
-
- public void tearDown() throws Exception {
- edu.uci.ics.hyracks.hdfs.utils.HyracksUtils.deinit();
- cleanupHDFS();
- }
-
- private void cleanupHDFS() throws Exception {
- dfsCluster.shutdown();
- }
-}
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/pathmerge/MergePathTest.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/pathmerge/MergePathTest.java
index 68c186a..8c02547 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/pathmerge/MergePathTest.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/pathmerge/MergePathTest.java
@@ -14,7 +14,6 @@
import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
import edu.uci.ics.genomix.pregelix.type.State;
-import edu.uci.ics.genomix.type.GeneCode;
import edu.uci.ics.genomix.type.KmerBytesWritable;
public class MergePathTest {
@@ -98,10 +97,10 @@
if (key == null || value == null) {
break;
}
- if (value.getLengthOfMergeChain() <= maxLength && value.getLengthOfMergeChain() != kmerSize) {
+ /*if (value.getLengthOfMergeChain() <= maxLength && value.getLengthOfMergeChain() != kmerSize) {
bw.write(value.getLengthOfMergeChain() + "\t" + value.getMergeChain().toString());
bw.newLine();
- }
+ }*/
}
reader.close();
}
@@ -123,10 +122,10 @@
if (key == null || value == null) {
break;
}
- if (value.getLengthOfMergeChain() <= maxLength && value.getState() == State.FINAL_VERTEX) {
+ /* if (value.getLengthOfMergeChain() <= maxLength && value.getState() == State.FINAL_VERTEX) {
bw.write(value.getLengthOfMergeChain() + "\t" + value.getMergeChain().toString());
bw.newLine();
- }
+ }*/
}
reader.close();
}