Merge branch 'anbangx/fullstack_genomix' of https://code.google.com/p/hyracks into anbangx/fullstack_genomix
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/GeneCode.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/GeneCode.java
index 416ab49..be65d55 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/GeneCode.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/GeneCode.java
public class GeneCode {
public final static byte[] GENE_SYMBOL = { 'A', 'C', 'G', 'T' };
/**
@@ -51,7 +53,7 @@
public static byte getPairedGeneCode(byte genecode){
if ( genecode < 0 || genecode > 3){
- throw new IllegalArgumentException("Invalid genecode");
+ throw new IllegalArgumentException("Invalid genecode: " + genecode);
}
return (byte) (3- genecode);
}
@@ -66,4 +68,30 @@
}
return GENE_SYMBOL[code];
}
+
+ public static String reverseComplement(String kmer) {
+ StringBuilder sb = new StringBuilder();
+ for (char letter : kmer.toCharArray()) {
+ sb.append(complement(letter));
+ }
+ return sb.reverse().toString();
+ }
+
+ public static char complement(char ch) {
+ switch (ch) {
+ case 'A':
+ case 'a':
+ return 'T';
+ case 'C':
+ case 'c':
+ return 'G';
+ case 'G':
+ case 'g':
+ return 'C';
+ case 'T':
+ case 't':
+ return 'A';
+ }
+ throw new IllegalArgumentException("Invalid character given in complement: " + ch);
+ }
}
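
Reviewer note: a quick, illustrative sanity check of the new helpers (not part of the patch; it assumes only the static methods added above):

    // Illustrative only: exercises GeneCode.complement and GeneCode.reverseComplement.
    import edu.uci.ics.genomix.type.GeneCode;

    public class GeneCodeSanityCheck {
        public static void main(String[] args) {
            // complement maps each base to its Watson-Crick partner, case-insensitively
            System.out.println(GeneCode.complement('A'));            // T
            System.out.println(GeneCode.complement('g'));            // C
            // reverseComplement complements every base, then reverses the string
            System.out.println(GeneCode.reverseComplement("AAGCT")); // AGCTT
        }
    }
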
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerBytesWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerBytesWritable.java
index 50baeb4..17f1a11 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerBytesWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerBytesWritable.java
@@ -50,7 +50,7 @@
public KmerBytesWritable(int k, byte[] storage, int offset) {
setNewReference(k, storage, offset);
}
-
+
public KmerBytesWritable(int k, String kmer) {
setNewReference(kmer.length(), kmer.getBytes(), 0);
}
@@ -198,11 +198,16 @@
if (pos >= kmerlength) {
throw new IllegalArgumentException("gene position out of bound");
}
+ return geneCodeAtPosition(pos);
+ }
+
+ // Unchecked version of the above; used when kmerlength is inaccurate (mid-merge).
+ private byte geneCodeAtPosition(int pos) {
int posByte = pos / 4;
int shift = (pos % 4) << 1;
return (byte) ((bytes[offset + size - 1 - posByte] >> shift) & 0x3);
}
-
+
public int getKmerLength() {
return this.kmerlength;
}
@@ -267,7 +272,7 @@
byte l = 0;
int bytecount = 0;
int bcount = size - 1;
- for (int i = start + kmerlength - 1; i >= 0 && i < array.length; i--) {
+ for (int i = start + kmerlength - 1; i >= start && i < array.length; i--) {
byte code = GeneCode.getPairedCodeFromSymbol(array[i]);
l |= (byte) (code << bytecount);
bytecount += 2;
@@ -358,7 +364,7 @@
* @param kmer
* : the next kmer
*/
- public void mergeNextKmer(int initialKmerSize, KmerBytesWritable kmer) {
+ public void mergeWithFFKmer(int initialKmerSize, KmerBytesWritable kmer) {
int preKmerLength = kmerlength;
int preSize = size;
this.kmerlength += kmer.kmerlength - initialKmerSize + 1;
@@ -374,6 +380,99 @@
}
/**
+ * Merge this kmer with the next connected kmer, where that kmer must be reverse-complemented
+ * e.g. merging AAGCTAA with GGTTGTT at initial kmerSize = 3
+ * yields AAGCTAACAACC
+ *
+ * A merge B => A B~
+ *
+ * @param initialKmerSize
+ * : the initial kmerSize
+ * @param kmer
+ * : the next kmer
+ */
+ public void mergeWithFRKmer(int initialKmerSize, KmerBytesWritable kmer) {
+ int preSize = size;
+ int preKmerLength = kmerlength;
+ this.kmerlength += kmer.kmerlength - initialKmerSize + 1;
+ setSize(KmerUtil.getByteNumFromK(kmerlength));
+ // copy prefix into right-side of buffer
+ for (int i = 1; i <= preSize; i++) {
+ bytes[offset + size - i] = bytes[offset + preSize - i];
+ }
+
+ int bytecount = (preKmerLength % 4) * 2;
+ int bcount = size - preSize - bytecount / 8; // may overlap previous kmer
+ byte l = bcount == size - preSize ? bytes[offset + bcount] : 0x00;
+ bytecount %= 8;
+ for (int i = kmer.kmerlength - initialKmerSize; i >= 0; i--) {
+ byte code = GeneCode.getPairedGeneCode(kmer.getGeneCodeAtPosition(i));
+ l |= (byte) (code << bytecount);
+ bytecount += 2;
+ if (bytecount == 8) {
+ bytes[offset + bcount--] = l;
+ l = 0;
+ bytecount = 0;
+ }
+ }
+ if (bcount >= 0) {
+ bytes[offset] = l;
+ }
+ }
+
+ /**
+ * Merge this kmer with the previous connected kmer, where that kmer must be reverse-complemented
+ * e.g. merging AACAACC with previous kmer TTCTGCC at initial kmerSize = 3
+ * yields GGCAGAACAACC
+ * A merge B => B~ A
+ *
+ * @param initialKmerSize
+ * : the initial kmerSize
+ * @param preKmer
+ * : the previous kmer
+ */
+ public void mergeWithRFKmer(int initialKmerSize, KmerBytesWritable preKmer) {
+ int preKmerLength = kmerlength;
+ int preSize = size;
+ this.kmerlength += preKmer.kmerlength - initialKmerSize + 1;
+ setSize(KmerUtil.getByteNumFromK(kmerlength));
+
+ int byteIndex = size - 1;
+ byte cacheByte = 0x00;
+ int posnInByte = 0;
+
+ // copy rc of preKmer into high bytes
+ for (int i = preKmer.kmerlength - 1; i >= initialKmerSize - 1; i--) {
+ byte code = GeneCode.getPairedGeneCode(preKmer.getGeneCodeAtPosition(i));
+ cacheByte |= (byte) (code << posnInByte);
+ posnInByte += 2;
+ if (posnInByte == 8) {
+ bytes[offset + byteIndex--] = cacheByte;
+ cacheByte = 0;
+ posnInByte = 0;
+ }
+ }
+
+ // copy my kmer into low positions of bytes
+ for (int i = 0; i < preKmerLength; i++) {
+ // expanding the capacity makes this offset incorrect. It's off by the # of additional bytes added.
+ int newposn = i + (size - preSize) * 4;
+ byte code = geneCodeAtPosition(newposn);
+ cacheByte |= (byte) (code << posnInByte);
+ posnInByte += 2;
+ if (posnInByte == 8) {
+ bytes[offset + byteIndex--] = cacheByte;
+ cacheByte = 0;
+ posnInByte = 0;
+ }
+ }
+ if (byteIndex >= 0) {
+ bytes[offset] = cacheByte;
+ }
+ clearLeadBit();
+ }
+
+ /**
* Merge Kmer with the previous connected Kmer
* e.g. AACAACC merge with AAGCTAA, if the initial kmerSize = 3
* then it will return AAGCTAACAACC
@@ -383,7 +480,7 @@
* @param preKmer
* : the previous kmer
*/
- public void mergePreKmer(int initialKmerSize, KmerBytesWritable preKmer) {
+ public void mergeWithRRKmer(int initialKmerSize, KmerBytesWritable preKmer) {
int preKmerLength = kmerlength;
int preSize = size;
this.kmerlength += preKmer.kmerlength - initialKmerSize + 1;
@@ -417,7 +514,7 @@
buffer[position] = (byte) ((buffer[position] & mask) | ((0xff & onebyte) << shift));
if (position > start && shift != 0) {
- buffer[position - 1] = (byte) ((buffer[position - 1] & (0xff - mask)) | ((byte) ((0xff & onebyte) >> (8 - shift))));
+ buffer[position - 1] = (byte) ((buffer[position - 1] & (0xff - mask)) | ((byte) ((0xff & onebyte) >>> (8 - shift))));
}
}
@@ -427,7 +524,7 @@
throw new IllegalArgumentException("Buffer of kmer storage is invalid");
}
int shift = (k % 4) << 1;
- byte data = (byte) (((0xff) & buffer[position]) >> shift);
+ byte data = (byte) (((0xff) & buffer[position]) >>> shift);
if (shift != 0 && position > start) {
data |= 0xff & (buffer[position - 1] << (8 - shift));
}
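
Reviewer note: the four merge methods above are easier to review against a plain-string reference model. A minimal sketch (illustrative, not part of the patch; it assumes GeneCode.reverseComplement from this change, with `b` being the next kmer for FF/FR and the previous kmer for RF/RR):

    // String-level reference model: each merge overlaps k - 1 bases.
    import edu.uci.ics.genomix.type.GeneCode;

    public class MergeReferenceModel {
        static String mergeFF(String a, String b, int k) {         // A merge B => A B
            return a + b.substring(k - 1);
        }
        static String mergeFR(String a, String b, int k) {         // A merge B => A B~
            return a + GeneCode.reverseComplement(b).substring(k - 1);
        }
        static String mergeRF(String a, String b, int k) {         // A merge B => B~ A
            String rc = GeneCode.reverseComplement(b);
            return rc.substring(0, rc.length() - (k - 1)) + a;
        }
        static String mergeRR(String a, String b, int k) {         // A merge B => B A
            return b.substring(0, b.length() - (k - 1)) + a;
        }

        public static void main(String[] args) {
            // matches the javadoc examples and the unit tests above
            System.out.println(mergeFR("AAGCTAA", "GGTTGTT", 3));  // AAGCTAACAACC
            System.out.println(mergeRF("AACAACC", "TTCTGCC", 3));  // GGCAGAACAACC
        }
    }
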
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
index 54abda3..2c64139 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
@@ -27,6 +27,17 @@
*
*/
private static final long serialVersionUID = 1L;
+ public static final NodeWritable EMPTY_NODE = new NodeWritable(0);
+
+ // merge/update directions
+ public static class DirectionFlag {
+ public static final byte DIR_FF = 0b00 << 0;
+ public static final byte DIR_FR = 0b01 << 0;
+ public static final byte DIR_RF = 0b10 << 0;
+ public static final byte DIR_RR = 0b11 << 0;
+ public static final byte DIR_MASK = 0b11 << 0;
+ }
+
private PositionWritable nodeID;
private PositionListWritable forwardForwardList;
private PositionListWritable forwardReverseList;
@@ -57,9 +68,9 @@
reverseReverseList.set(RRList);
kmer.set(kmer);
}
-
+
public void set(PositionWritable nodeID, PositionListWritable FFList, PositionListWritable FRList,
- PositionListWritable RFList, PositionListWritable RRList, KmerBytesWritable kmer){
+ PositionListWritable RFList, PositionListWritable RRList, KmerBytesWritable kmer) {
this.nodeID.set(nodeID);
this.forwardForwardList.set(FFList);
this.forwardReverseList.set(FRList);
@@ -105,6 +116,21 @@
return reverseReverseList;
}
+ public PositionListWritable getListFromDir(byte dir) {
+ switch (dir & DirectionFlag.DIR_MASK) {
+ case DirectionFlag.DIR_FF:
+ return getFFList();
+ case DirectionFlag.DIR_FR:
+ return getFRList();
+ case DirectionFlag.DIR_RF:
+ return getRFList();
+ case DirectionFlag.DIR_RR:
+ return getRRList();
+ default:
+ throw new RuntimeException("Unrecognized direction in getListFromDir: " + dir);
+ }
+ }
+
public PositionWritable getNodeID() {
return nodeID;
}
@@ -120,13 +146,13 @@
public void mergeForwardNext(NodeWritable nextNode, int initialKmerSize) {
this.forwardForwardList.set(nextNode.forwardForwardList);
this.forwardReverseList.set(nextNode.forwardReverseList);
- kmer.mergeNextKmer(initialKmerSize, nextNode.getKmer());
+ kmer.mergeWithFFKmer(initialKmerSize, nextNode.getKmer());
}
public void mergeForwardPre(NodeWritable preNode, int initialKmerSize) {
this.reverseForwardList.set(preNode.reverseForwardList);
this.reverseReverseList.set(preNode.reverseReverseList);
- kmer.mergePreKmer(initialKmerSize, preNode.getKmer());
+ kmer.mergeWithRRKmer(initialKmerSize, preNode.getKmer());
}
public void set(NodeWritable node) {
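
Reviewer note: a small illustrative check of the DirectionFlag contract (not part of the patch). getListFromDir masks with DIR_MASK, so the direction can ride in the low two bits of a larger flag byte:

    import edu.uci.ics.genomix.type.NodeWritable;

    public class DirectionFlagDemo {
        public static void main(String[] args) {
            byte otherBits = (byte) (1 << 3); // hypothetical non-direction flag bit
            byte flag = (byte) (otherBits | NodeWritable.DirectionFlag.DIR_RF);
            // masking recovers the direction regardless of the higher bits
            System.out.println((flag & NodeWritable.DirectionFlag.DIR_MASK)
                    == NodeWritable.DirectionFlag.DIR_RF); // true
        }
    }
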
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionListWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionListWritable.java
index ffd2805..c793609 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionListWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionListWritable.java
@@ -131,6 +131,20 @@
};
return it;
}
+
+ /*
+ * remove the first instance of @toRemove. Uses a linear scan. Throws an exception if not in this list.
+ */
+ public void remove(PositionWritable toRemove) {
+ Iterator<PositionWritable> posIterator = this.iterator();
+ while (posIterator.hasNext()) {
+ if(toRemove.equals(posIterator.next())) {
+ posIterator.remove();
+ return;
+ }
+ }
+ throw new ArrayIndexOutOfBoundsException("the PositionWritable `" + toRemove.toString() + "` was not found in this list.");
+ }
public void set(PositionListWritable list2) {
set(list2.valueCount, list2.storage, list2.offset);
@@ -162,6 +176,20 @@
valueCount += 1;
}
+ /*
+ * Append the otherList to the end of myList
+ */
+ public void appendList(PositionListWritable otherList) {
+ if (otherList.valueCount > 0) {
+ setSize((valueCount + otherList.valueCount) * PositionWritable.LENGTH);
+ // copy contents of otherList into the end of my storage
+ System.arraycopy(otherList.storage, otherList.offset,
+ storage, offset + valueCount * PositionWritable.LENGTH,
+ otherList.valueCount * PositionWritable.LENGTH);
+ valueCount += otherList.valueCount;
+ }
+ }
+
public static int getCountByDataLength(int length) {
if (length % PositionWritable.LENGTH != 0) {
for (StackTraceElement ste : Thread.currentThread().getStackTrace()) {
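
Reviewer note: a minimal usage sketch of the new list operations (illustrative; it uses only the members visible in this patch -- set(PositionListWritable), appendList, and remove):

    import edu.uci.ics.genomix.type.PositionListWritable;
    import edu.uci.ics.genomix.type.PositionWritable;

    public class PositionListDemo {
        // Copies a, bulk-appends b (a single arraycopy), then drops one entry.
        static void concatAndDrop(PositionListWritable dst, PositionListWritable a,
                PositionListWritable b, PositionWritable toDrop) {
            dst.set(a);
            dst.appendList(b);
            dst.remove(toDrop); // linear scan; throws if toDrop is absent
        }
    }
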
diff --git a/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/KmerBytesWritableTest.java b/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/KmerBytesWritableTest.java
index 5a59a87..25353f0 100644
--- a/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/KmerBytesWritableTest.java
+++ b/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/KmerBytesWritableTest.java
@@ -15,6 +15,10 @@
package edu.uci.ics.genomix.data.test;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+
import junit.framework.Assert;
import org.junit.Test;
@@ -115,7 +119,7 @@
}
@Test
- public void TestMergeNextKmer() {
+ public void TestMergeFFKmer() {
byte[] array = { 'A', 'G', 'C', 'T', 'G', 'A', 'C', 'C', 'G', 'T' };
String text = "AGCTGACCGT";
KmerBytesWritable kmer1 = new KmerBytesWritable(8);
@@ -127,12 +131,12 @@
Assert.assertEquals(text2, kmer2.toString());
KmerBytesWritable merge = new KmerBytesWritable(kmer1);
int kmerSize = 8;
- merge.mergeNextKmer(kmerSize, kmer2);
+ merge.mergeWithFFKmer(kmerSize, kmer2);
Assert.assertEquals(text1 + text2.substring(kmerSize - 1), merge.toString());
for (int i = 1; i < 8; i++) {
merge.set(kmer1);
- merge.mergeNextKmer(i, kmer2);
+ merge.mergeWithFFKmer(i, kmer2);
Assert.assertEquals(text1 + text2.substring(i - 1), merge.toString());
}
@@ -148,15 +152,92 @@
Assert.assertEquals(text2, kmer2.toString());
for (int x = 1; x < jk; x++) {
merge.set(kmer1);
- merge.mergeNextKmer(x, kmer2);
+ merge.mergeWithFFKmer(x, kmer2);
Assert.assertEquals(text1 + text2.substring(x - 1), merge.toString());
}
}
}
}
+
+ @Test
+ public void TestMergeFRKmer() {
+ int kmerSize = 3;
+ String result = "AAGCTAACAACC";
+ byte[] resultArray = result.getBytes();
+
+ String text1 = "AAGCTAA";
+ KmerBytesWritable kmer1 = new KmerBytesWritable(text1.length());
+ kmer1.setByRead(resultArray, 0);
+ Assert.assertEquals(text1, kmer1.toString());
+
+ // kmer2 is the rc of the end of the read
+ String text2 = "GGTTGTT";
+ KmerBytesWritable kmer2 = new KmerBytesWritable(text2.length());
+ kmer2.setByReadReverse(resultArray, result.length() - text2.length());
+ Assert.assertEquals(text2, kmer2.toString());
+
+ KmerBytesWritable merge = new KmerBytesWritable(kmer1);
+ merge.mergeWithFRKmer(kmerSize, kmer2);
+ Assert.assertEquals(result, merge.toString());
+
+ int i = 1;
+ merge.set(kmer1);
+ merge.mergeWithFRKmer(i, kmer2);
+ Assert.assertEquals("AAGCTAAAACAACC", merge.toString());
+
+ i = 2;
+ merge.set(kmer1);
+ merge.mergeWithFRKmer(i, kmer2);
+ Assert.assertEquals("AAGCTAAACAACC", merge.toString());
+
+ i = 3;
+ merge.set(kmer1);
+ merge.mergeWithFRKmer(i, kmer2);
+ Assert.assertEquals("AAGCTAACAACC", merge.toString());
+ }
+
+
+ @Test
+ public void TestMergeRFKmer() {
+ int kmerSize = 3;
+ String result = "GGCACAACAACCC";
+ byte[] resultArray = result.getBytes();
+
+ String text1 = "AACAACCC";
+ KmerBytesWritable kmer1 = new KmerBytesWritable(text1.length());
+ kmer1.setByRead(resultArray, 5);
+ Assert.assertEquals(text1, kmer1.toString());
+
+ // kmer2 is the rc of the end of the read
+ String text2 = "TTGTGCC";
+ KmerBytesWritable kmer2 = new KmerBytesWritable(text2.length());
+ kmer2.setByReadReverse(resultArray, 0);
+ Assert.assertEquals(text2, kmer2.toString());
+
+ KmerBytesWritable merge = new KmerBytesWritable(kmer1);
+ merge.mergeWithRFKmer(kmerSize, kmer2);
+ Assert.assertEquals(result, merge.toString());
+
+ int i = 1;
+ merge.set(kmer1);
+ merge.mergeWithRFKmer(i, kmer2);
+ Assert.assertEquals("GGCACAAAACAACCC", merge.toString());
+
+ i = 2;
+ merge.set(kmer1);
+ merge.mergeWithRFKmer(i, kmer2);
+ Assert.assertEquals("GGCACAAACAACCC", merge.toString());
+
+ i = 3;
+ merge.set(kmer1);
+ merge.mergeWithRFKmer(i, kmer2);
+ Assert.assertEquals("GGCACAACAACCC", merge.toString());
+ }
+
+
@Test
- public void TestMergePreKmer() {
+ public void TestMergeRRKmer() {
byte[] array = { 'A', 'G', 'C', 'T', 'G', 'A', 'C', 'C', 'G', 'T' };
String text = "AGCTGACCGT";
KmerBytesWritable kmer1 = new KmerBytesWritable(8);
@@ -168,12 +249,12 @@
Assert.assertEquals(text2, kmer2.toString());
KmerBytesWritable merge = new KmerBytesWritable(kmer2);
int kmerSize = 8;
- merge.mergePreKmer(kmerSize, kmer1);
+ merge.mergeWithRRKmer(kmerSize, kmer1);
Assert.assertEquals(text1 + text2.substring(kmerSize - 1), merge.toString());
for (int i = 1; i < 8; i++) {
merge.set(kmer2);
- merge.mergePreKmer(i, kmer1);
+ merge.mergeWithRRKmer(i, kmer1);
Assert.assertEquals(text1.substring(0, text1.length() - i + 1) + text2, merge.toString());
}
@@ -189,7 +270,7 @@
Assert.assertEquals(text2, kmer2.toString());
for (int x = 1; x < ik; x++) {
merge.set(kmer2);
- merge.mergePreKmer(x, kmer1);
+ merge.mergeWithRRKmer(x, kmer1);
Assert.assertEquals(text1.substring(0, text1.length() - x + 1) + text2, merge.toString());
}
}
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h3/MergePathsH3.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h3/MergePathsH3.java
index c2b0e52..a7ca739 100644
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h3/MergePathsH3.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h3/MergePathsH3.java
@@ -23,7 +23,8 @@
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
-import edu.uci.ics.genomix.hadoop.pmcommon.MessageWritableNodeWithFlag;
+import edu.uci.ics.genomix.hadoop.pmcommon.NodeWithFlagWritable;
+import edu.uci.ics.genomix.hadoop.pmcommon.PathNodeInitial.PathNodeFlag;
import edu.uci.ics.genomix.type.NodeWritable;
import edu.uci.ics.genomix.type.PositionWritable;
@@ -32,28 +33,10 @@
/*
* Flags used when sending messages
*/
- public static class MessageFlag {
- public static final byte EMPTY_MESSAGE = 0;
- public static final byte FROM_SELF = 1;
- public static final byte FROM_SUCCESSOR = 1 << 1;
- public static final byte FROM_PREDECESSOR = 1 << 2;
- public static final byte IS_HEAD = 1 << 3;
- public static final byte IS_TAIL = 1 << 4;
- public static final byte IS_PSEUDOHEAD = 1 << 5;
- public static final byte IS_COMPLETE = 1 << 6;
-
- public static String getFlagAsString(byte code) {
- // TODO: allow multiple flags to be set
- switch (code) {
- case EMPTY_MESSAGE:
- return "EMPTY_MESSAGE";
- case FROM_SELF:
- return "FROM_SELF";
- case FROM_SUCCESSOR:
- return "FROM_SUCCESSOR";
- }
- return "ERROR_BAD_MESSAGE";
- }
+ public static class MergeMessageFlag extends PathNodeFlag {
+ public static final byte FROM_SUCCESSOR = 1 << 5;
+ public static final byte FROM_PREDECESSOR = 1 << 6;
+ public static final byte IS_PSEUDOHEAD = ((byte) 1 << 6); //TODO FIXME
}
/*
@@ -61,14 +44,14 @@
* Heads send themselves to their successors, and all others map themselves.
*/
private static class MergePathsH3Mapper extends MapReduceBase implements
- Mapper<PositionWritable, MessageWritableNodeWithFlag, PositionWritable, MessageWritableNodeWithFlag> {
+ Mapper<PositionWritable, NodeWithFlagWritable, PositionWritable, NodeWithFlagWritable> {
private static long randSeed;
private Random randGenerator;
private float probBeingRandomHead;
private int KMER_SIZE;
private PositionWritable outputKey;
- private MessageWritableNodeWithFlag outputValue;
+ private NodeWithFlagWritable outputValue;
private NodeWritable curNode;
private byte headFlag;
private byte outFlag;
@@ -81,7 +64,7 @@
finalMerge = conf.getBoolean("finalMerge", false);
KMER_SIZE = conf.getInt("sizeKmer", 0);
- outputValue = new MessageWritableNodeWithFlag(KMER_SIZE);
+ outputValue = new NodeWithFlagWritable(KMER_SIZE);
outputKey = new PositionWritable();
curNode = new NodeWritable(KMER_SIZE);
}
@@ -93,34 +76,34 @@
}
@Override
- public void map(PositionWritable key, MessageWritableNodeWithFlag value,
- OutputCollector<PositionWritable, MessageWritableNodeWithFlag> output, Reporter reporter)
+ public void map(PositionWritable key, NodeWithFlagWritable value,
+ OutputCollector<PositionWritable, NodeWithFlagWritable> output, Reporter reporter)
throws IOException {
curNode = value.getNode();
// Map all path vertices; Heads and pseudoheads are sent to their successors
// NOTE: all mapping nodes are already simple paths
// Node may be marked as head b/c it's a real head, it's a previously merged head, or the node appears as a random head
- headFlag = (byte) (MessageFlag.IS_HEAD & value.getFlag());
+ headFlag = (byte) (MergeMessageFlag.IS_HEAD & value.getFlag());
// remove all pseudoheads on the last iteration
if (!finalMerge) {
- headFlag |= (MessageFlag.IS_PSEUDOHEAD & value.getFlag());
+ headFlag |= (MergeMessageFlag.IS_PSEUDOHEAD & value.getFlag());
}
- outFlag = (byte) (headFlag | (MessageFlag.IS_TAIL & value.getFlag()));
+ outFlag = (byte) (headFlag | (MergeMessageFlag.IS_TAIL & value.getFlag()));
if (headFlag != 0 || isNodeRandomHead(curNode.getNodeID())) {
// head nodes send themselves to their successor
//outputKey.set(curNode.getOutgoingList().getPosition(0));
if (!finalMerge) {
- headFlag |= (MessageFlag.IS_PSEUDOHEAD & value.getFlag());
+ headFlag |= (MergeMessageFlag.IS_PSEUDOHEAD & value.getFlag());
}
- outFlag |= MessageFlag.FROM_PREDECESSOR;
+ outFlag |= MergeMessageFlag.FROM_PREDECESSOR;
outputValue.set(outFlag, curNode);
output.collect(outputKey, outputValue);
} else {
// tail nodes map themselves
- outFlag |= MessageFlag.FROM_SELF;
+ outFlag |= MergeMessageFlag.MSG_SELF;
outputValue.set(outFlag, curNode);
output.collect(key, outputValue);
}
@@ -131,11 +114,11 @@
* Reducer class: merge nodes that co-occur; for singletons, remap the original nodes
*/
private static class MergePathsH3Reducer extends MapReduceBase implements
- Reducer<PositionWritable, MessageWritableNodeWithFlag, PositionWritable, MessageWritableNodeWithFlag> {
+ Reducer<PositionWritable, NodeWithFlagWritable, PositionWritable, NodeWithFlagWritable> {
private int KMER_SIZE;
- private MessageWritableNodeWithFlag inputValue;
- private MessageWritableNodeWithFlag outputValue;
+ private NodeWithFlagWritable inputValue;
+ private NodeWithFlagWritable outputValue;
private NodeWritable headNode;
private NodeWritable tailNode;
private int count;
@@ -143,20 +126,20 @@
public void configure(JobConf conf) {
KMER_SIZE = conf.getInt("sizeKmer", 0);
- outputValue = new MessageWritableNodeWithFlag(KMER_SIZE);
+ outputValue = new NodeWithFlagWritable(KMER_SIZE);
headNode = new NodeWritable(KMER_SIZE);
tailNode = new NodeWritable(KMER_SIZE);
}
@Override
- public void reduce(PositionWritable key, Iterator<MessageWritableNodeWithFlag> values,
- OutputCollector<PositionWritable, MessageWritableNodeWithFlag> output, Reporter reporter)
+ public void reduce(PositionWritable key, Iterator<NodeWithFlagWritable> values,
+ OutputCollector<PositionWritable, NodeWithFlagWritable> output, Reporter reporter)
throws IOException {
inputValue = values.next();
if (!values.hasNext()) {
// all single nodes must be remapped
- if ((inputValue.getFlag() & MessageFlag.FROM_SELF) == MessageFlag.FROM_SELF) {
+ if ((inputValue.getFlag() & MergeMessageFlag.MSG_SELF) == MergeMessageFlag.MSG_SELF) {
// FROM_SELF => remap self
output.collect(key, inputValue);
} else {
@@ -166,11 +149,11 @@
} else {
// multiple inputs => a merge will take place. Aggregate both, then collect the merged path
count = 0;
- outFlag = MessageFlag.EMPTY_MESSAGE;
+ outFlag = MergeMessageFlag.EMPTY_MESSAGE;
while (true) { // process values; break when no more
count++;
- outFlag |= (inputValue.getFlag() & (MessageFlag.IS_HEAD | MessageFlag.IS_PSEUDOHEAD | MessageFlag.IS_TAIL));
- if ((inputValue.getFlag() & MessageFlag.FROM_PREDECESSOR) == MessageFlag.FROM_PREDECESSOR) {
+ outFlag |= (inputValue.getFlag() & (MergeMessageFlag.IS_HEAD | MergeMessageFlag.IS_PSEUDOHEAD | MergeMessageFlag.IS_TAIL));
+ if ((inputValue.getFlag() & MergeMessageFlag.FROM_PREDECESSOR) == MergeMessageFlag.FROM_PREDECESSOR) {
headNode.set(inputValue.getNode());
} else {
tailNode.set(inputValue.getNode());
@@ -188,12 +171,12 @@
//headNode.mergeNext(tailNode, KMER_SIZE);
outputValue.set(outFlag, headNode);
- if ((outFlag & MessageFlag.IS_TAIL) == MessageFlag.IS_TAIL) {
+ if ((outFlag & MergeMessageFlag.IS_TAIL) == MergeMessageFlag.IS_TAIL) {
// Pseudoheads merging with tails don't become heads.
// Reset the IS_PSEUDOHEAD flag
- outFlag &= ~MessageFlag.IS_PSEUDOHEAD;
+ outFlag &= ~MergeMessageFlag.IS_PSEUDOHEAD;
- if ((outFlag & MessageFlag.IS_HEAD) == MessageFlag.IS_HEAD) {
+ if ((outFlag & MergeMessageFlag.IS_HEAD) == MergeMessageFlag.IS_HEAD) {
// True heads meeting tails => merge is complete for this node
// TODO: send to the "complete" collector
}
@@ -219,9 +202,9 @@
conf.setOutputFormat(SequenceFileOutputFormat.class);
conf.setMapOutputKeyClass(PositionWritable.class);
- conf.setMapOutputValueClass(MessageWritableNodeWithFlag.class);
+ conf.setMapOutputValueClass(NodeWithFlagWritable.class);
conf.setOutputKeyClass(PositionWritable.class);
- conf.setOutputValueClass(MessageWritableNodeWithFlag.class);
+ conf.setOutputValueClass(NodeWithFlagWritable.class);
conf.setMapperClass(MergePathsH3Mapper.class);
conf.setReducerClass(MergePathsH3Reducer.class);
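
Reviewer note: on the TODO FIXME above -- IS_PSEUDOHEAD and FROM_PREDECESSOR are both 1 << 6, so the two flags alias and a flag byte cannot tell them apart. An illustrative demonstration (not part of the patch):

    import edu.uci.ics.genomix.hadoop.graphclean.mergepaths.h3.MergePathsH3.MergeMessageFlag;

    public class FlagCollisionDemo {
        public static void main(String[] args) {
            byte flag = MergeMessageFlag.FROM_PREDECESSOR;
            // also reads as IS_PSEUDOHEAD, because the constants share bit 6
            System.out.println((flag & MergeMessageFlag.IS_PSEUDOHEAD) != 0); // true
        }
    }
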
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4.java
index b240833..1fef016 100644
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4.java
@@ -1,6 +1,23 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package edu.uci.ics.genomix.hadoop.graphclean.mergepaths.h4;
+import java.io.File;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Iterator;
import java.util.Random;
@@ -21,30 +38,55 @@
import org.apache.hadoop.mapred.SequenceFileInputFormat;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.hadoop.mapred.lib.MultipleOutputs;
+import org.apache.hadoop.mapred.lib.NullOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
-import edu.uci.ics.genomix.hadoop.pmcommon.MessageWritableNodeWithFlag;
+import edu.uci.ics.genomix.hadoop.oldtype.VKmerBytesWritable;
+import edu.uci.ics.genomix.hadoop.pmcommon.MergePathMultiSeqOutputFormat;
+import edu.uci.ics.genomix.hadoop.pmcommon.MergePathValueWritable;
+import edu.uci.ics.genomix.hadoop.pmcommon.NodeWithFlagWritable;
+import edu.uci.ics.genomix.hadoop.pmcommon.NodeWithFlagWritable.MessageFlag;
+import edu.uci.ics.genomix.hadoop.pmcommon.PathNodeInitial;
+import edu.uci.ics.genomix.hadoop.pmcommon.PathNodeInitial.PathNodeInitialReducer;
import edu.uci.ics.genomix.type.NodeWritable;
import edu.uci.ics.genomix.type.PositionWritable;
-import edu.uci.ics.genomix.hadoop.graphclean.mergepaths.h3.MergePathsH3.MessageFlag;
+/*
+ * A probabilistic merge algorithm for merging long simple paths (chains of nodes with exactly one incoming and one outgoing edge).
+ * Any merge that is attempted is guaranteed to succeed, but not all nodes that could be merged in an iteration will be.
+ *
+ * There are two steps to the merge:
+ *   1. (H4UpdatesMapper & H4UpdatesReducer): the direction of the merge is chosen and all
+ *       neighbors' edges are updated with the merge intent
+ *   2. (H4MergeMapper & H4MergeReducer): the nodes initiating the merge are "sent" to their neighbors, kmers are combined, and edges
+ *       are again updated (since the merge-initiator may be a neighbor of another merging node).
+ */
@SuppressWarnings("deprecation")
public class MergePathsH4 extends Configured implements Tool {
+ private enum MergeDir {
+ NO_MERGE,
+ FORWARD,
+ BACKWARD
+
+ }
+
/*
- * Mapper class: Partition the graph using random pseudoheads.
- * Heads send themselves to their successors, and all others map themselves.
+ * Mapper class: randomly chooses a direction to merge s.t. if a merge takes place, it will be successful.
+ * Sends update messages telling each of this node's neighbors who its new neighbor will be
*/
- public static class MergePathsH4Mapper extends MapReduceBase implements
- Mapper<PositionWritable, MessageWritableNodeWithFlag, PositionWritable, MessageWritableNodeWithFlag> {
+ public static class H4UpdatesMapper extends MapReduceBase implements
+ Mapper<PositionWritable, NodeWithFlagWritable, PositionWritable, NodeWithFlagWritable> {
private static long randSeed;
private Random randGenerator;
private float probBeingRandomHead;
private int KMER_SIZE;
- private PositionWritable outputKey;
- private MessageWritableNodeWithFlag outputValue;
+ private NodeWithFlagWritable outputValue;
+ private NodeWithFlagWritable mergeMsgValue;
+ private NodeWithFlagWritable updateMsgValue;
+
private NodeWritable curNode;
private PositionWritable curID;
private PositionWritable nextID;
@@ -54,20 +96,26 @@
private boolean curHead;
private boolean nextHead;
private boolean prevHead;
- private boolean willMerge;
+ private MergeDir mergeDir;
+ private byte inFlag;
private byte headFlag;
private byte tailFlag;
- private byte outFlag;
+ private byte mergeMsgFlag;
+ private byte nextDir;
+ private byte prevDir;
public void configure(JobConf conf) {
-
+
randSeed = conf.getLong("randomSeed", 0);
randGenerator = new Random(randSeed);
probBeingRandomHead = conf.getFloat("probBeingRandomHead", 0.5f);
KMER_SIZE = conf.getInt("sizeKmer", 0);
- outputValue = new MessageWritableNodeWithFlag(KMER_SIZE);
- outputKey = new PositionWritable();
+ outputValue = new NodeWithFlagWritable(KMER_SIZE);
+
+ mergeMsgValue = new NodeWithFlagWritable(KMER_SIZE);
+ updateMsgValue = new NodeWithFlagWritable(KMER_SIZE);
+
curNode = new NodeWritable(KMER_SIZE);
curID = new PositionWritable();
nextID = new PositionWritable();
@@ -85,11 +133,13 @@
*/
protected boolean setNextInfo(NodeWritable node) {
if (node.getFFList().getCountOfPosition() > 0) {
+ nextDir = MessageFlag.DIR_FF;
nextID.set(node.getFFList().getPosition(0));
nextHead = isNodeRandomHead(nextID);
return true;
}
if (node.getFRList().getCountOfPosition() > 0) {
+ nextDir = MessageFlag.DIR_FR;
nextID.set(node.getFRList().getPosition(0));
nextHead = isNodeRandomHead(nextID);
return true;
@@ -102,11 +152,13 @@
*/
protected boolean setPrevInfo(NodeWritable node) {
if (node.getRRList().getCountOfPosition() > 0) {
+ prevDir = MessageFlag.DIR_RR;
prevID.set(node.getRRList().getPosition(0));
prevHead = isNodeRandomHead(prevID);
return true;
}
if (node.getRFList().getCountOfPosition() > 0) {
+ prevDir = MessageFlag.DIR_RF;
prevID.set(node.getRFList().getPosition(0));
prevHead = isNodeRandomHead(prevID);
return true;
@@ -115,51 +167,34 @@
}
@Override
- public void map(PositionWritable key, MessageWritableNodeWithFlag value,
- OutputCollector<PositionWritable, MessageWritableNodeWithFlag> output, Reporter reporter)
- throws IOException {
- // Node may be marked as head b/c it's a real head or a real tail
- headFlag = (byte) (MessageFlag.IS_HEAD & value.getFlag());
- tailFlag = (byte) (MessageFlag.IS_TAIL & value.getFlag());
- outFlag = (byte) (headFlag | tailFlag);
-
- // only PATH vertices are present. Find the ID's for my neighbors
+ public void map(PositionWritable key, NodeWithFlagWritable value,
+ OutputCollector<PositionWritable, NodeWithFlagWritable> output, Reporter reporter) throws IOException {
+ inFlag = value.getFlag();
curNode.set(value.getNode());
curID.set(curNode.getNodeID());
-
+
+ headFlag = (byte) (MessageFlag.IS_HEAD & inFlag);
+ tailFlag = (byte) (MessageFlag.IS_TAIL & inFlag);
+ mergeMsgFlag = (byte) (headFlag | tailFlag);
+
curHead = isNodeRandomHead(curID);
// the headFlag and tailFlag's indicate if the node is at the beginning or end of a simple path.
// We prevent merging towards non-path nodes
hasNext = setNextInfo(curNode) && tailFlag == 0;
hasPrev = setPrevInfo(curNode) && headFlag == 0;
- willMerge = false;
-
- reporter.setStatus("CHECK ME OUT");
- System.err.println("mapping node" + curNode.toString() + " next:" + String.valueOf(hasNext) + " prev:" + String.valueOf(hasPrev));
+ mergeDir = MergeDir.NO_MERGE; // no merge to happen
- // TODO: need to update edges in neighboring nodes
-
- if ((outFlag & MessageFlag.IS_HEAD) > 0 && (outFlag & MessageFlag.IS_TAIL) > 0) {
- // true HEAD met true TAIL. this path is complete
- outFlag |= MessageFlag.FROM_SELF;
- outputValue.set(outFlag, curNode);
- output.collect(curID, outputValue);
- return;
- }
+ // decide where we're going to merge to
if (hasNext || hasPrev) {
if (curHead) {
if (hasNext && !nextHead) {
- // compress this head to the forward tail
- outFlag |= MessageFlag.FROM_PREDECESSOR;
- outputValue.set(outFlag, curNode);
- output.collect(nextID, outputValue);
- willMerge = true;
+ // merge forward
+ mergeMsgFlag |= nextDir;
+ mergeDir = MergeDir.FORWARD;
} else if (hasPrev && !prevHead) {
- // compress this head to the reverse tail
- outFlag |= MessageFlag.FROM_SUCCESSOR;
- outputValue.set(outFlag, curNode);
- output.collect(prevID, outputValue);
- willMerge = true;
+ // merge backwards
+ mergeMsgFlag |= prevDir;
+ mergeDir = MergeDir.BACKWARD;
}
} else {
// I'm a tail
@@ -167,58 +202,242 @@
if ((!nextHead && !prevHead) && (curID.compareTo(nextID) < 0 && curID.compareTo(prevID) < 0)) {
// tails on both sides, and I'm the "local minimum"
// compress me towards the tail in forward dir
- outFlag |= MessageFlag.FROM_PREDECESSOR;
- outputValue.set(outFlag, curNode);
- output.collect(nextID, outputValue);
- willMerge = true;
+ mergeMsgFlag |= nextDir;
+ mergeDir = MergeDir.FORWARD;
}
} else if (!hasPrev) {
// no previous node
if (!nextHead && curID.compareTo(nextID) < 0) {
// merge towards tail in forward dir
- outFlag |= MessageFlag.FROM_PREDECESSOR;
- outputValue.set(outFlag, curNode);
- output.collect(nextID, outputValue);
- willMerge = true;
+ mergeMsgFlag |= nextDir;
+ mergeDir = MergeDir.FORWARD;
}
} else if (!hasNext) {
// no next node
if (!prevHead && curID.compareTo(prevID) < 0) {
// merge towards tail in reverse dir
- outFlag |= MessageFlag.FROM_SUCCESSOR;
- outputValue.set(outFlag, curNode);
- output.collect(prevID, outputValue);
- willMerge = true;
+ mergeMsgFlag |= prevDir;
+ mergeDir = MergeDir.BACKWARD;
}
}
}
}
- // if we didn't send ourselves to some other node, remap ourselves for the next round
- if (!willMerge) {
- outFlag |= MessageFlag.FROM_SELF;
- outputValue.set(outFlag, curNode);
- output.collect(curID, outputValue);
+ if (mergeDir == MergeDir.NO_MERGE) {
+ mergeMsgFlag |= MessageFlag.MSG_SELF;
+ mergeMsgValue.set(mergeMsgFlag, curNode);
+ output.collect(curID, mergeMsgValue);
+ } else {
+ // this node will do a merge next round
+ mergeMsgFlag |= MessageFlag.MSG_UPDATE_MERGE;
+ mergeMsgValue.set(mergeMsgFlag, curNode);
+ output.collect(curID, mergeMsgValue);
+
+ sendUpdateToNeighbors(curNode, (byte) (mergeMsgFlag & MessageFlag.DIR_MASK), output);
}
- else {
- // TODO send update to this node's neighbors
- //mos.getCollector(UPDATES_OUTPUT, reporter).collect(key, outputValue);
+ }
+
+ /*
+ * when performing a merge, an update message needs to be sent to my neighbors
+ */
+ private void sendUpdateToNeighbors(NodeWritable node, byte mergeDir,
+ OutputCollector<PositionWritable, NodeWithFlagWritable> collector) throws IOException {
+ PositionWritable mergeSource = node.getNodeID();
+ PositionWritable mergeTarget = node.getListFromDir(mergeDir).getPosition(0);
+
+ // I need to notify in the opposite direction as I'm merging
+ Iterator<PositionWritable> posIterator1;
+ byte dir1;
+ Iterator<PositionWritable> posIterator2;
+ byte dir2;
+ switch (mergeDir) {
+ case MessageFlag.DIR_FF:
+ case MessageFlag.DIR_FR:
+ // merging forward; tell my previous neighbors
+ posIterator1 = node.getRRList().iterator();
+ dir1 = MessageFlag.DIR_RR;
+ posIterator2 = node.getRFList().iterator();
+ dir2 = MessageFlag.DIR_RF;
+ break;
+ case MessageFlag.DIR_RF:
+ case MessageFlag.DIR_RR:
+ posIterator1 = node.getFFList().iterator();
+ dir1 = MessageFlag.DIR_FF;
+ posIterator2 = node.getFRList().iterator();
+ dir2 = MessageFlag.DIR_FR;
+ break;
+ default:
+ throw new IOException("Unrecognized direction in sendUpdateToNeighbors: " + mergeDir);
+ }
+ while (posIterator1.hasNext()) {
+ updateMsgValue.setAsUpdateMessage(mergeDir, dir1, mergeSource, mergeTarget);
+ collector.collect(posIterator1.next(), updateMsgValue);
+ }
+ while (posIterator2.hasNext()) {
+ updateMsgValue.setAsUpdateMessage(mergeDir, dir2, mergeSource, mergeTarget);
+ collector.collect(posIterator2.next(), updateMsgValue);
}
}
}
/*
- * Reducer class: merge nodes that co-occur; for singletons, remap the original nodes
+ * Reducer class: processes the update messages from updateMapper
*/
- private static class MergePathsH4Reducer extends MapReduceBase implements
- Reducer<PositionWritable, MessageWritableNodeWithFlag, PositionWritable, MessageWritableNodeWithFlag> {
- private MultipleOutputs mos;
- public static final String COMPLETE_OUTPUT = "complete";
- public static final String UPDATES_OUTPUT = "update";
-
+ private static class H4UpdatesReducer extends MapReduceBase implements
+ Reducer<PositionWritable, NodeWithFlagWritable, PositionWritable, NodeWithFlagWritable> {
private int KMER_SIZE;
- private MessageWritableNodeWithFlag inputValue;
- private MessageWritableNodeWithFlag outputValue;
+ private NodeWithFlagWritable inputValue;
+ private NodeWithFlagWritable outputValue;
+ private NodeWritable curNode;
+ private PositionWritable outPosn;
+ private ArrayList<NodeWithFlagWritable> updateMsgs;
+ private boolean sawCurNode;
+ private byte outFlag;
+ private byte inFlag;
+
+ public void configure(JobConf conf) {
+ KMER_SIZE = conf.getInt("sizeKmer", 0);
+ inputValue = new NodeWithFlagWritable(KMER_SIZE);
+ outputValue = new NodeWithFlagWritable(KMER_SIZE);
+ curNode = new NodeWritable(KMER_SIZE);
+ outPosn = new PositionWritable();
+ updateMsgs = new ArrayList<NodeWithFlagWritable>();
+ }
+
+ /*
+ * Process updates from mapper
+ *
+ * (non-Javadoc)
+ * @see org.apache.hadoop.mapred.Reducer#reduce(java.lang.Object, java.util.Iterator, org.apache.hadoop.mapred.OutputCollector, org.apache.hadoop.mapred.Reporter)
+ */
+ @SuppressWarnings("unchecked")
+ @Override
+ public void reduce(PositionWritable key, Iterator<NodeWithFlagWritable> values,
+ OutputCollector<PositionWritable, NodeWithFlagWritable> output, Reporter reporter) throws IOException {
+ sawCurNode = false;
+ updateMsgs.clear();
+
+ byte inMsgType;
+ while (values.hasNext()) {
+ inputValue.set(values.next());
+ inFlag = inputValue.getFlag();
+ inMsgType = (byte) (inFlag & MessageFlag.MSG_MASK);
+
+ switch (inMsgType) {
+ case MessageFlag.MSG_UPDATE_MERGE:
+ case MessageFlag.MSG_SELF:
+ if (sawCurNode)
+ throw new IOException("Saw more than one MSG_SELF! previously seen self: " + curNode
+ + " current self: " + inputValue.getNode());
+ curNode.set(inputValue.getNode());
+ outFlag = inFlag;
+ sawCurNode = true;
+ if (inMsgType == MessageFlag.MSG_SELF) {
+ outPosn.set(curNode.getNodeID());
+ } else { // MSG_UPDATE_MERGE
+ // merge messages are sent to their merge recipient
+ outPosn.set(curNode.getListFromDir((byte) (inFlag & MessageFlag.DIR_MASK)).getPosition(0));
+ }
+ break;
+ case MessageFlag.MSG_UPDATE_EDGE:
+ updateMsgs.add(new NodeWithFlagWritable(inputValue)); // make a copy of inputValue-- not a reference!
+ break;
+ default:
+ throw new IOException("Unrecognized message type: " + (inFlag & MessageFlag.MSG_MASK));
+ }
+ }
+
+ // process all the update messages for this node
+ // TODO: consider a more memory-efficient approach than buffering every update message
+ for (NodeWithFlagWritable updateMsg : updateMsgs) {
+ NodeWithFlagWritable.processUpdates(curNode, updateMsg, KMER_SIZE);
+ }
+ outputValue.set(outFlag, curNode);
+ output.collect(outPosn, outputValue);
+ }
+ }
+
+
+ /*
+ * Mapper class: sends the update messages to their (already decided) destination
+ */
+ public static class H4MergeMapper extends MapReduceBase implements
+ Mapper<PositionWritable, NodeWithFlagWritable, PositionWritable, NodeWithFlagWritable> {
+ private static long randSeed;
+ private Random randGenerator;
+ private float probBeingRandomHead;
+
+ private int KMER_SIZE;
+ private NodeWithFlagWritable outputValue;
+ private NodeWithFlagWritable mergeMsgValue;
+ private NodeWithFlagWritable updateMsgValue;
+
+ private NodeWritable curNode;
+ private PositionWritable curID;
+ private PositionWritable nextID;
+ private PositionWritable prevID;
+ private boolean hasNext;
+ private boolean hasPrev;
+ private boolean curHead;
+ private boolean nextHead;
+ private boolean prevHead;
+ private MergeDir mergeDir;
+ private byte inFlag;
+ private byte headFlag;
+ private byte tailFlag;
+ private byte mergeMsgFlag;
+ private byte nextDir;
+ private byte prevDir;
+
+ public void configure(JobConf conf) {
+
+ randSeed = conf.getLong("randomSeed", 0);
+ randGenerator = new Random(randSeed);
+ probBeingRandomHead = conf.getFloat("probBeingRandomHead", 0.5f);
+
+ KMER_SIZE = conf.getInt("sizeKmer", 0);
+ outputValue = new NodeWithFlagWritable(KMER_SIZE);
+
+ mergeMsgValue = new NodeWithFlagWritable(KMER_SIZE);
+ updateMsgValue = new NodeWithFlagWritable(KMER_SIZE);
+
+ curNode = new NodeWritable(KMER_SIZE);
+ curID = new PositionWritable();
+ nextID = new PositionWritable();
+ prevID = new PositionWritable();
+ }
+
+ @Override
+ public void map(PositionWritable key, NodeWithFlagWritable value,
+ OutputCollector<PositionWritable, NodeWithFlagWritable> output, Reporter reporter) throws IOException {
+ inFlag = value.getFlag();
+ curNode.set(value.getNode());
+ curID.set(curNode.getNodeID());
+
+ }
+
+ }
+
+
+
+
+
+ /*
+ * Reducer class: performs the node merges and routes results to the complete, to-merge, and updates outputs
+ */
+ private static class H4MergeReducer2 extends MapReduceBase implements
+ Reducer<PositionWritable, NodeWithFlagWritable, PositionWritable, NodeWithFlagWritable> {
+ private MultipleOutputs mos;
+ private static final String TO_MERGE_OUTPUT = "toMerge";
+ private static final String COMPLETE_OUTPUT = "complete";
+ private static final String UPDATES_OUTPUT = "update";
+ private OutputCollector<PositionWritable, NodeWithFlagWritable> toMergeCollector;
+ private OutputCollector<PositionWritable, NodeWithFlagWritable> completeCollector;
+ private OutputCollector<PositionWritable, NodeWithFlagWritable> updatesCollector;
+
+ private int KMER_SIZE;
+ private NodeWithFlagWritable inputValue;
+ private NodeWithFlagWritable outputValue;
private NodeWritable curNode;
private NodeWritable prevNode;
private NodeWritable nextNode;
@@ -231,8 +450,8 @@
public void configure(JobConf conf) {
mos = new MultipleOutputs(conf);
KMER_SIZE = conf.getInt("sizeKmer", 0);
- inputValue = new MessageWritableNodeWithFlag(KMER_SIZE);
- outputValue = new MessageWritableNodeWithFlag(KMER_SIZE);
+ inputValue = new NodeWithFlagWritable(KMER_SIZE);
+ outputValue = new NodeWithFlagWritable(KMER_SIZE);
curNode = new NodeWritable(KMER_SIZE);
prevNode = new NodeWritable(KMER_SIZE);
nextNode = new NodeWritable(KMER_SIZE);
@@ -240,19 +459,22 @@
@SuppressWarnings("unchecked")
@Override
- public void reduce(PositionWritable key, Iterator<MessageWritableNodeWithFlag> values,
- OutputCollector<PositionWritable, MessageWritableNodeWithFlag> output, Reporter reporter)
- throws IOException {
+ public void reduce(PositionWritable key, Iterator<NodeWithFlagWritable> values,
+ OutputCollector<PositionWritable, NodeWithFlagWritable> output, Reporter reporter) throws IOException {
+ toMergeCollector = mos.getCollector(TO_MERGE_OUTPUT, reporter);
+ completeCollector = mos.getCollector(COMPLETE_OUTPUT, reporter);
+ updatesCollector = mos.getCollector(UPDATES_OUTPUT, reporter);
inputValue.set(values.next());
if (!values.hasNext()) {
- if ((inputValue.getFlag() & MessageFlag.FROM_SELF) > 0) {
- if ((inputValue.getFlag() & MessageFlag.IS_HEAD) > 0 && (inputValue.getFlag() & MessageFlag.IS_TAIL) > 0) {
+ if ((inputValue.getFlag() & MessageFlag.MSG_SELF) > 0) {
+ if ((inputValue.getFlag() & MessageFlag.IS_HEAD) > 0
+ && (inputValue.getFlag() & MessageFlag.IS_TAIL) > 0) {
// complete path (H & T meet in this node)
- mos.getCollector(COMPLETE_OUTPUT, reporter).collect(key, inputValue);
- } else {
+ completeCollector.collect(key, inputValue);
+ } else {
// FROM_SELF => no merging this round. remap self
- output.collect(key, inputValue);
+ toMergeCollector.collect(key, inputValue);
}
} else if ((inputValue.getFlag() & (MessageFlag.FROM_PREDECESSOR | MessageFlag.FROM_SUCCESSOR)) > 0) {
// FROM_PREDECESSOR | FROM_SUCCESSOR, but singleton? error here!
@@ -274,7 +496,7 @@
} else if ((inputValue.getFlag() & MessageFlag.FROM_SUCCESSOR) > 0) {
nextNode.set(inputValue.getNode());
sawNextNode = true;
- } else if ((inputValue.getFlag() & MessageFlag.FROM_SELF) > 0) {
+ } else if ((inputValue.getFlag() & MessageFlag.MSG_SELF) > 0) {
curNode.set(inputValue.getNode());
sawCurNode = true;
} else {
@@ -304,54 +526,87 @@
curNode.mergeForwardPre(prevNode, KMER_SIZE);
reporter.incrCounter("genomix", "num_merged", 1);
}
-
+
outputValue.set(outFlag, curNode);
if ((outFlag & MessageFlag.IS_HEAD) > 0 && (outFlag & MessageFlag.IS_TAIL) > 0) {
// True heads meeting tails => merge is complete for this node
- mos.getCollector(COMPLETE_OUTPUT, reporter).collect(key, outputValue);
+ completeCollector.collect(key, outputValue);
} else {
- output.collect(key, outputValue);
+ toMergeCollector.collect(key, outputValue);
}
}
}
+
+ public void close() throws IOException {
+ mos.close();
+ }
}
/*
* Run one iteration of the mergePaths algorithm
*/
- public RunningJob run(String inputPath, String outputPath, JobConf baseConf) throws IOException {
+ public RunningJob run(String inputPath, String toMergeOutput, String completeOutput, String updatesOutput,
+ JobConf baseConf) throws IOException {
JobConf conf = new JobConf(baseConf);
conf.setJarByClass(MergePathsH4.class);
conf.setJobName("MergePathsH4 " + inputPath);
- FileInputFormat.addInputPath(conf, new Path(inputPath));
- FileOutputFormat.setOutputPath(conf, new Path(outputPath));
+ FileInputFormat.addInputPaths(conf, inputPath);
+ Path outputPath = new Path(inputPath + ".h4merge.tmp");
+ FileOutputFormat.setOutputPath(conf, outputPath);
conf.setInputFormat(SequenceFileInputFormat.class);
- conf.setOutputFormat(SequenceFileOutputFormat.class);
+ conf.setOutputFormat(NullOutputFormat.class);
conf.setMapOutputKeyClass(PositionWritable.class);
- conf.setMapOutputValueClass(MessageWritableNodeWithFlag.class);
+ conf.setMapOutputValueClass(NodeWithFlagWritable.class);
conf.setOutputKeyClass(PositionWritable.class);
- conf.setOutputValueClass(MessageWritableNodeWithFlag.class);
+ conf.setOutputValueClass(NodeWithFlagWritable.class);
- conf.setMapperClass(MergePathsH4Mapper.class);
- conf.setReducerClass(MergePathsH4Reducer.class);
+ conf.setMapperClass(H4UpdatesMapper.class);
+ conf.setReducerClass(H4UpdatesReducer.class);
- FileSystem.get(conf).delete(new Path(outputPath), true);
+ MultipleOutputs.addNamedOutput(conf, H4MergeReducer2.TO_MERGE_OUTPUT, MergePathMultiSeqOutputFormat.class,
+ PositionWritable.class, NodeWithFlagWritable.class);
+ MultipleOutputs.addNamedOutput(conf, H4MergeReducer2.COMPLETE_OUTPUT, MergePathMultiSeqOutputFormat.class,
+ PositionWritable.class, NodeWithFlagWritable.class);
+ MultipleOutputs.addNamedOutput(conf, H4MergeReducer2.UPDATES_OUTPUT, MergePathMultiSeqOutputFormat.class,
+ PositionWritable.class, NodeWithFlagWritable.class);
- return JobClient.runJob(conf);
+ FileSystem dfs = FileSystem.get(conf);
+ // clean output dirs
+ dfs.delete(outputPath, true);
+ dfs.delete(new Path(toMergeOutput), true);
+ dfs.delete(new Path(completeOutput), true);
+ dfs.delete(new Path(updatesOutput), true);
+
+ RunningJob job = JobClient.runJob(conf);
+
+ // move the tmp outputs to the arg-spec'ed dirs. If there is no such dir, create an empty one to simplify downstream processing
+ if (!dfs.rename(new Path(outputPath + File.separator + H4MergeReducer2.TO_MERGE_OUTPUT), new Path(
+ toMergeOutput))) {
+ dfs.mkdirs(new Path(toMergeOutput));
+ }
+ if (!dfs.rename(new Path(outputPath + File.separator + H4MergeReducer2.COMPLETE_OUTPUT), new Path(
+ completeOutput))) {
+ dfs.mkdirs(new Path(completeOutput));
+ }
+ if (!dfs.rename(new Path(outputPath + File.separator + H4MergeReducer2.UPDATES_OUTPUT),
+ new Path(updatesOutput))) {
+ dfs.mkdirs(new Path(updatesOutput));
+ }
+
+ return job;
}
@Override
- public int run(String[] arg0) throws Exception {
- // TODO Auto-generated method stub
- return 0;
+ public int run(String[] args) throws Exception {
+ // TODO: parse args and run a single merge iteration; calling ToolRunner.run on a new MergePathsH4 here would recurse into this method forever
+ return 0;
}
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new Configuration(), new MergePathsH4(), args);
- System.out.println("Ran the job fine!");
System.exit(res);
}
}
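
Reviewer note: the H4UpdatesReducer dispatch above depends on the message-type bits and direction bits occupying disjoint ranges in NodeWithFlagWritable.MessageFlag (that class is not part of this diff, so the layout below is an assumption). A minimal sketch of the masking discipline; it also motivates routing merge messages by `inFlag & DIR_MASK` rather than by the message-type bits:

    // Illustrative only; the DIR_* / MSG_* values here are assumptions, not the real MessageFlag.
    public class FlagMaskSketch {
        static final byte DIR_FF = 0b00, DIR_FR = 0b01, DIR_RF = 0b10, DIR_RR = 0b11;
        static final byte DIR_MASK = 0b11;           // low two bits: direction
        static final byte MSG_SELF = 1 << 2;         // higher bits: message type
        static final byte MSG_UPDATE_MERGE = 1 << 3;
        static final byte MSG_MASK = (byte) (MSG_SELF | MSG_UPDATE_MERGE);

        public static void main(String[] args) {
            byte inFlag = (byte) (MSG_UPDATE_MERGE | DIR_FR); // a merge message, FR direction
            byte msgType = (byte) (inFlag & MSG_MASK);        // selects the switch arm
            byte dir = (byte) (inFlag & DIR_MASK);            // selects the adjacency list
            System.out.println(msgType == MSG_UPDATE_MERGE);  // true
            System.out.println(dir == DIR_FR);                // true
            System.out.println((inFlag & MSG_MASK) == DIR_FR); // false: MSG_MASK drops direction bits
        }
    }
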
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4Driver.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4Driver.java
index 155b999..72be4b5 100644
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4Driver.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4Driver.java
@@ -16,16 +16,33 @@
import java.io.IOException;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RunningJob;
import org.kohsuke.args4j.CmdLineParser;
import org.kohsuke.args4j.Option;
+import edu.uci.ics.genomix.hadoop.pmcommon.PathNodeInitial;
+
@SuppressWarnings("deprecation")
public class MergePathsH4Driver {
+ private static final String TO_MERGE = "toMerge";
+ private static final String COMPLETE = "complete";
+ private static final String UPDATES = "updates";
+ private String mergeOutput;
+ private String completeOutput;
+ private String updatesOutput;
+
+ private void setOutputPaths(String basePath, int mergeIteration) {
+ basePath = basePath.replaceAll("/$", ""); // strip trailing slash
+ mergeOutput = basePath + "_" + TO_MERGE + "_i" + mergeIteration;
+ completeOutput = basePath + "_" + COMPLETE + "_i" + mergeIteration;
+ updatesOutput = basePath + "_" + UPDATES + "_i" + mergeIteration;
+ }
+
private static class Options {
@Option(name = "-inputpath", usage = "the input path", required = true)
public String inputPath;
@@ -44,13 +62,19 @@
@Option(name = "-merge-rounds", usage = "the maximum number of rounds to merge", required = false)
public int mergeRound;
-
+
@Option(name = "-hadoop-conf", usage = "an (optional) hadoop configuration xml", required = false)
public String hadoopConf;
}
- public void run(String inputPath, String outputPath, int numReducers, int sizeKmer, int mergeRound,
+ /*
+ * Main driver for path merging. Given a graph, this driver runs
+ * PathNodeInitial to ID heads and tails, then does up to @mergeRound
+ * iterations of path merging. Updates during the merge are batch-processed
+ * at the end in a final update job.
+ */
+ public void run(String inputGraphPath, String outputGraphPath, int numReducers, int sizeKmer, int mergeRound,
String defaultConfPath, JobConf defaultConf) throws IOException {
JobConf baseConf = defaultConf == null ? new JobConf() : defaultConf;
if (defaultConfPath != null) {
@@ -58,40 +82,71 @@
}
baseConf.setNumReduceTasks(numReducers);
baseConf.setInt("sizeKmer", sizeKmer);
-
FileSystem dfs = FileSystem.get(baseConf);
- String prevOutput = inputPath;
- dfs.delete(new Path(outputPath), true); // clear any previous output
- String tmpOutputPath = "NO_JOBS_DONE";
- boolean finalMerge = false;
- for (int iMerge = 1; iMerge <= mergeRound; iMerge++) {
- baseConf.setInt("iMerge", iMerge);
- baseConf.setBoolean("finalMerge", finalMerge);
- MergePathsH4 merger = new MergePathsH4();
- tmpOutputPath = inputPath + ".mergepathsH3." + String.valueOf(iMerge);
- RunningJob job = merger.run(prevOutput, tmpOutputPath, baseConf);
- if (job.getCounters().findCounter("genomix", "num_merged").getValue() == 0) {
- if (!finalMerge) {
- // all of the pseudoheads have found each other. H3 now behaves like H1
- finalMerge = true;
- } else {
- // already in final merge stage and all paths were merged before. We're done!
- break;
- }
+ int iMerge = 0;
+
+ // identify head and tail nodes with pathnode initial
+ PathNodeInitial inith4 = new PathNodeInitial();
+ setOutputPaths(inputGraphPath, iMerge);
+ String prevToMergeOutput = inputGraphPath;
+ System.out.println("initial run. toMerge: " + mergeOutput + ", complete: " + completeOutput);
+ inith4.run(prevToMergeOutput, mergeOutput, completeOutput, baseConf);
+ dfs.copyToLocalFile(new Path(mergeOutput), new Path("initial-toMerge"));
+ dfs.copyToLocalFile(new Path(completeOutput), new Path("initial-complete"));
+
+ // several iterations of merging
+ MergePathsH4 merger = new MergePathsH4();
+ for (iMerge = 1; iMerge <= mergeRound; iMerge++) {
+ prevToMergeOutput = mergeOutput;
+ setOutputPaths(inputGraphPath, iMerge);
+ merger.run(prevToMergeOutput, mergeOutput, completeOutput, updatesOutput, baseConf);
+ dfs.copyToLocalFile(new Path(mergeOutput), new Path("i" + iMerge +"-toMerge"));
+ dfs.copyToLocalFile(new Path(completeOutput), new Path("i" + iMerge +"-complete"));
+ dfs.copyToLocalFile(new Path(updatesOutput), new Path("i" + iMerge +"-updates"));
+
+ if (dfs.listStatus(new Path(mergeOutput)) == null || dfs.listStatus(new Path(mergeOutput)).length == 0) {
+ // no output from previous run-- we are done!
+ break;
}
}
- dfs.rename(new Path(tmpOutputPath), new Path(outputPath)); // save final results
+
+ // finally, combine all the completed paths and update messages to
+ // create a single merged graph output
+ dfs.delete(new Path(outputGraphPath), true); // clear any previous
+ // output
+ // use all the "complete" and "update" outputs in addition to the final
+ // (possibly empty) toMerge directories
+ // as input to the final update step. This builds a comma-delim'ed
+ // String of said files.
+ final String lastMergeOutput = mergeOutput;
+ PathFilter updateFilter = new PathFilter() {
+ @Override
+ public boolean accept(Path arg0) {
+ String path = arg0.toString();
+ System.out.println("equals last: " + path + " vs " + lastMergeOutput + " = " + path.endsWith(lastMergeOutput));
+ return (path.matches(".*" + COMPLETE + "_i\\d+$") || path.matches(".*" + UPDATES + "_i\\d+$") || path.endsWith(lastMergeOutput));
+ }
+ };
+ StringBuilder sb = new StringBuilder();
+ String delim = "";
+ for (FileStatus file : dfs.globStatus(new Path(inputGraphPath.replaceAll("/$", "") + "*"), updateFilter)) {
+ sb.append(delim).append(file.getPath());
+ delim = ",";
+ }
+ String finalInputs = sb.toString();
+ System.out.println("This is the final sacrifice: " + finalInputs);
+ // TODO run the update iteration
}
- public void run(String inputPath, String outputPath, int numReducers, int sizeKmer, int mergeRound,
+ public void run(String inputPath, String outputGraphPath, int numReducers, int sizeKmer, int mergeRound,
String defaultConfPath) throws IOException {
- run(inputPath, outputPath, numReducers, sizeKmer, mergeRound, defaultConfPath, null);
+ run(inputPath, outputGraphPath, numReducers, sizeKmer, mergeRound, defaultConfPath, null);
}
- public void run(String inputPath, String outputPath, int numReducers, int sizeKmer, int mergeRound,
+ public void run(String inputPath, String outputGraphPath, int numReducers, int sizeKmer, int mergeRound,
JobConf defaultConf) throws IOException {
- run(inputPath, outputPath, numReducers, sizeKmer, mergeRound, null, defaultConf);
+ run(inputPath, outputGraphPath, numReducers, sizeKmer, mergeRound, null, defaultConf);
}
public static void main(String[] args) throws Exception {
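
Reviewer note: the PathFilter above selects per-iteration output directories by suffix pattern. A tiny illustrative check of the same regexes (not part of the patch; the paths are made up):

    public class UpdateFilterRegexDemo {
        public static void main(String[] args) {
            String completeDir = "/tmp/graph_complete_i3";
            String updatesDir = "/tmp/graph_updates_i12";
            String otherDir = "/tmp/graph_complete_i3.bak";
            System.out.println(completeDir.matches(".*" + "complete" + "_i\\d+$")); // true
            System.out.println(updatesDir.matches(".*" + "updates" + "_i\\d+$"));   // true
            System.out.println(otherDir.matches(".*" + "complete" + "_i\\d+$"));    // false: suffix anchored
        }
    }
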
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/removetips/RemoveTips.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/removetips/RemoveTips.java
index 6518532..198c769 100644
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/removetips/RemoveTips.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/removetips/RemoveTips.java
@@ -23,10 +23,10 @@
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
-import edu.uci.ics.genomix.hadoop.graphclean.mergepaths.h3.MergePathsH3.MessageFlag;
+import edu.uci.ics.genomix.hadoop.graphclean.mergepaths.h3.MergePathsH3.MergeMessageFlag;
import edu.uci.ics.genomix.hadoop.graphclean.mergepaths.h4.MergePathsH4;
-import edu.uci.ics.genomix.hadoop.graphclean.mergepaths.h4.MergePathsH4.MergePathsH4Mapper;
-import edu.uci.ics.genomix.hadoop.pmcommon.MessageWritableNodeWithFlag;
+import edu.uci.ics.genomix.hadoop.graphclean.mergepaths.h4.MergePathsH4.H4UpdatesMapper;
+import edu.uci.ics.genomix.hadoop.pmcommon.NodeWithFlagWritable;
import edu.uci.ics.genomix.type.NodeWritable;
import edu.uci.ics.genomix.type.PositionWritable;
@@ -37,22 +37,22 @@
* Mapper class: removes any tips by not mapping them at all
*/
private static class RemoveTipsMapper extends MapReduceBase implements
- Mapper<PositionWritable, MessageWritableNodeWithFlag, PositionWritable, MessageWritableNodeWithFlag> {
+ Mapper<PositionWritable, NodeWithFlagWritable, PositionWritable, NodeWithFlagWritable> {
private int KMER_SIZE;
private int removeTipsMinLength;
- private MessageWritableNodeWithFlag outputValue;
+ private NodeWithFlagWritable outputValue;
private NodeWritable curNode;
public void configure(JobConf conf) {
removeTipsMinLength = conf.getInt("removeTipsMinLength", 0);
- outputValue = new MessageWritableNodeWithFlag(KMER_SIZE);
+ KMER_SIZE = conf.getInt("sizeKmer", 0);
+ outputValue = new NodeWithFlagWritable(KMER_SIZE);
curNode = new NodeWritable(KMER_SIZE);
}
@Override
- public void map(PositionWritable key, MessageWritableNodeWithFlag value,
- OutputCollector<PositionWritable, MessageWritableNodeWithFlag> output, Reporter reporter)
+ public void map(PositionWritable key, NodeWithFlagWritable value,
+ OutputCollector<PositionWritable, NodeWithFlagWritable> output, Reporter reporter)
throws IOException {
curNode.set(value.getNode());
if ((curNode.inDegree() == 0 || curNode.outDegree() == 0)
@@ -60,7 +60,7 @@
// kill this node by NOT mapping it. Update my neighbors with a suicide note
//TODO: update neighbors by removing me from its list
} else {
- outputValue.set(MessageFlag.FROM_SELF, curNode);
- output.collect(key, value);
+ outputValue.set(MergeMessageFlag.MSG_SELF, curNode);
+ output.collect(key, outputValue); // emit the re-flagged value, not the raw input
}
}
@@ -70,11 +70,11 @@
* Reducer class: keeps mapped nodes
*/
private static class MergePathsH4Reducer extends MapReduceBase implements
- Reducer<PositionWritable, MessageWritableNodeWithFlag, PositionWritable, MessageWritableNodeWithFlag> {
+ Reducer<PositionWritable, NodeWithFlagWritable, PositionWritable, NodeWithFlagWritable> {
private int KMER_SIZE;
- private MessageWritableNodeWithFlag inputValue;
- private MessageWritableNodeWithFlag outputValue;
+ private NodeWithFlagWritable inputValue;
+ private NodeWithFlagWritable outputValue;
private NodeWritable curNode;
private NodeWritable prevNode;
private NodeWritable nextNode;
@@ -86,20 +86,20 @@
public void configure(JobConf conf) {
KMER_SIZE = conf.getInt("sizeKmer", 0);
- outputValue = new MessageWritableNodeWithFlag(KMER_SIZE);
+ inputValue = new NodeWithFlagWritable(KMER_SIZE);
+ outputValue = new NodeWithFlagWritable(KMER_SIZE);
curNode = new NodeWritable(KMER_SIZE);
prevNode = new NodeWritable(KMER_SIZE);
nextNode = new NodeWritable(KMER_SIZE);
}
@Override
- public void reduce(PositionWritable key, Iterator<MessageWritableNodeWithFlag> values,
- OutputCollector<PositionWritable, MessageWritableNodeWithFlag> output, Reporter reporter)
+ public void reduce(PositionWritable key, Iterator<NodeWithFlagWritable> values,
+ OutputCollector<PositionWritable, NodeWithFlagWritable> output, Reporter reporter)
throws IOException {
inputValue.set(values.next());
if (!values.hasNext()) {
- if ((inputValue.getFlag() & MessageFlag.FROM_SELF) > 0) {
+ if ((inputValue.getFlag() & MergeMessageFlag.MSG_SELF) > 0) {
- // FROM_SELF => keep self
+ // MSG_SELF => keep self
output.collect(key, inputValue);
} else {
@@ -126,11 +126,11 @@
conf.setOutputFormat(SequenceFileOutputFormat.class);
conf.setMapOutputKeyClass(PositionWritable.class);
- conf.setMapOutputValueClass(MessageWritableNodeWithFlag.class);
+ conf.setMapOutputValueClass(NodeWithFlagWritable.class);
conf.setOutputKeyClass(PositionWritable.class);
- conf.setOutputValueClass(MessageWritableNodeWithFlag.class);
+ conf.setOutputValueClass(NodeWithFlagWritable.class);
- conf.setMapperClass(MergePathsH4Mapper.class);
+ conf.setMapperClass(H4UpdatesMapper.class);
conf.setReducerClass(MergePathsH4Reducer.class);
FileSystem.get(conf).delete(new Path(outputPath), true);
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/MessageWritableNodeWithFlag.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/MessageWritableNodeWithFlag.java
deleted file mode 100644
index f05797e..0000000
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/MessageWritableNodeWithFlag.java
+++ /dev/null
@@ -1,103 +0,0 @@
-package edu.uci.ics.genomix.hadoop.pmcommon;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.hadoop.io.BinaryComparable;
-import org.apache.hadoop.io.WritableComparable;
-
-import edu.uci.ics.genomix.type.KmerBytesWritable;
-import edu.uci.ics.genomix.type.NodeWritable;
-
-/*
- * Simple "Message" class, allowing a NodeWritable to be sent, along with a message flag.
- * This class is used as the value in several MapReduce algorithms.
- */
-public class MessageWritableNodeWithFlag extends BinaryComparable implements WritableComparable<BinaryComparable> {
- private byte flag;
- private NodeWritable node;
-
- public MessageWritableNodeWithFlag() {
- this(0);
- }
-
- public MessageWritableNodeWithFlag(int k) {
- this.flag = 0;
- this.node = new NodeWritable(k);
- }
-
- public MessageWritableNodeWithFlag(byte flag, int kmerSize) {
- this.flag = flag;
- this.node = new NodeWritable(kmerSize);
- }
-
- public MessageWritableNodeWithFlag(byte flag, NodeWritable node) {
- this(node.getKmer().getKmerLength());
- set(flag, node);
- }
-
- public void set(MessageWritableNodeWithFlag right) {
- set(right.getFlag(), right.getNode());
- }
-
- public void set(byte flag, NodeWritable node) {
- this.node.set(node);
- this.flag = flag;
- }
-
- @Override
- public void readFields(DataInput arg0) throws IOException {
- node.readFields(arg0);
- flag = arg0.readByte();
- }
-
- @Override
- public void write(DataOutput arg0) throws IOException {
- node.write(arg0);
- arg0.writeByte(flag);
- }
-
- public NodeWritable getNode() {
- if (node.getCount() != 0) {
- return node;
- }
- return null;
- }
-
- public byte getFlag() {
- return this.flag;
- }
-
- public String toString() {
- return node.toString() + '\t' + String.valueOf(flag);
- }
-
- @Override
- public byte[] getBytes() {
- if (node.getCount() != 0) {
- return node.getKmer().getBytes();
- } else
- return null;
- }
-
- @Override
- public int getLength() {
- return node.getCount();
- }
-
- @Override
- public int hashCode() {
-// return super.hashCode() + flag + node.hashCode();
- return flag + node.hashCode();
- }
-
- @Override
- public boolean equals(Object rightObj) {
- if (rightObj instanceof MessageWritableNodeWithFlag) {
- MessageWritableNodeWithFlag rightMessage = (MessageWritableNodeWithFlag) rightObj;
- return (this.flag == rightMessage.flag && this.node.equals(rightMessage.node));
- }
- return false;
- }
-}
\ No newline at end of file
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/NodeWithFlagWritable.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/NodeWithFlagWritable.java
new file mode 100644
index 0000000..a7e8157
--- /dev/null
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/NodeWithFlagWritable.java
@@ -0,0 +1,261 @@
+package edu.uci.ics.genomix.hadoop.pmcommon;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.io.BinaryComparable;
+import org.apache.hadoop.io.WritableComparable;
+
+import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.NodeWritable;
+import edu.uci.ics.genomix.type.NodeWritable.DirectionFlag;
+import edu.uci.ics.genomix.type.PositionListWritable;
+import edu.uci.ics.genomix.type.PositionWritable;
+
+/*
+ * Simple "Message" class, allowing a NodeWritable to be sent, along with a message flag.
+ * This class is used as the value in several MapReduce algorithms.
+ */
+public class NodeWithFlagWritable extends BinaryComparable implements WritableComparable<BinaryComparable> {
+ private byte flag;
+ private NodeWritable node;
+
+ public static class MessageFlag extends DirectionFlag {
+ public static final byte EMPTY_MESSAGE = 0;
+ // message types
+ public static final byte MSG_SELF = 0b01 << 2;
+ public static final byte MSG_UPDATE_MERGE = 0b10 << 2;
+ public static final byte MSG_UPDATE_EDGE = 0b11 << 2;
+ public static final byte MSG_MASK = 0b11 << 2;
+ // additional info
+ public static final byte IS_HEAD = 0b1 << 4;
+ public static final byte IS_TAIL = 0b1 << 5;
+ // extra bit used differently in each operation
+ public static final byte EXTRA_FLAG = 1 << 6;
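+ // Flag byte layout (assuming DirectionFlag supplies the low two bits):
+ // [EXTRA_FLAG][IS_TAIL][IS_HEAD][msg-type bits][dir bits]
+ // e.g. (MSG_SELF | IS_HEAD | DIR_FF): a head node reporting itself along an FF edge.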
+ }
+
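+ /*
+ * Build an "update edge" message: the receiving node should delete @nodeToDelete from the
+ * adjacency list implied by @neighborDir/@mergeDir and append @nodeToAdd in its place.
+ */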
+ public void setAsUpdateMessage(byte mergeDir, byte neighborDir, PositionWritable nodeToDelete, PositionWritable nodeToAdd) {
+ byte neighborToMeDir = mirrorDirection(neighborDir);
+ byte neighborToMergeDir = flipDirection(neighborToMeDir, mergeDir);
+
+ // clear previous kmer and edge data
+ node.reset(0);
+
+ // indicate the node to delete
+ setFlag((byte) (MessageFlag.MSG_UPDATE_EDGE | neighborToMeDir));
+ node.getNodeID().set(nodeToDelete);
+
+ // add the new node to the appropriate list
+ node.getListFromDir(neighborToMergeDir).append(nodeToAdd);
+ }
+
+ /*
+ * Returns the edge dir for B->A when the A->B edge is type @dir
+ */
+ public byte mirrorDirection(byte dir) {
+ switch (dir) {
+ case MessageFlag.DIR_FF:
+ return MessageFlag.DIR_RR;
+ case MessageFlag.DIR_FR:
+ return MessageFlag.DIR_FR;
+ case MessageFlag.DIR_RF:
+ return MessageFlag.DIR_RF;
+ case MessageFlag.DIR_RR:
+ return MessageFlag.DIR_FF;
+ default:
+ throw new RuntimeException("Unrecognized direction in flipDirection: " + dir);
+ }
+ }
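+ // Worked example: if A->B is FF, the same edge read from B's side is RR, while FR and
+ // RF edges are their own mirrors (see the cases above).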
+
+ /*
+ * When A->B edge type is @neighborDir and B will merge towards C along a @mergeDir edge,
+ * returns the new edge type for A->C
+ */
+ public byte flipDirection(byte neighborDir, byte mergeDir) {
+ switch (mergeDir) {
+
+ case MessageFlag.DIR_FF:
+ case MessageFlag.DIR_RR:
+ // no change since the merging node didn't flip
+ return neighborDir;
+
+ case MessageFlag.DIR_FR:
+ case MessageFlag.DIR_RF:
+ // merging node is flipping; my edge type must also flip
+ switch (neighborDir) {
+ case MessageFlag.DIR_FF:
+ return MessageFlag.DIR_FR;
+ case MessageFlag.DIR_FR:
+ return MessageFlag.DIR_FF;
+ case MessageFlag.DIR_RF:
+ return MessageFlag.DIR_RR;
+ case MessageFlag.DIR_RR:
+ return MessageFlag.DIR_RF;
+ default:
+ throw new RuntimeException("Unrecognized direction for neighborDir: " + neighborDir);
+ }
+
+ default:
+ throw new RuntimeException("Unrecognized direction for mergeDir: " + mergeDir);
+ }
+ }
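+ // Worked example: A->B is FR and B merges along an FR edge; the merging node flips,
+ // so the new edge from A to the merged node becomes FF (the FR case above).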
+
+ /*
+ * Process any changes to @node contained in @updateMsg. This includes merges and edge updates
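+ * e.g. a MSG_UPDATE_MERGE message with DIR_FF extends node's kmer with updateNode's kmer
+ * and adopts updateNode's FF adjacency list.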
+ */
+ public static void processUpdates(NodeWritable node, NodeWithFlagWritable updateMsg, int kmerSize)
+ throws IOException {
+ byte updateFlag = updateMsg.getFlag();
+ NodeWritable updateNode = updateMsg.getNode();
+ if ((updateFlag & MessageFlag.MSG_UPDATE_EDGE) == MessageFlag.MSG_UPDATE_EDGE) {
+ // this message wants to update the edges of node.
+ // remove position and merge its position lists with node
+ if (!updateNode.equals(NodeWritable.EMPTY_NODE)) {
+ // need to remove updateNode from the specified PositionList
+ switch (updateFlag & MessageFlag.DIR_MASK) {
+ case MessageFlag.DIR_FF:
+ node.getFFList().remove(updateNode.getNodeID());
+ break;
+ case MessageFlag.DIR_FR:
+ node.getFRList().remove(updateNode.getNodeID());
+ break;
+ case MessageFlag.DIR_RF:
+ node.getRFList().remove(updateNode.getNodeID());
+ break;
+ case MessageFlag.DIR_RR:
+ node.getRRList().remove(updateNode.getNodeID());
+ break;
+ default:
+ throw new IOException("Unrecognized direction in updateFlag: " + updateFlag);
+ }
+ }
+ // now merge positionlists from update and node
+ node.getFFList().appendList(updateNode.getFFList());
+ node.getFRList().appendList(updateNode.getFRList());
+ node.getRFList().appendList(updateNode.getRFList());
+ node.getRRList().appendList(updateNode.getRRList());
+ } else if ((updateFlag & MessageFlag.MSG_UPDATE_MERGE) == MessageFlag.MSG_UPDATE_MERGE) {
+ // this message wants to merge node with updateNode.
+ // the direction flag indicates how the merge should take place.
+ // TODO send update or remove edge that I merge with
+ switch (updateFlag & MessageFlag.DIR_MASK) {
+ case MessageFlag.DIR_FF:
+ node.getKmer().mergeWithFFKmer(kmerSize, updateNode.getKmer());
+ node.getFFList().set(updateNode.getFFList());
+ // TODO not just FF list here-- FR as well
+ break;
+ case MessageFlag.DIR_FR:
+ // FIXME not sure if this should be reverse-complement or just reverse...
+ node.getKmer().mergeWithFFKmer(kmerSize, updateNode.getKmer());
+ node.getFRList().set(updateNode.getFRList());
+ break;
+ case MessageFlag.DIR_RF:
+ // TODO handle the RF merge (needs reverse-complement support, like the FR case above)
+ break;
+ case MessageFlag.DIR_RR:
+ node.getKmer().mergeWithRRKmer(kmerSize, updateNode.getKmer());
+ node.getRRList().set(updateNode.getRRList());
+ break;
+ default:
+ throw new IOException("Unrecognized direction in updateFlag: " + updateFlag);
+ }
+ }
+ }
+
+ public NodeWithFlagWritable() {
+ this(0);
+ }
+
+ public NodeWithFlagWritable(int k) {
+ this.flag = 0;
+ this.node = new NodeWritable(k);
+ }
+
+ public NodeWithFlagWritable(byte flag, int kmerSize) {
+ this.flag = flag;
+ this.node = new NodeWritable(kmerSize);
+ }
+
+ public NodeWithFlagWritable(byte flag, NodeWritable node) {
+ this(node.getKmer().getKmerLength());
+ set(flag, node);
+ }
+
+ public NodeWithFlagWritable(NodeWithFlagWritable other) {
+ this(other.flag, other.node);
+ }
+
+ public void set(NodeWithFlagWritable right) {
+ set(right.getFlag(), right.getNode());
+ }
+
+ public void set(byte flag, NodeWritable node) {
+ this.node.set(node);
+ this.flag = flag;
+ }
+
+ @Override
+ public void readFields(DataInput arg0) throws IOException {
+ node.readFields(arg0);
+ flag = arg0.readByte();
+ }
+
+ @Override
+ public void write(DataOutput arg0) throws IOException {
+ node.write(arg0);
+ arg0.writeByte(flag);
+ }
+
+ public NodeWritable getNode() {
+ if (node.getCount() != 0) {
+ return node;
+ }
+ return null;
+ }
+
+ public byte getFlag() {
+ return this.flag;
+ }
+
+ public void setFlag(byte flag) {
+ this.flag = flag;
+ }
+
+ public String toString() {
+ return node.toString() + '\t' + String.valueOf(flag);
+ }
+
+ @Override
+ public byte[] getBytes() {
+ if (node.getCount() != 0) {
+ return node.getKmer().getBytes();
+ } else
+ return null;
+ }
+
+ @Override
+ public int getLength() {
+ return node.getCount();
+ }
+
+ @Override
+ public int hashCode() {
+ // return super.hashCode() + flag + node.hashCode();
+ return flag + node.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object rightObj) {
+ if (rightObj instanceof NodeWithFlagWritable) {
+ NodeWithFlagWritable rightMessage = (NodeWithFlagWritable) rightObj;
+ return (this.flag == rightMessage.flag && this.node.equals(rightMessage.node));
+ }
+ return false;
+ }
+}
\ No newline at end of file
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/PathNodeInitial.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/PathNodeInitial.java
index 497e926..3c46dc7 100644
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/PathNodeInitial.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/PathNodeInitial.java
@@ -14,6 +14,7 @@
*/
package edu.uci.ics.genomix.hadoop.pmcommon;
+import java.io.File;
import java.io.IOException;
import java.util.Iterator;
@@ -33,228 +34,270 @@
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.SequenceFileInputFormat;
-import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.hadoop.mapred.lib.MultipleOutputs;
+import org.apache.hadoop.mapred.lib.NullOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
-import edu.uci.ics.genomix.hadoop.graphclean.mergepaths.h3.MergePathsH3.MessageFlag;
+import edu.uci.ics.genomix.hadoop.pmcommon.NodeWithFlagWritable.MessageFlag;
import edu.uci.ics.genomix.type.NodeWritable;
import edu.uci.ics.genomix.type.PositionWritable;
/*
- * A map-reduce job to find all nodes that are part of a simple path and the mark the nodes that
- * form their heads and tails.
+ * A map-reduce job to find all nodes that are part of a simple path, mark the nodes that
+ * form their heads and tails, and identify the parts of the graph that will participate in a path merge.
+ *
+ * This MR job uses MultipleOutputs rather than remapping the entire graph each iteration:
+ * 1. simple path nodes (indegree = outdegree = 1) (TO_MERGE_OUTPUT collector)
+ * 2. non-path, "complete" nodes, which will not be affected by the path merging (COMPLETE_OUTPUT collector)
+ * 3. non-path, "possibly updated" nodes, whose edges need to be updated after the merge (TO_UPDATE_OUTPUT collector)
*/
@SuppressWarnings("deprecation")
public class PathNodeInitial extends Configured implements Tool {
+ public static final String COMPLETE_OUTPUT = "complete";
+ public static final String TO_MERGE_OUTPUT = "toMerge";
+ public static final String TO_UPDATE_OUTPUT = "toUpdate";
+
+ private static final byte NEAR_PATH = MessageFlag.EXTRA_FLAG; // special-case extra flag for us
+
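+ // Helpers: a node's "next" neighbors are those in its FF/FR lists; its "previous"
+ // neighbors are those in its RR/RF lists. Each helper broadcasts outputValue to one set.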
+ public static void sendOutputToNextNeighbors(NodeWritable node, NodeWithFlagWritable outputValue,
+ OutputCollector<PositionWritable, NodeWithFlagWritable> collector) throws IOException {
+ Iterator<PositionWritable> posIterator = node.getFFList().iterator(); // FFList
+ while (posIterator.hasNext()) {
+ collector.collect(posIterator.next(), outputValue);
+ }
+ posIterator = node.getFRList().iterator(); // FRList
+ while (posIterator.hasNext()) {
+ collector.collect(posIterator.next(), outputValue);
+ }
+ }
+
+ public static void sendOutputToPreviousNeighbors(NodeWritable node, NodeWithFlagWritable outputValue,
+ OutputCollector<PositionWritable, NodeWithFlagWritable> collector) throws IOException {
+ Iterator<PositionWritable> posIterator = node.getRRList().iterator(); // RRList
+ while (posIterator.hasNext()) {
+ collector.collect(posIterator.next(), outputValue);
+ }
+ posIterator = node.getRFList().iterator(); // RFList
+ while (posIterator.hasNext()) {
+ collector.collect(posIterator.next(), outputValue);
+ }
+ }
+
public static class PathNodeInitialMapper extends MapReduceBase implements
- Mapper<NodeWritable, NullWritable, PositionWritable, MessageWritableNodeWithFlag> {
+ Mapper<NodeWritable, NullWritable, PositionWritable, NodeWithFlagWritable> {
private int KMER_SIZE;
private PositionWritable outputKey;
- private MessageWritableNodeWithFlag outputValue;
+ private NodeWithFlagWritable outputValue;
private int inDegree;
private int outDegree;
- private NodeWritable emptyNode;
- private Iterator<PositionWritable> posIterator;
+ private boolean pathNode;
public void configure(JobConf conf) {
KMER_SIZE = conf.getInt("sizeKmer", 0);
- outputValue = new MessageWritableNodeWithFlag(KMER_SIZE);
+ outputValue = new NodeWithFlagWritable(KMER_SIZE);
outputKey = new PositionWritable();
- emptyNode = new NodeWritable();
}
+ /*
+ * Identify the heads and tails of simple path nodes and their neighbors
+ *
+ * (non-Javadoc)
+ * @see org.apache.hadoop.mapred.Mapper#map(java.lang.Object, java.lang.Object, org.apache.hadoop.mapred.OutputCollector, org.apache.hadoop.mapred.Reporter)
+ */
@Override
public void map(NodeWritable key, NullWritable value,
- OutputCollector<PositionWritable, MessageWritableNodeWithFlag> output, Reporter reporter)
- throws IOException {
+ OutputCollector<PositionWritable, NodeWithFlagWritable> output, Reporter reporter) throws IOException {
inDegree = key.inDegree();
outDegree = key.outDegree();
if (inDegree == 1 && outDegree == 1) {
- // simple path nodes map themselves
- outputValue.set(MessageFlag.FROM_SELF, key);
- output.collect(key.getNodeID(), outputValue);
- reporter.incrCounter("genomix", "path_nodes", 1);
+ pathNode = true;
} else if (inDegree == 0 && outDegree == 1) {
+ pathNode = true;
// start of a tip. needs to merge & be marked as head
- outputValue.set(MessageFlag.FROM_SELF, key);
- output.collect(key.getNodeID(), outputValue);
- reporter.incrCounter("genomix", "path_nodes", 1);
-
- outputValue.set(MessageFlag.FROM_PREDECESSOR, emptyNode);
+ outputValue.set(MessageFlag.IS_HEAD, NodeWritable.EMPTY_NODE);
output.collect(key.getNodeID(), outputValue);
} else if (inDegree == 1 && outDegree == 0) {
+ pathNode = true;
// end of a tip. needs to merge & be marked as tail
- outputValue.set(MessageFlag.FROM_SELF, key);
- output.collect(key.getNodeID(), outputValue);
- reporter.incrCounter("genomix", "path_nodes", 1);
-
- outputValue.set(MessageFlag.FROM_SUCCESSOR, emptyNode);
+ outputValue.set(MessageFlag.IS_TAIL, NodeWritable.EMPTY_NODE);
output.collect(key.getNodeID(), outputValue);
} else {
+ pathNode = false;
if (outDegree > 0) {
// Not a path myself, but my successor might be one. Map forward successor to find heads
- outputValue.set(MessageFlag.FROM_PREDECESSOR, emptyNode);
- posIterator = key.getFFList().iterator();
- while (posIterator.hasNext()) {
- outputKey.set(posIterator.next());
- output.collect(outputKey, outputValue);
- }
- posIterator = key.getFRList().iterator();
- while (posIterator.hasNext()) {
- outputKey.set(posIterator.next());
- output.collect(outputKey, outputValue);
- }
+ outputValue.set(MessageFlag.IS_HEAD, NodeWritable.EMPTY_NODE);
+ sendOutputToNextNeighbors(key, outputValue, output);
}
if (inDegree > 0) {
// Not a path myself, but my predecessor might be one. map predecessor to find tails
- outputValue.set(MessageFlag.FROM_SUCCESSOR, emptyNode);
- posIterator = key.getRRList().iterator();
- while (posIterator.hasNext()) {
- outputKey.set(posIterator.next());
- output.collect(outputKey, outputValue);
- }
- posIterator = key.getRFList().iterator();
- while (posIterator.hasNext()) {
- outputKey.set(posIterator.next());
- output.collect(outputKey, outputValue);
- }
+ outputValue.set(MessageFlag.IS_TAIL, NodeWritable.EMPTY_NODE);
+ sendOutputToPreviousNeighbors(key, outputValue, output);
}
- // push this non-path node to the "complete" output
- outputValue.set((byte) (MessageFlag.FROM_SELF | MessageFlag.IS_COMPLETE), key);
+ // this non-path node won't participate in the merge. Mark as "complete" (H + T)
+ outputValue.set((byte) (MessageFlag.MSG_SELF | MessageFlag.IS_HEAD | MessageFlag.IS_TAIL), key);
output.collect(key.getNodeID(), outputValue);
}
+
+ if (pathNode) {
+ // simple path nodes map themselves
+ outputValue.set(MessageFlag.MSG_SELF, key);
+ output.collect(key.getNodeID(), outputValue);
+ reporter.incrCounter("genomix", "path_nodes", 1);
+
+ // also mark neighbors of paths (they are candidates for updates)
+ outputValue.set(NEAR_PATH, NodeWritable.EMPTY_NODE);
+ sendOutputToNextNeighbors(key, outputValue, output);
+ sendOutputToPreviousNeighbors(key, outputValue, output);
+ }
}
}
public static class PathNodeInitialReducer extends MapReduceBase implements
- Reducer<PositionWritable, MessageWritableNodeWithFlag, PositionWritable, MessageWritableNodeWithFlag> {
+ Reducer<PositionWritable, NodeWithFlagWritable, PositionWritable, NodeWithFlagWritable> {
private MultipleOutputs mos;
- private static final String COMPLETE_OUTPUT = "complete";
+ private OutputCollector<PositionWritable, NodeWithFlagWritable> completeCollector;
+ private OutputCollector<PositionWritable, NodeWithFlagWritable> toUpdateCollector;
private int KMER_SIZE;
- private MessageWritableNodeWithFlag inputValue;
- private MessageWritableNodeWithFlag outputValue;
+
+ private NodeWithFlagWritable inputValue;
+ private NodeWithFlagWritable outputValue;
private NodeWritable nodeToKeep;
- private int count;
- private byte flag;
- private boolean isComplete;
+ private byte outputFlag;
+ private byte inputFlag;
+ private boolean sawSelf;
public void configure(JobConf conf) {
mos = new MultipleOutputs(conf);
KMER_SIZE = conf.getInt("sizeKmer", 0);
- inputValue = new MessageWritableNodeWithFlag(KMER_SIZE);
- outputValue = new MessageWritableNodeWithFlag(KMER_SIZE);
+ inputValue = new NodeWithFlagWritable(KMER_SIZE);
+ outputValue = new NodeWithFlagWritable(KMER_SIZE);
nodeToKeep = new NodeWritable(KMER_SIZE);
}
+ /*
+ * Segregate nodes into three bins:
+ * 1. mergeable nodes (maybe marked H or T)
+ * 2. non-mergeable nodes that are candidates for updates
+ * 3. non-mergeable nodes that are not path neighbors and won't be updated
+ *
+ * (non-Javadoc)
+ * @see org.apache.hadoop.mapred.Reducer#reduce(java.lang.Object, java.util.Iterator, org.apache.hadoop.mapred.OutputCollector, org.apache.hadoop.mapred.Reporter)
+ */
@SuppressWarnings("unchecked")
@Override
- public void reduce(PositionWritable key, Iterator<MessageWritableNodeWithFlag> values,
- OutputCollector<PositionWritable, MessageWritableNodeWithFlag> output, Reporter reporter)
+ public void reduce(PositionWritable key, Iterator<NodeWithFlagWritable> values,
+ OutputCollector<PositionWritable, NodeWithFlagWritable> toMergeCollector, Reporter reporter)
throws IOException {
+ completeCollector = mos.getCollector(COMPLETE_OUTPUT, reporter);
+ toUpdateCollector = mos.getCollector(TO_UPDATE_OUTPUT, reporter);
- inputValue.set(values.next());
- if (!values.hasNext()) {
- if ((inputValue.getFlag() & MessageFlag.FROM_SELF) > 0) {
- if ((inputValue.getFlag() & MessageFlag.IS_COMPLETE) > 0) {
- // non-path node. Store in "complete" output
- mos.getCollector(COMPLETE_OUTPUT, reporter).collect(key, inputValue);
+ outputFlag = MessageFlag.EMPTY_MESSAGE;
+ sawSelf = false;
+ while (values.hasNext()) {
+ inputValue.set(values.next());
+ inputFlag = inputValue.getFlag();
+ outputFlag |= inputFlag;
+
+ if ((inputFlag & MessageFlag.MSG_SELF) > 0) {
+ // SELF -> keep this node
+ if (sawSelf) {
+ throw new IOException("Already saw SELF node in PathNodeInitialReducer! previous self: "
+ + nodeToKeep.toString() + ". current self: " + inputValue.getNode().toString());
+ }
+ sawSelf = true;
+ nodeToKeep.set(inputValue.getNode());
+ }
+ }
+
+ if ((outputFlag & MessageFlag.MSG_SELF) > 0) {
+ if ((outputFlag & MessageFlag.IS_HEAD) > 0 && (outputFlag & MessageFlag.IS_TAIL) > 0) {
+ // non-path or single path nodes
+ if ((outputFlag & NEAR_PATH) > 0) {
+ // non-path, but an update candidate
+ outputValue.set(MessageFlag.EMPTY_MESSAGE, nodeToKeep);
+ toUpdateCollector.collect(key, outputValue);
} else {
- // FROM_SELF => need to keep this PATH node
- output.collect(key, inputValue);
+ // non-path or single-node path. Store in "complete" output
+ outputValue.set(MessageFlag.EMPTY_MESSAGE, nodeToKeep);
+ completeCollector.collect(key, outputValue);
+ }
+ } else {
+ // path nodes that are mergeable
+ outputFlag &= (MessageFlag.IS_HEAD | MessageFlag.IS_TAIL); // clear flags except H/T
+ outputValue.set(outputFlag, nodeToKeep);
+ toMergeCollector.collect(key, outputValue);
+
+ reporter.incrCounter("genomix", "path_nodes", 1);
+ if ((outputFlag & MessageFlag.IS_HEAD) > 0) {
+ reporter.incrCounter("genomix", "path_nodes_heads", 1);
+ }
+ if ((outputFlag & MessageFlag.IS_TAIL) > 0) {
+ reporter.incrCounter("genomix", "path_nodes_tails", 1);
}
}
} else {
- // multiple inputs => possible HEAD or TAIL to a path node. note if HEAD or TAIL node
- count = 0;
- flag = MessageFlag.EMPTY_MESSAGE;
- isComplete = false;
- while (true) { // process values; break when no more
- count++;
- if ((inputValue.getFlag() & MessageFlag.FROM_SELF) > 0) {
- // SELF -> keep this node
- flag |= MessageFlag.FROM_SELF;
- nodeToKeep.set(inputValue.getNode());
- if ((inputValue.getFlag() & MessageFlag.IS_COMPLETE) > 0) {
- isComplete = true;
- }
- } else if ((inputValue.getFlag() & MessageFlag.FROM_SUCCESSOR) > 0) {
- flag |= MessageFlag.IS_TAIL;
- } else if ((inputValue.getFlag() & MessageFlag.FROM_PREDECESSOR) > 0) {
- flag |= MessageFlag.IS_HEAD;
- }
- if (!values.hasNext()) {
- break;
- } else {
- inputValue.set(values.next());
- }
- }
- if (count < 2) {
- throw new IOException("Expected at least two nodes in PathNodeInitial reduce; saw "
- + String.valueOf(count));
- }
- if ((flag & MessageFlag.FROM_SELF) > 0) {
- if ((flag & MessageFlag.IS_COMPLETE) > 0) {
- // non-path node. Store in "complete" output
- mos.getCollector(COMPLETE_OUTPUT, reporter).collect(key, inputValue);
- } else {
- // only keep simple path nodes
- outputValue.set(flag, nodeToKeep);
- output.collect(key, outputValue);
-
- reporter.incrCounter("genomix", "path_nodes", 1);
- if ((flag & MessageFlag.IS_HEAD) > 0) {
- reporter.incrCounter("genomix", "path_nodes_heads", 1);
- }
- if ((flag & MessageFlag.IS_TAIL) > 0) {
- reporter.incrCounter("genomix", "path_nodes_tails", 1);
- }
- }
- } else {
- throw new IOException("No SELF node recieved in reduce! key=" + key.toString() + " flag=" + flag);
- }
+ throw new IOException("No SELF node recieved in reduce! key=" + key.toString() + " flag=" + outputFlag);
}
}
+
+ public void close() throws IOException {
+ mos.close();
+ }
}
/*
* Mark the head, tail, and simple path nodes in one map-reduce job.
*/
- public RunningJob run(String inputPath, String outputPath, JobConf baseConf) throws IOException {
+ public RunningJob run(String inputPath, String toMergeOutput, String completeOutput, String toUpdateOutput,
+ JobConf baseConf) throws IOException {
JobConf conf = new JobConf(baseConf);
conf.setJarByClass(PathNodeInitial.class);
conf.setJobName("PathNodeInitial " + inputPath);
- FileInputFormat.addInputPath(conf, new Path(inputPath));
- FileOutputFormat.setOutputPath(conf, new Path(outputPath));
+ FileInputFormat.addInputPaths(conf, inputPath);
+ // Path outputPath = new Path(inputPath.replaceAll("/$", "") + ".initialMerge.tmp");
+ Path outputPath = new Path(toMergeOutput);
+ FileOutputFormat.setOutputPath(conf, outputPath);
conf.setInputFormat(SequenceFileInputFormat.class);
- conf.setOutputFormat(SequenceFileOutputFormat.class);
+ conf.setOutputFormat(NullOutputFormat.class);
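+ // note: the complete/toUpdate named outputs carry their own output format, but under
+ // NullOutputFormat the default (toMerge) collector is discarded.
+ // TODO give toMerge its own named output (or restore SequenceFileOutputFormat here).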
conf.setMapOutputKeyClass(PositionWritable.class);
- conf.setMapOutputValueClass(MessageWritableNodeWithFlag.class);
+ conf.setMapOutputValueClass(NodeWithFlagWritable.class);
conf.setOutputKeyClass(PositionWritable.class);
- conf.setOutputValueClass(MessageWritableNodeWithFlag.class);
+ conf.setOutputValueClass(NodeWithFlagWritable.class);
conf.setMapperClass(PathNodeInitialMapper.class);
conf.setReducerClass(PathNodeInitialReducer.class);
- FileSystem.get(conf).delete(new Path(outputPath), true);
+ MultipleOutputs.addNamedOutput(conf, COMPLETE_OUTPUT, MergePathMultiSeqOutputFormat.class,
+ PositionWritable.class, NodeWithFlagWritable.class);
+ MultipleOutputs.addNamedOutput(conf, TO_UPDATE_OUTPUT, MergePathMultiSeqOutputFormat.class,
+ PositionWritable.class, NodeWithFlagWritable.class);
- return JobClient.runJob(conf);
+ FileSystem dfs = FileSystem.get(conf);
+ dfs.delete(outputPath, true); // clean output dir
+ RunningJob job = JobClient.runJob(conf);
+
+ // move the tmp outputs to the arg-spec'ed dirs
+ dfs.rename(new Path(outputPath, COMPLETE_OUTPUT), new Path(completeOutput));
+ dfs.rename(new Path(outputPath, TO_UPDATE_OUTPUT), new Path(toUpdateOutput));
+ // dfs.rename(outputPath, new Path(toMergeOutput));
+
+ return job;
}
@Override
- public int run(String[] arg0) throws Exception {
- // TODO Auto-generated method stub
- return 0;
+ public int run(String[] args) throws Exception {
+ // expects four positional args: <input> <toMergeOutput> <completeOutput> <toUpdateOutput>
+ run(args[0], args[1], args[2], args[3], new JobConf(getConf()));
+ return 0;
}
public static void main(String[] args) throws Exception {
- int res = ToolRunner.run(new Configuration(), new PathNodeInitial(), args);
+ int res = ToolRunner.run(new Configuration(), new PathNodeInitial(), args); // ToolRunner injects the conf; calling run(args) directly would recurse
System.exit(res);
}
}
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/velvetgraphbuilding/GraphBuildingDriver.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/velvetgraphbuilding/GraphBuildingDriver.java
index c735a0d..4040cab 100644
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/velvetgraphbuilding/GraphBuildingDriver.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/velvetgraphbuilding/GraphBuildingDriver.java
@@ -53,14 +53,14 @@
boolean onlyTest1stJob, boolean seqOutput, String defaultConfPath) throws IOException {
if (onlyTest1stJob == true) {
- runfirstjob(inputPath, numReducers, sizeKmer, readLength, seqOutput, defaultConfPath);
+ runfirstjob(inputPath, outputPath, numReducers, sizeKmer, readLength, seqOutput, defaultConfPath);
} else {
- runfirstjob(inputPath, numReducers, sizeKmer, readLength, true, defaultConfPath);
- runsecondjob(inputPath, outputPath, numReducers, sizeKmer, readLength, seqOutput, defaultConfPath);
+ runfirstjob(inputPath, inputPath + "-tmp", numReducers, sizeKmer, readLength, true, defaultConfPath);
+ runsecondjob(inputPath + "-tmp", outputPath, numReducers, sizeKmer, readLength, seqOutput, defaultConfPath);
}
}
- public void runfirstjob(String inputPath, int numReducers, int sizeKmer, int readLength, boolean seqOutput,
+ public void runfirstjob(String inputPath, String outputPath, int numReducers, int sizeKmer, int readLength, boolean seqOutput,
String defaultConfPath) throws IOException {
JobConf conf = new JobConf(GraphBuildingDriver.class);
conf.setInt("sizeKmer", sizeKmer);
@@ -85,14 +85,14 @@
conf.setOutputValueClass(PositionListWritable.class);
FileInputFormat.setInputPaths(conf, new Path(inputPath));
- FileOutputFormat.setOutputPath(conf, new Path(inputPath + "-step1"));
+ FileOutputFormat.setOutputPath(conf, new Path(outputPath));
if (numReducers == 0)
conf.setNumReduceTasks(numReducers + 2);
else
conf.setNumReduceTasks(numReducers);
FileSystem dfs = FileSystem.get(conf);
- dfs.delete(new Path(inputPath + "-step1"), true);
+ dfs.delete(new Path(outputPath), true);
JobClient.runJob(conf);
}
@@ -132,7 +132,7 @@
conf.setOutputValueClass(PositionListAndKmerWritable.class);
}
- FileInputFormat.setInputPaths(conf, new Path(inputPath + "-step1"));
+ FileInputFormat.setInputPaths(conf, new Path(inputPath));
FileOutputFormat.setOutputPath(conf, new Path(outputPath));
conf.setNumReduceTasks(numReducers);
FileSystem dfs = FileSystem.get(conf);
diff --git a/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h3/TestPathMergeH3.java b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h3/TestPathMergeH3.java
index cc922de..c04ba46 100644
--- a/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h3/TestPathMergeH3.java
+++ b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h3/TestPathMergeH3.java
@@ -54,11 +54,12 @@
}
PathNodeInitial inith3 = new PathNodeInitial();
- inith3.run(HDFS_GRAPHBUILD, HDFS_MARKPATHS, conf);
- copyResultsToLocal(HDFS_MARKPATHS, ACTUAL_ROOT + PATHMARKS_FILE, false, conf);
+ inith3.run(HDFS_GRAPHBUILD, HDFS_MARKPATHS + "toMerge", HDFS_MARKPATHS + "complete", HDFS_MARKPATHS + "toUpdate", conf);
+ copyResultsToLocal(HDFS_MARKPATHS + "toMerge", ACTUAL_ROOT + PATHMARKS_FILE + ".toMerge", false, conf);
+ copyResultsToLocal(HDFS_MARKPATHS + "complete", ACTUAL_ROOT + PATHMARKS_FILE + ".complete", false, conf);
MergePathsH3Driver h3 = new MergePathsH3Driver();
- h3.run(HDFS_MARKPATHS, HDFS_MERGED, 2, KMER_LENGTH, 1, conf);
+ h3.run(HDFS_MARKPATHS + "toMerge", HDFS_MERGED, 2, KMER_LENGTH, 1, conf);
copyResultsToLocal(HDFS_MERGED, ACTUAL_ROOT + PATHMERGE_FILE, false, conf);
}
diff --git a/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/TestPathMergeH4.java b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/TestPathMergeH4.java
index 9e799f3..45a2d0a 100644
--- a/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/TestPathMergeH4.java
+++ b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/TestPathMergeH4.java
@@ -1,5 +1,6 @@
package edu.uci.ics.genomix.hadoop.graphclean.mergepaths.h4;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
@@ -11,27 +12,27 @@
import edu.uci.ics.genomix.hadoop.graphclean.mergepaths.h3.MergePathsH3Driver;
import edu.uci.ics.genomix.hadoop.pmcommon.GenomixMiniClusterTest;
+import edu.uci.ics.genomix.hadoop.pmcommon.HadoopMiniClusterTest;
import edu.uci.ics.genomix.hadoop.pmcommon.PathNodeInitial;
+import edu.uci.ics.genomix.hadoop.velvetgraphbuilding.GraphBuildingDriver;
import edu.uci.ics.genomix.hyracks.driver.Driver.Plan;
import edu.uci.ics.genomix.hyracks.job.GenomixJobConf;
@SuppressWarnings("deprecation")
-public class TestPathMergeH4 extends GenomixMiniClusterTest {
- protected String LOCAL_SEQUENCE_FILE = "src/test/resources/data/webmap/text.txt";
- protected String HDFS_SEQUENCE = "/00-sequence/";
- protected String HDFS_GRAPHBUILD = "/01-graphbuild/";
- protected String HDFS_MARKPATHS = "/02-pathmark/";
- protected String HDFS_MERGED = "/03-pathmerge/";
+public class TestPathMergeH4 extends HadoopMiniClusterTest {
+ protected final String LOCAL_SEQUENCE_FILE = "src/test/resources/data/webmap/text.txt";
+ protected final String SEQUENCE = "/00-sequence/";
+ protected final String GRAPHBUILD = "/01-graphbuild/";
+ protected final String MERGED = "/02-pathmerge/";
- protected String GRAPHBUILD_FILE = "graphbuild.result";
- protected String PATHMARKS_FILE = "markpaths.result";
- protected String PATHMERGE_FILE = "h4.mergepath.result";
- protected boolean regenerateGraph = true;
+ protected final String ACTUAL = "src/test/resources/actual/";
+
+ protected final boolean regenerateGraph = true;
{
KMER_LENGTH = 5;
READ_LENGTH = 8;
- HDFS_PATHS = new ArrayList<String>(Arrays.asList(HDFS_SEQUENCE, HDFS_GRAPHBUILD, HDFS_MARKPATHS, HDFS_MERGED));
+ HDFS_PATHS = new ArrayList<String>(Arrays.asList(SEQUENCE, GRAPHBUILD, MERGED));
conf.setInt(GenomixJobConf.KMER_LENGTH, KMER_LENGTH);
conf.setInt(GenomixJobConf.READ_LENGTH, READ_LENGTH);
}
@@ -39,34 +40,45 @@
@Test
public void TestMergeOneIteration() throws Exception {
cleanUpOutput();
- if (regenerateGraph) {
- copyLocalToDFS(LOCAL_SEQUENCE_FILE, HDFS_SEQUENCE);
- buildGraph();
- copyLocalToDFS(ACTUAL_ROOT + GRAPHBUILD_FILE + ".binmerge", HDFS_GRAPHBUILD);
- } else {
- copyLocalToDFS(EXPECTED_ROOT + GRAPHBUILD_FILE + ".binmerge", HDFS_GRAPHBUILD);
- }
+ prepareGraph();
- PathNodeInitial inith4 = new PathNodeInitial();
- inith4.run(HDFS_GRAPHBUILD, HDFS_MARKPATHS, conf);
- copyResultsToLocal(HDFS_MARKPATHS, ACTUAL_ROOT + PATHMARKS_FILE, false, conf);
-
MergePathsH4Driver h4 = new MergePathsH4Driver();
- h4.run(HDFS_MARKPATHS, HDFS_MERGED, 2, KMER_LENGTH, 1, conf);
- copyResultsToLocal(HDFS_MERGED, ACTUAL_ROOT + PATHMERGE_FILE, false, conf);
+ h4.run(GRAPHBUILD, MERGED, 2, KMER_LENGTH, 5, conf);
+ copyResultsToLocal(MERGED, ACTUAL_ROOT + MERGED, false, conf);
}
+// @Test
+ public void testPathNode() throws IOException {
+ cleanUpOutput();
+ prepareGraph();
+
+ // identify head and tail nodes with pathnode initial
+ PathNodeInitial inith4 = new PathNodeInitial();
+ inith4.run(GRAPHBUILD, "/toMerge", "/completed", "/toUpdate", conf);
+ }
+
- public void buildGraph() throws Exception {
+ public void buildGraph() throws IOException {
JobConf buildConf = new JobConf(conf); // use a separate conf so we don't interfere with other jobs
- FileInputFormat.setInputPaths(buildConf, HDFS_SEQUENCE);
- FileOutputFormat.setOutputPath(buildConf, new Path(HDFS_GRAPHBUILD));
- buildConf.set(GenomixJobConf.OUTPUT_FORMAT, GenomixJobConf.OUTPUT_FORMAT_BINARY);
- buildConf.set(GenomixJobConf.GROUPBY_TYPE, GenomixJobConf.GROUPBY_TYPE_PRECLUSTER);
- driver.runJob(new GenomixJobConf(buildConf), Plan.BUILD_DEBRUJIN_GRAPH, true);
- String fileFormat = buildConf.get(GenomixJobConf.OUTPUT_FORMAT);
- boolean resultsAreText = GenomixJobConf.OUTPUT_FORMAT_TEXT.equalsIgnoreCase(fileFormat);
- copyResultsToLocal(HDFS_GRAPHBUILD, ACTUAL_ROOT + GRAPHBUILD_FILE, resultsAreText, buildConf);
+ FileInputFormat.setInputPaths(buildConf, SEQUENCE);
+ FileOutputFormat.setOutputPath(buildConf, new Path(GRAPHBUILD));
+
+ GraphBuildingDriver tldriver = new GraphBuildingDriver();
+ tldriver.run(SEQUENCE, GRAPHBUILD, 2, KMER_LENGTH, READ_LENGTH, false, true, HADOOP_CONF_ROOT + "conf.xml");
+
+ boolean resultsAreText = false;
+ copyResultsToLocal(GRAPHBUILD, ACTUAL_ROOT + GRAPHBUILD, resultsAreText, buildConf);
+ }
+
+ private void prepareGraph() throws IOException {
+ if (regenerateGraph) {
+ copyLocalToDFS(LOCAL_SEQUENCE_FILE, SEQUENCE);
+ buildGraph();
+ copyLocalToDFS(ACTUAL_ROOT + GRAPHBUILD + ".bindir", GRAPHBUILD);
+ } else {
+ copyLocalToDFS(EXPECTED_ROOT + GRAPHBUILD + ".bindir", GRAPHBUILD);
+ }
}
}
diff --git a/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/pmcommon/HadoopMiniClusterTest.java b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/pmcommon/HadoopMiniClusterTest.java
new file mode 100644
index 0000000..9a113f1
--- /dev/null
+++ b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/pmcommon/HadoopMiniClusterTest.java
@@ -0,0 +1,199 @@
+package edu.uci.ics.genomix.hadoop.pmcommon;
+
+import java.io.BufferedWriter;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MiniMRCluster;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+
+import edu.uci.ics.genomix.hyracks.test.TestUtils;
+
+/*
+ * A base class providing most of the boilerplate for Hadoop-based tests
+ */
+@SuppressWarnings("deprecation")
+public class HadoopMiniClusterTest {
+ protected int KMER_LENGTH = 5;
+ protected int READ_LENGTH = 8;
+
+ // subclass should modify this to include the HDFS directories that should be cleaned up
+ protected ArrayList<String> HDFS_PATHS = new ArrayList<String>();
+
+ protected static String EXPECTED_ROOT = "src/test/resources/expected/";
+ protected static String ACTUAL_ROOT = "src/test/resources/actual/";
+
+ protected static String HADOOP_CONF_ROOT = "src/test/resources/hadoop/conf/";
+ protected static String HADOOP_CONF = HADOOP_CONF_ROOT + "conf.xml";
+
+ protected static MiniDFSCluster dfsCluster;
+ protected static MiniMRCluster mrCluster;
+ protected static FileSystem dfs;
+ protected static JobConf conf = new JobConf();
+ protected static int numberOfNC = 1;
+ protected static int numPartitionPerMachine = 1;
+
+ @BeforeClass
+ public static void setUpMiniCluster() throws Exception {
+ cleanupStores();
+ startHDFS();
+ FileUtils.forceMkdir(new File(ACTUAL_ROOT));
+ FileUtils.cleanDirectory(new File(ACTUAL_ROOT));
+ }
+
+ /*
+ * Merge and copy a DFS directory to a local destination, converting to text if necessary.
+ * Also locally store the binary-formatted result if available.
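+ * Usage sketch (paths illustrative): copyResultsToLocal("/02-pathmerge/", ACTUAL_ROOT + "pathmerge.result",
+ * false, conf) writes a text dump locally plus ".bindir" and ".binmerge" binary copies.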
+ */
+ protected static void copyResultsToLocal(String hdfsSrcDir, String localDestFile, boolean resultsAreText,
+ Configuration conf) throws IOException {
+ if (resultsAreText) {
+ // for text files, just concatenate them together
+ FileUtil.copyMerge(FileSystem.get(conf), new Path(hdfsSrcDir), FileSystem.getLocal(new Configuration()),
+ new Path(localDestFile), false, conf, null);
+ } else {
+ // file is binary
+ // save the entire binary output dir
+ FileUtil.copy(FileSystem.get(conf), new Path(hdfsSrcDir), FileSystem.getLocal(new Configuration()),
+ new Path(localDestFile + ".bindir"), false, conf);
+
+ // chomp through output files
+ FileStatus[] files = ArrayUtils.addAll(dfs.globStatus(new Path(hdfsSrcDir + "*")), dfs.globStatus(new Path(hdfsSrcDir + "*/*")));
+ FileStatus validFile = null;
+ for (FileStatus f : files) {
+ if (f.getLen() != 0) {
+ validFile = f;
+ break;
+ }
+ }
+ if (validFile == null) {
+ throw new IOException("No non-zero outputs in source directory " + hdfsSrcDir);
+ }
+
+ // also load the Nodes and write them out as text locally.
+ FileSystem lfs = FileSystem.getLocal(new Configuration());
+ lfs.mkdirs(new Path(localDestFile).getParent());
+ File filePathTo = new File(localDestFile);
+ if (filePathTo.exists() && filePathTo.isDirectory()) {
+ filePathTo = new File(localDestFile + "/data");
+ }
+ BufferedWriter bw = new BufferedWriter(new FileWriter(filePathTo));
+ SequenceFile.Reader reader = new SequenceFile.Reader(dfs, validFile.getPath(), conf);
+ SequenceFile.Writer writer = new SequenceFile.Writer(lfs, new JobConf(), new Path(localDestFile
+ + ".binmerge"), reader.getKeyClass(), reader.getValueClass());
+
+ Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
+ Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
+ reader.close(); // done probing key/value classes; a fresh reader is opened per file below
+
+ for (FileStatus f : files) {
+ if (f.getLen() == 0) {
+ continue;
+ }
+ reader = new SequenceFile.Reader(dfs, f.getPath(), conf);
+ while (reader.next(key, value)) {
+ if (key == null || value == null) {
+ break;
+ }
+ bw.write(key.toString() + "\t" + value.toString());
+ System.out.println(key.toString() + "\t" + value.toString());
+ bw.newLine();
+ writer.append(key, value);
+ }
+ reader.close();
+ }
+ writer.close();
+ bw.close();
+ }
+
+ }
+
+ protected static boolean checkResults(String expectedPath, String actualPath, int[] poslistField) throws Exception {
+ File dumped = new File(actualPath);
+ if (poslistField != null) {
+ TestUtils.compareWithUnSortedPosition(new File(expectedPath), dumped, poslistField);
+ } else {
+ TestUtils.compareWithSortedResult(new File(expectedPath), dumped);
+ }
+ return true;
+ }
+
+ protected static void cleanupStores() throws IOException {
+ FileUtils.forceMkdir(new File("teststore"));
+ FileUtils.forceMkdir(new File("build"));
+ FileUtils.cleanDirectory(new File("teststore"));
+ FileUtils.cleanDirectory(new File("build"));
+ }
+
+ protected static void startHDFS() throws IOException {
+// conf.addResource(new Path(HADOOP_CONF_ROOT + "core-site.xml"));
+ // conf.addResource(new Path(HADOOP_CONF_ROOT + "mapred-site.xml"));
+// conf.addResource(new Path(HADOOP_CONF_ROOT + "hdfs-site.xml"));
+
+ FileSystem lfs = FileSystem.getLocal(new Configuration());
+ lfs.delete(new Path("build"), true);
+ System.setProperty("hadoop.log.dir", "logs");
+ dfsCluster = new MiniDFSCluster(conf, numberOfNC, true, null);
+ dfs = dfsCluster.getFileSystem();
+ mrCluster = new MiniMRCluster(4, dfs.getUri().toString(), 2);
+ System.out.println(dfs.getUri().toString());
+
+ DataOutputStream confOutput = new DataOutputStream(
+ new FileOutputStream(new File(HADOOP_CONF)));
+ conf.writeXml(confOutput);
+ confOutput.close();
+ }
+
+ protected static void copyLocalToDFS(String localSrc, String hdfsDest) throws IOException {
+ Path dest = new Path(hdfsDest);
+ dfs.mkdirs(dest);
+ dfs.copyFromLocalFile(new Path(localSrc), dest);
+ }
+
+ /*
+ * Remove the local "actual" folder and any hdfs folders in use by this test
+ */
+ public void cleanUpOutput() throws IOException {
+ // local cleanup
+ FileSystem lfs = FileSystem.getLocal(new Configuration());
+ if (lfs.exists(new Path(ACTUAL_ROOT))) {
+ lfs.delete(new Path(ACTUAL_ROOT), true);
+ }
+ // dfs cleanup
+ for (String path : HDFS_PATHS) {
+ if (dfs.exists(new Path(path))) {
+ dfs.delete(new Path(path), true);
+ }
+ }
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ cleanupHDFS();
+ }
+
+ protected static void cleanupHDFS() throws Exception {
+ dfsCluster.shutdown();
+ mrCluster.shutdown();
+ }
+}
diff --git a/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/pmcommon/TestPathNodeInitial.java b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/pmcommon/TestPathNodeInitial.java
deleted file mode 100644
index b142f87..0000000
--- a/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/pmcommon/TestPathNodeInitial.java
+++ /dev/null
@@ -1,60 +0,0 @@
-package edu.uci.ics.genomix.hadoop.pmcommon;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.SequenceFileOutputFormat;
-import org.apache.hadoop.mapred.lib.MultipleOutputs;
-import org.apache.hadoop.mrunit.MapDriver;
-import org.apache.hadoop.mrunit.ReduceDriver;
-import org.junit.Test;
-
-import edu.uci.ics.genomix.hadoop.graphclean.mergepaths.h3.MergePathsH3.MessageFlag;
-import edu.uci.ics.genomix.hadoop.pmcommon.MessageWritableNodeWithFlag;
-import edu.uci.ics.genomix.hadoop.pmcommon.PathNodeInitial;
-import edu.uci.ics.genomix.type.KmerBytesWritable;
-import edu.uci.ics.genomix.type.NodeWritable;
-import edu.uci.ics.genomix.type.PositionListWritable;
-import edu.uci.ics.genomix.type.PositionWritable;
-
-@SuppressWarnings("deprecation")
-public class TestPathNodeInitial {
- PositionWritable posn1 = new PositionWritable(0, (byte) 1);
- PositionWritable posn2 = new PositionWritable(1, (byte) 1);
- PositionWritable posn3 = new PositionWritable(2, (byte) 1);
- PositionWritable posn4 = new PositionWritable(3, (byte) 1);
- PositionWritable posn5 = new PositionWritable(5, (byte) 1);
- String kmerString = "ATGCA";
- KmerBytesWritable kmer = new KmerBytesWritable(kmerString.length(), kmerString);
- JobConf conf = new JobConf();
- MultipleOutputs mos = new MultipleOutputs(conf);
-
- {
- conf.set("sizeKmer", String.valueOf(kmerString.length()));
- }
-
- @Test
- public void testNoNeighbors() throws IOException {
- NodeWritable noNeighborNode = new NodeWritable(posn1, new PositionListWritable(), new PositionListWritable(),
- new PositionListWritable(), new PositionListWritable(), kmer);
- MessageWritableNodeWithFlag output = new MessageWritableNodeWithFlag((byte) (MessageFlag.FROM_SELF | MessageFlag.IS_COMPLETE), noNeighborNode);
- // test mapper
- new MapDriver<NodeWritable, NullWritable, PositionWritable, MessageWritableNodeWithFlag>()
- .withMapper(new PathNodeInitial.PathNodeInitialMapper())
- .withConfiguration(conf)
- .withInput(noNeighborNode, NullWritable.get())
- .withOutput(posn1, output)
- .runTest();
- // test reducer
-// MultipleOutputs.addNamedOutput(conf, "complete", SequenceFileOutputFormat.class, PositionWritable.class, MessageWritableNodeWithFlag.class);
- new ReduceDriver<PositionWritable, MessageWritableNodeWithFlag, PositionWritable, MessageWritableNodeWithFlag>()
- .withReducer(new PathNodeInitial.PathNodeInitialReducer())
- .withConfiguration(conf)
- .withInput(posn1, Arrays.asList(output))
- .runTest();
- }
-}
diff --git a/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/velvetgraphbuilding/NewGraphBuildingTest.java b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/velvetgraphbuilding/NewGraphBuildingTest.java
index f6236d2..2517810 100644
--- a/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/velvetgraphbuilding/NewGraphBuildingTest.java
+++ b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/velvetgraphbuilding/NewGraphBuildingTest.java
@@ -42,15 +42,16 @@
FileUtils.forceMkdir(new File(ACTUAL_RESULT_DIR));
FileUtils.cleanDirectory(new File(ACTUAL_RESULT_DIR));
startHadoop();
- TestGroupbyKmer();
- TestMapKmerToRead();
+// TestGroupbyKmer();
+// TestMapKmerToRead();
+ TestGroupByReadID();
cleanupHadoop();
}
public void TestGroupbyKmer() throws Exception {
GraphBuildingDriver tldriver = new GraphBuildingDriver();
tldriver.run(HDFS_PATH, RESULT_PATH, COUNT_REDUCER, SIZE_KMER, READ_LENGTH, true, false, HADOOP_CONF_PATH);
- dumpGroupByKmerResult();
+ dumpResult();
}
public void TestMapKmerToRead() throws Exception {
@@ -89,11 +90,6 @@
dfsCluster.shutdown();
}
- private void dumpGroupByKmerResult() throws IOException {
- Path src = new Path(HDFS_PATH + "-step1");
- Path dest = new Path(ACTUAL_RESULT_DIR);
- dfs.copyToLocalFile(src, dest);
- }
private void dumpResult() throws IOException {
Path src = new Path(RESULT_PATH);