Merge branch 'wbiesing/genomix/VKmers' into nanzhang/hyracks_genomix
Conflicts:
genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/VKmerBytesWritable.java
genomix/genomix-data/src/main/java/edu/uci/ics/genomix/velvet/oldtype/NodeWritable.java
genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/contrail/graph/AggregateKmerAggregateFactory.java
genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/contrail/graph/ReadsKeyValueParserFactory.java
genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/job/JobGenCheckReader.java
genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/LogAlgorithmForPathMergeVertex.java
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/VKmerBytesWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/VKmerBytesWritable.java
index c38e35d..df93069 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/VKmerBytesWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/VKmerBytesWritable.java
@@ -26,7 +26,8 @@
import edu.uci.ics.genomix.data.KmerUtil;
import edu.uci.ics.genomix.data.Marshal;
-import edu.uci.ics.genomix.oldtype.NodeWritable.DirectionFlag;
+import edu.uci.ics.genomix.type.NodeWritable.DirectionFlag;
+
/**
* Variable-length kmer which stores its length internally.
@@ -497,6 +498,11 @@
clearLeadBit();
saveHeader(lettersInKmer);
}
+
+ public void mergeWithFFKmer(int kmerSize, KmerBytesWritable kmer) {
+ // TODO make this more efficient
+ mergeWithFFKmer(kmerSize, new VKmerBytesWritable(kmer.toString()));
+ }
/**
* Merge Kmer with the next connected Kmer, when that Kmer needs to be
@@ -538,6 +544,11 @@
}
saveHeader(lettersInKmer);
}
+
+ public void mergeWithFRKmer(int kmerSize, KmerBytesWritable kmer) {
+ // TODO make this more efficient
+ mergeWithFRKmer(kmerSize, new VKmerBytesWritable(kmer.toString()));
+ }
/**
* Merge Kmer with the previous connected Kmer, when that kmer needs to be
@@ -550,10 +561,16 @@
* : the previous kmer
*/
public void mergeWithRFKmer(int initialKmerSize, VKmerBytesWritable preKmer) {
+ // TODO make this more efficient
VKmerBytesWritable reversed = new VKmerBytesWritable(preKmer.lettersInKmer);
reversed.setByReadReverse(preKmer.toString().getBytes(), 0);
mergeWithRRKmer(initialKmerSize, reversed);
}
+
+ public void mergeWithRFKmer(int kmerSize, KmerBytesWritable kmer) {
+ // TODO make this more efficient
+ mergeWithRFKmer(kmerSize, new VKmerBytesWritable(kmer.toString()));
+ }
/**
* Merge Kmer with the previous connected Kmer e.g. AACAACC merge with
@@ -590,6 +607,11 @@
bytes, kmerStartOffset, bytesUsed);
clearLeadBit();
}
+
+ public void mergeWithRRKmer(int kmerSize, KmerBytesWritable kmer) {
+ // TODO make this more efficient
+ mergeWithRRKmer(kmerSize, new VKmerBytesWritable(kmer.toString()));
+ }
public void mergeWithKmerInDir(byte dir, int initialKmerSize, VKmerBytesWritable kmer) {
switch (dir & DirectionFlag.DIR_MASK) {
@@ -609,5 +631,16 @@
throw new RuntimeException("Direction not recognized: " + dir);
}
}
+ public void mergeWithKmerInDir(byte dir, int initialKmerSize, KmerBytesWritable kmer) {
+ // TODO make this more efficient
+ mergeWithKmerInDir(dir, initialKmerSize, new VKmerBytesWritable(kmer.toString()));
+ }
+
+ public KmerBytesWritable asFixedLengthKmer() {
+ if (lettersInKmer != KmerBytesWritable.getKmerLength()) {
+ throw new IllegalArgumentException("VKmer " + this.toString() + " is not of the same length as the fixed length Kmer (" + KmerBytesWritable.getKmerLength() + " )!");
+ }
+ return new KmerBytesWritable(bytes, kmerStartOffset);
+ }
}
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/velvet/oldtype/GeneCode.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/velvet/oldtype/GeneCode.java
new file mode 100644
index 0000000..c3d8a98
--- /dev/null
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/velvet/oldtype/GeneCode.java
@@ -0,0 +1,69 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.genomix.velvet.oldtype;
+
+public class GeneCode {
+ public final static byte[] GENE_SYMBOL = { 'A', 'C', 'G', 'T' };
+    /**
+     * Make sure these 4 ids match the index of the corresponding char in
+     * {@link #GENE_SYMBOL}.
+     */
+ public static final byte A = 0;
+ public static final byte C = 1;
+ public static final byte G = 2;
+ public static final byte T = 3;
+
+ public static byte getCodeFromSymbol(byte ch) {
+ byte r = 0;
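+        // unrecognized symbols fall through and map to A (code 0)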
+ switch (ch) {
+ case 'A':
+ case 'a':
+ r = A;
+ break;
+ case 'C':
+ case 'c':
+ r = C;
+ break;
+ case 'G':
+ case 'g':
+ r = G;
+ break;
+ case 'T':
+ case 't':
+ r = T;
+ break;
+ }
+ return r;
+ }
+
+    public static byte getPairedGeneCode(byte genecode) {
+        if (genecode < 0 || genecode > 3) {
+            throw new IllegalArgumentException("Invalid genecode");
+        }
+        return (byte) (3 - genecode);
+    }
+
+    public static byte getPairedCodeFromSymbol(byte ch) {
+ return getPairedGeneCode(getCodeFromSymbol(ch));
+ }
+
+ public static byte getSymbolFromCode(byte code) {
+        if (code > 3 || code < 0) {
+ throw new IllegalArgumentException("Invalid genecode");
+ }
+ return GENE_SYMBOL[code];
+ }
+}
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/velvet/oldtype/KmerBytesWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/velvet/oldtype/KmerBytesWritable.java
new file mode 100644
index 0000000..630dbad
--- /dev/null
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/velvet/oldtype/KmerBytesWritable.java
@@ -0,0 +1,502 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.genomix.velvet.oldtype;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.Serializable;
+
+import org.apache.hadoop.io.BinaryComparable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+
+import edu.uci.ics.genomix.data.KmerUtil;
+
+/**
+ * Kmer stored as a packed byte array, parameterized by kmer length.
+ * It was used to generate the graph, a phase in which the kmer length doesn't change;
+ * thus the size of the byte array doesn't change either.
+ */
+public class KmerBytesWritable extends BinaryComparable implements Serializable, WritableComparable<BinaryComparable> {
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1L;
+ private static final byte[] EMPTY_BYTES = {};
+
+ protected int size;
+ protected byte[] bytes;
+ protected int offset;
+ protected int kmerlength;
+
+ public KmerBytesWritable() {
+ this(0, EMPTY_BYTES, 0);
+ }
+
+ public KmerBytesWritable(int k, byte[] storage, int offset) {
+ setNewReference(k, storage, offset);
+ }
+
+ public KmerBytesWritable(int k, String kmer) {
+ setNewReference(kmer.length(), kmer.getBytes(), 0);
+ }
+
+ /**
+     * Initialize the kmer storage for the given kmer length
+ *
+ * @param k
+ * kmerlength
+ */
+ public KmerBytesWritable(int k) {
+ this.kmerlength = k;
+ this.size = KmerUtil.getByteNumFromK(kmerlength);
+ if (k > 0) {
+ this.bytes = new byte[this.size];
+ } else {
+ this.bytes = EMPTY_BYTES;
+ }
+ this.offset = 0;
+ }
+
+ public KmerBytesWritable(KmerBytesWritable right) {
+ this(right.kmerlength);
+ set(right);
+ }
+
+ /**
+ * Deep copy of the given kmer
+ *
+ * @param newData
+ */
+ public void set(KmerBytesWritable newData) {
+ if (newData == null) {
+ this.set(0, EMPTY_BYTES, 0);
+ } else {
+ this.set(newData.kmerlength, newData.bytes, newData.getOffset());
+ }
+ }
+
+ /**
+     * Deep copy of the given byte data.
+     * It does not change the kmerlength.
+ *
+ * @param newData
+ * @param offset
+ */
+ public void set(byte[] newData, int offset) {
+ if (kmerlength > 0) {
+ System.arraycopy(newData, offset, bytes, this.offset, size);
+ }
+ }
+
+ /**
+     * Deep copy of the given data; also sets the new kmerlength
+ *
+ * @param k
+ * : new kmer length
+ * @param newData
+ * : data storage
+ * @param offset
+ * : start offset
+ */
+ public void set(int k, byte[] newData, int offset) {
+ reset(k);
+ if (k > 0) {
+ System.arraycopy(newData, offset, bytes, this.offset, size);
+ }
+ }
+
+ /**
+     * Reset the array for the given kmer length
+ *
+ * @param k
+ */
+ public void reset(int k) {
+ this.kmerlength = k;
+ setSize(KmerUtil.getByteNumFromK(k));
+ clearLeadBit();
+ }
+
+ /**
+     * Point this kmer at the given byte array, like re-seating a pointer.
+     * The kmerlength does not change.
+ *
+ * @param newData
+ * @param offset
+ */
+ public void setNewReference(byte[] newData, int offset) {
+ this.bytes = newData;
+ this.offset = offset;
+ if (newData.length - offset < size) {
+ throw new IllegalArgumentException("Not given enough space");
+ }
+ }
+
+ /**
+     * Point this kmer at the given byte array, like re-seating a pointer.
+     * It also sets the new kmerlength.
+ *
+ * @param k
+ * @param newData
+ * @param offset
+ */
+ public void setNewReference(int k, byte[] newData, int offset) {
+ this.kmerlength = k;
+ this.size = KmerUtil.getByteNumFromK(k);
+ setNewReference(newData, offset);
+ }
+
+ protected void setSize(int size) {
+ if (size > getCapacity()) {
+ setCapacity((size * 3 / 2));
+ }
+ this.size = size;
+ }
+
+ protected int getCapacity() {
+ return bytes.length;
+ }
+
+ protected void setCapacity(int new_cap) {
+ if (new_cap != getCapacity()) {
+ byte[] new_data = new byte[new_cap];
+ if (new_cap < size) {
+ size = new_cap;
+ }
+ if (size != 0) {
+ System.arraycopy(bytes, offset, new_data, 0, size);
+ }
+ bytes = new_data;
+ offset = 0;
+ }
+ }
+
+ /**
+     * Get one gene code (A|C|G|T) at the given position in the kmer,
+     * e.g. the 4th gene of the kmer ACGTA is T
+ *
+ * @param pos
+ * @return
+ */
+ public byte getGeneCodeAtPosition(int pos) {
+ if (pos >= kmerlength) {
+ throw new IllegalArgumentException("gene position out of bound");
+ }
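+        // four 2-bit codes per byte; letter 0 lives in the low bits of the last byte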
+ int posByte = pos / 4;
+ int shift = (pos % 4) << 1;
+ return (byte) ((bytes[offset + size - 1 - posByte] >> shift) & 0x3);
+ }
+
+ public int getKmerLength() {
+ return this.kmerlength;
+ }
+
+ @Override
+ public byte[] getBytes() {
+ return bytes;
+ }
+
+ public int getOffset() {
+ return offset;
+ }
+
+ @Override
+ public int getLength() {
+ return size;
+ }
+
+ /**
+ * Read Kmer from read text into bytes array e.g. AATAG will compress as
+ * [0x000G, 0xATAA]
+ *
+ * @param k
+ * @param array
+ * @param start
+ */
+ public void setByRead(byte[] array, int start) {
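+        // pack two bits per letter, filling the byte array from its tail toward its head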
+ byte l = 0;
+ int bytecount = 0;
+ int bcount = this.size - 1;
+ for (int i = start; i < start + kmerlength && i < array.length; i++) {
+ byte code = GeneCode.getCodeFromSymbol(array[i]);
+ l |= (byte) (code << bytecount);
+ bytecount += 2;
+ if (bytecount == 8) {
+ bytes[offset + bcount--] = l;
+ l = 0;
+ bytecount = 0;
+ }
+ }
+ if (bcount >= 0) {
+ bytes[offset] = l;
+ }
+ }
+
+ public void setByRead(int k, byte[] array, int start) {
+ reset(k);
+ setByRead(array, start);
+ }
+
+ /**
+     * Compress the reverse complement of the read into the bytes array,
+     * e.g. AATAG pairs to CTATT, which then compresses as
+     * [0x000T,0xTATC]
+ *
+ * @param input
+ * array
+ * @param start
+ * position
+ */
+ public void setByReadReverse(byte[] array, int start) {
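+        // walk the read backwards, complementing each letter, then pack as in setByRead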
+ byte l = 0;
+ int bytecount = 0;
+ int bcount = size - 1;
+ for (int i = start + kmerlength - 1; i >= 0 && i < array.length; i--) {
+ byte code = GeneCode.getPairedCodeFromSymbol(array[i]);
+ l |= (byte) (code << bytecount);
+ bytecount += 2;
+ if (bytecount == 8) {
+ bytes[offset + bcount--] = l;
+ l = 0;
+ bytecount = 0;
+ }
+ }
+ if (bcount >= 0) {
+ bytes[offset] = l;
+ }
+ }
+
+ public void setByReadReverse(int k, byte[] array, int start) {
+ reset(k);
+ setByReadReverse(array, start);
+ }
+
+ /**
+ * Shift Kmer to accept new char input
+ *
+ * @param c
+ * Input new gene character
+     * @return the shifted-out gene, in gene-code format
+ */
+ public byte shiftKmerWithNextChar(byte c) {
+ return shiftKmerWithNextCode(GeneCode.getCodeFromSymbol(c));
+ }
+
+ /**
+ * Shift Kmer to accept new gene code
+ *
+ * @param c
+ * Input new gene code
+     * @return the shifted-out gene, in gene-code format
+ */
+ public byte shiftKmerWithNextCode(byte c) {
+ byte output = (byte) (bytes[offset + size - 1] & 0x03);
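+        // ripple every byte one 2-bit code toward the low end, then drop the new code into the top letter slot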
+ for (int i = size - 1; i > 0; i--) {
+ byte in = (byte) (bytes[offset + i - 1] & 0x03);
+ bytes[offset + i] = (byte) (((bytes[offset + i] >>> 2) & 0x3f) | (in << 6));
+ }
+ int pos = ((kmerlength - 1) % 4) << 1;
+ byte code = (byte) (c << pos);
+ bytes[offset] = (byte) (((bytes[offset] >>> 2) & 0x3f) | code);
+ clearLeadBit();
+ return output;
+ }
+
+ /**
+ * Shift Kmer to accept new input char
+ *
+ * @param c
+ * Input new gene character
+     * @return the shifted-out gene, in gene-code format
+ */
+ public byte shiftKmerWithPreChar(byte c) {
+ return shiftKmerWithPreCode(GeneCode.getCodeFromSymbol(c));
+ }
+
+ /**
+ * Shift Kmer to accept new gene code
+ *
+ * @param c
+ * Input new gene code
+     * @return the shifted-out gene, in gene-code format
+ */
+ public byte shiftKmerWithPreCode(byte c) {
+ int pos = ((kmerlength - 1) % 4) << 1;
+ byte output = (byte) ((bytes[offset] >> pos) & 0x03);
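+        // ripple every byte one 2-bit code toward the high end, then drop the new code into the letter-0 slot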
+ for (int i = 0; i < size - 1; i++) {
+ byte in = (byte) ((bytes[offset + i + 1] >> 6) & 0x03);
+ bytes[offset + i] = (byte) ((bytes[offset + i] << 2) | in);
+ }
+ bytes[offset + size - 1] = (byte) ((bytes[offset + size - 1] << 2) | c);
+ clearLeadBit();
+ return output;
+ }
+
+ /**
+     * Merge this kmer with the next connected kmer,
+     * e.g. AAGCTAA merged with AACAACC at initial kmerSize = 3
+     * yields AAGCTAACAACC
+ *
+ * @param initialKmerSize
+ * : the initial kmerSize
+ * @param kmer
+ * : the next kmer
+ */
+ public void mergeNextKmer(int initialKmerSize, KmerBytesWritable kmer) {
+ int preKmerLength = kmerlength;
+ int preSize = size;
+ this.kmerlength += kmer.kmerlength - initialKmerSize + 1;
+ setSize(KmerUtil.getByteNumFromK(kmerlength));
+ for (int i = 1; i <= preSize; i++) {
+ bytes[offset + size - i] = bytes[offset + preSize - i];
+ }
+ for (int k = initialKmerSize - 1; k < kmer.getKmerLength(); k += 4) {
+ byte onebyte = getOneByteFromKmerAtPosition(k, kmer.getBytes(), kmer.getOffset(), kmer.getLength());
+ appendOneByteAtPosition(preKmerLength + k - initialKmerSize + 1, onebyte, bytes, offset, size);
+ }
+ clearLeadBit();
+ }
+
+ /**
+     * Merge this kmer with the previous connected kmer,
+     * e.g. AACAACC merged with previous AAGCTAA at initial kmerSize = 3
+     * yields AAGCTAACAACC
+ *
+ * @param initialKmerSize
+ * : the initial kmerSize
+ * @param preKmer
+ * : the previous kmer
+ */
+ public void mergePreKmer(int initialKmerSize, KmerBytesWritable preKmer) {
+ int preKmerLength = kmerlength;
+ int preSize = size;
+ this.kmerlength += preKmer.kmerlength - initialKmerSize + 1;
+ setSize(KmerUtil.getByteNumFromK(kmerlength));
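+        // stash the byte holding letter 0: copying the previous kmer in below overwrites the low positions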
+ byte cacheByte = getOneByteFromKmerAtPosition(0, bytes, offset, preSize);
+
+ // copy prekmer
+ for (int k = 0; k < preKmer.kmerlength - initialKmerSize + 1; k += 4) {
+ byte onebyte = getOneByteFromKmerAtPosition(k, preKmer.bytes, preKmer.offset, preKmer.size);
+ appendOneByteAtPosition(k, onebyte, bytes, offset, size);
+ }
+
+ // copy current kmer
+ int k = 4;
+ for (; k < preKmerLength; k += 4) {
+ byte onebyte = getOneByteFromKmerAtPosition(k, bytes, offset, preSize);
+ appendOneByteAtPosition(preKmer.kmerlength - initialKmerSize + k - 4 + 1, cacheByte, bytes, offset, size);
+ cacheByte = onebyte;
+ }
+ appendOneByteAtPosition(preKmer.kmerlength - initialKmerSize + k - 4 + 1, cacheByte, bytes, offset, size);
+ clearLeadBit();
+ }
+
+ public static void appendOneByteAtPosition(int k, byte onebyte, byte[] buffer, int start, int length) {
+ int position = start + length - 1 - k / 4;
+ if (position < start) {
+ throw new IllegalArgumentException("Buffer for kmer storage is invalid");
+ }
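+        // shift says where inside the target byte letter k starts; mask preserves the codes already stored below it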
+ int shift = ((k) % 4) << 1;
+ int mask = shift == 0 ? 0 : ((1 << shift) - 1);
+
+ buffer[position] = (byte) ((buffer[position] & mask) | ((0xff & onebyte) << shift));
+ if (position > start && shift != 0) {
+ buffer[position - 1] = (byte) ((buffer[position - 1] & (0xff - mask)) | ((byte) ((0xff & onebyte) >> (8 - shift))));
+ }
+ }
+
+ public static byte getOneByteFromKmerAtPosition(int k, byte[] buffer, int start, int length) {
+ int position = start + length - 1 - k / 4;
+ if (position < start) {
+ throw new IllegalArgumentException("Buffer of kmer storage is invalid");
+ }
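+        // read 8 bits of codes starting at letter k, stitching two adjacent bytes when k is not byte-aligned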
+ int shift = (k % 4) << 1;
+ byte data = (byte) (((0xff) & buffer[position]) >> shift);
+ if (shift != 0 && position > start) {
+ data |= 0xff & (buffer[position - 1] << (8 - shift));
+ }
+ return data;
+ }
+
+ protected void clearLeadBit() {
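+        // zero the unused high bits of the first byte when kmerlength is not a multiple of 4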
+ if (kmerlength % 4 != 0) {
+ bytes[offset] &= (1 << ((kmerlength % 4) << 1)) - 1;
+ }
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ this.kmerlength = in.readInt();
+ this.size = KmerUtil.getByteNumFromK(kmerlength);
+ if (this.kmerlength > 0) {
+ if (this.bytes.length < this.size) {
+ this.bytes = new byte[this.size];
+ this.offset = 0;
+ }
+ in.readFully(bytes, offset, size);
+ }
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeInt(kmerlength);
+ if (kmerlength > 0) {
+ out.write(bytes, offset, size);
+ }
+ }
+
+ @Override
+ public int hashCode() {
+ return super.hashCode() * 31 + this.kmerlength;
+ }
+
+ @Override
+ public boolean equals(Object right_obj) {
+ if (right_obj instanceof KmerBytesWritable)
+ return this.kmerlength == ((KmerBytesWritable) right_obj).kmerlength && super.equals(right_obj);
+ return false;
+ }
+
+ @Override
+ public String toString() {
+ return KmerUtil.recoverKmerFrom(this.kmerlength, this.getBytes(), offset, this.getLength());
+ }
+
+ public static class Comparator extends WritableComparator {
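+        // the serialized form begins with a 4-byte kmerlength int; skip it when comparing raw bytes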
+ public final int LEAD_BYTES = 4;
+
+ public Comparator() {
+ super(KmerBytesWritable.class);
+ }
+
+ public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+ int kmerlength1 = readInt(b1, s1);
+ int kmerlength2 = readInt(b2, s2);
+ if (kmerlength1 == kmerlength2) {
+ return compareBytes(b1, s1 + LEAD_BYTES, l1 - LEAD_BYTES, b2, s2 + LEAD_BYTES, l2 - LEAD_BYTES);
+ }
+ return kmerlength1 - kmerlength2;
+ }
+ }
+
+ static { // register this comparator
+ WritableComparator.define(KmerBytesWritable.class, new Comparator());
+ }
+
+}
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/velvet/oldtype/KmerBytesWritableFactory.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/velvet/oldtype/KmerBytesWritableFactory.java
new file mode 100644
index 0000000..b0aaebc
--- /dev/null
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/velvet/oldtype/KmerBytesWritableFactory.java
@@ -0,0 +1,313 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.genomix.velvet.oldtype;
+
+public class KmerBytesWritableFactory {
+ private KmerBytesWritable kmer;
+
+ public KmerBytesWritableFactory(int k) {
+ kmer = new KmerBytesWritable(k);
+ }
+
+ /**
+ * Read Kmer from read text into bytes array e.g. AATAG will compress as
+ * [0x000G, 0xATAA]
+ *
+ * @param k
+ * @param array
+ * @param start
+ */
+ public KmerBytesWritable getKmerByRead(int k, byte[] array, int start) {
+ kmer.reset(k);
+ kmer.setByRead(array, start);
+ return kmer;
+ }
+
+ /**
+     * Compress the reverse complement of the kmer into the bytes array,
+     * e.g. AATAG pairs to CTATT, which compresses as [0x000T,0xTATC]
+ *
+ * @param array
+ * @param start
+ */
+ public KmerBytesWritable getKmerByReadReverse(int k, byte[] array, int start) {
+ kmer.reset(k);
+ kmer.setByReadReverse(array, start);
+ return kmer;
+ }
+
+    /**
+     * Get the last kmer from a kmer-chain,
+     * e.g. for kmerChain AAGCTA and lastK = 5, it
+     * returns AGCTA
+     *
+     * @param lastK
+     * @param kmerChain
+     * @return the last kmer as a bytes array
+     */
+ public KmerBytesWritable getLastKmerFromChain(int lastK, final KmerBytesWritable kmerChain) {
+ if (lastK > kmerChain.getKmerLength()) {
+ return null;
+ }
+ if (lastK == kmerChain.getKmerLength()) {
+ kmer.set(kmerChain);
+ return kmer;
+ }
+ kmer.reset(lastK);
+
+ /** from end to start */
+ int byteInChain = kmerChain.getLength() - 1 - (kmerChain.getKmerLength() - lastK) / 4;
+ int posInByteOfChain = ((kmerChain.getKmerLength() - lastK) % 4) << 1; // *2
+ int byteInKmer = kmer.getLength() - 1;
+ for (; byteInKmer >= 0 && byteInChain > 0; byteInKmer--, byteInChain--) {
+ kmer.getBytes()[byteInKmer] = (byte) ((0xff & kmerChain.getBytes()[byteInChain]) >> posInByteOfChain);
+ kmer.getBytes()[byteInKmer] |= ((kmerChain.getBytes()[byteInChain - 1] << (8 - posInByteOfChain)));
+ }
+
+ /** last kmer byte */
+ if (byteInKmer == 0) {
+ kmer.getBytes()[0] = (byte) ((kmerChain.getBytes()[0] & 0xff) >> posInByteOfChain);
+ }
+ kmer.clearLeadBit();
+ return kmer;
+ }
+
+    /**
+     * Get the first kmer from a kmer-chain,
+     * e.g. for kmerChain AAGCTA and firstK = 5, it
+     * returns AAGCT
+     *
+     * @param firstK
+     * @param kmerChain
+     * @return the first kmer as a bytes array
+     */
+ public KmerBytesWritable getFirstKmerFromChain(int firstK, final KmerBytesWritable kmerChain) {
+ if (firstK > kmerChain.getKmerLength()) {
+ return null;
+ }
+ if (firstK == kmerChain.getKmerLength()) {
+ kmer.set(kmerChain);
+ return kmer;
+ }
+ kmer.reset(firstK);
+
+ int i = 1;
+ for (; i < kmer.getLength(); i++) {
+ kmer.getBytes()[kmer.getLength() - i] = kmerChain.getBytes()[kmerChain.getLength() - i];
+ }
+ int posInByteOfChain = (firstK % 4) << 1; // *2
+ if (posInByteOfChain == 0) {
+ kmer.getBytes()[0] = kmerChain.getBytes()[kmerChain.getLength() - i];
+ } else {
+ kmer.getBytes()[0] = (byte) (kmerChain.getBytes()[kmerChain.getLength() - i] & ((1 << posInByteOfChain) - 1));
+ }
+ kmer.clearLeadBit();
+ return kmer;
+ }
+
+ public KmerBytesWritable getSubKmerFromChain(int startK, int kSize, final KmerBytesWritable kmerChain) {
+ if (startK + kSize > kmerChain.getKmerLength()) {
+ return null;
+ }
+ if (startK == 0 && kSize == kmerChain.getKmerLength()) {
+ kmer.set(kmerChain);
+ return kmer;
+ }
+ kmer.reset(kSize);
+
+ /** from end to start */
+ int byteInChain = kmerChain.getLength() - 1 - startK / 4;
+ int posInByteOfChain = startK % 4 << 1; // *2
+ int byteInKmer = kmer.getLength() - 1;
+ for (; byteInKmer >= 0 && byteInChain > 0; byteInKmer--, byteInChain--) {
+ kmer.getBytes()[byteInKmer] = (byte) ((0xff & kmerChain.getBytes()[byteInChain]) >> posInByteOfChain);
+ kmer.getBytes()[byteInKmer] |= ((kmerChain.getBytes()[byteInChain - 1] << (8 - posInByteOfChain)));
+ }
+
+ /** last kmer byte */
+ if (byteInKmer == 0) {
+ kmer.getBytes()[0] = (byte) ((kmerChain.getBytes()[0] & 0xff) >> posInByteOfChain);
+ }
+ kmer.clearLeadBit();
+ return kmer;
+ }
+
+    /**
+     * Merge a kmer with its next neighbor in gene-code format.
+     * The k of the new kmer increases by 1,
+     * e.g. AAGCT merged with A => AAGCTA
+     *
+     * @param kmer
+     *            : input kmer
+     * @param nextCode
+     *            : next neighbor in gene-code format
+     * @return the merged kmer; the k of this kmer is k+1
+     */
+ public KmerBytesWritable mergeKmerWithNextCode(final KmerBytesWritable kmer, byte nextCode) {
+ this.kmer.reset(kmer.getKmerLength() + 1);
+ for (int i = 1; i <= kmer.getLength(); i++) {
+ this.kmer.getBytes()[this.kmer.getLength() - i] = kmer.getBytes()[kmer.getLength() - i];
+ }
+ if (this.kmer.getLength() > kmer.getLength()) {
+ this.kmer.getBytes()[0] = (byte) (nextCode & 0x3);
+ } else {
+ this.kmer.getBytes()[0] = (byte) (kmer.getBytes()[0] | ((nextCode & 0x3) << ((kmer.getKmerLength() % 4) << 1)));
+ }
+ this.kmer.clearLeadBit();
+ return this.kmer;
+ }
+
+    /**
+     * Merge a kmer with its previous neighbor in gene-code format.
+     * The k of the new kmer increases by 1,
+     * e.g. AAGCT merged with A => AAAGCT
+     *
+     * @param kmer
+     *            : input kmer
+     * @param preCode
+     *            : previous neighbor in gene-code format
+     * @return the merged kmer; the k of this kmer is k+1
+     */
+ public KmerBytesWritable mergeKmerWithPreCode(final KmerBytesWritable kmer, byte preCode) {
+ this.kmer.reset(kmer.getKmerLength() + 1);
+ int byteInMergedKmer = 0;
+ if (kmer.getKmerLength() % 4 == 0) {
+ this.kmer.getBytes()[0] = (byte) ((kmer.getBytes()[0] >> 6) & 0x3);
+ byteInMergedKmer++;
+ }
+ for (int i = 0; i < kmer.getLength() - 1; i++, byteInMergedKmer++) {
+ this.kmer.getBytes()[byteInMergedKmer] = (byte) ((kmer.getBytes()[i] << 2) | ((kmer.getBytes()[i + 1] >> 6) & 0x3));
+ }
+ this.kmer.getBytes()[byteInMergedKmer] = (byte) ((kmer.getBytes()[kmer.getLength() - 1] << 2) | (preCode & 0x3));
+ this.kmer.clearLeadBit();
+ return this.kmer;
+ }
+
+    /**
+     * Merge two kmers into one,
+     * e.g. ACTA + ACCGT => ACTAACCGT
+     *
+     * @param preKmer
+     *            : the previous kmer
+     * @param nextKmer
+     *            : the next kmer
+     * @return the merged kmer; the new k is the sum of the two input k values
+     */
+ public KmerBytesWritable mergeTwoKmer(final KmerBytesWritable preKmer, final KmerBytesWritable nextKmer) {
+ kmer.reset(preKmer.getKmerLength() + nextKmer.getKmerLength());
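+        // copy preKmer into the low letter positions, then splice nextKmer's codes in above it, realigning when preK is not a multiple of 4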
+ int i = 1;
+ for (; i <= preKmer.getLength(); i++) {
+ kmer.getBytes()[kmer.getLength() - i] = preKmer.getBytes()[preKmer.getLength() - i];
+ }
+ if (i > 1) {
+ i--;
+ }
+ if (preKmer.getKmerLength() % 4 == 0) {
+ for (int j = 1; j <= nextKmer.getLength(); j++) {
+ kmer.getBytes()[kmer.getLength() - i - j] = nextKmer.getBytes()[nextKmer.getLength() - j];
+ }
+ } else {
+ int posNeedToMove = ((preKmer.getKmerLength() % 4) << 1);
+ kmer.getBytes()[kmer.getLength() - i] |= nextKmer.getBytes()[nextKmer.getLength() - 1] << posNeedToMove;
+ for (int j = 1; j < nextKmer.getLength(); j++) {
+ kmer.getBytes()[kmer.getLength() - i - j] = (byte) (((nextKmer.getBytes()[nextKmer.getLength() - j] & 0xff) >> (8 - posNeedToMove)) | (nextKmer
+ .getBytes()[nextKmer.getLength() - j - 1] << posNeedToMove));
+ }
+ if (nextKmer.getKmerLength() % 4 == 0 || (nextKmer.getKmerLength() % 4) * 2 + posNeedToMove > 8) {
+ kmer.getBytes()[0] = (byte) ((0xff & nextKmer.getBytes()[0]) >> (8 - posNeedToMove));
+ }
+ }
+ kmer.clearLeadBit();
+ return kmer;
+ }
+
+    /**
+     * Safely shift the kmer forward without changing the input kmer,
+     * e.g. AGCGC shifted with T => GCGCT
+     *
+     * @param kmer
+     *            : input kmer
+     * @param afterCode
+     *            : input gene code
+     * @return a newly created kmer shifted by afterCode; the k does not change
+     */
+ public KmerBytesWritable shiftKmerWithNextCode(final KmerBytesWritable kmer, byte afterCode) {
+ this.kmer.set(kmer);
+ this.kmer.shiftKmerWithNextCode(afterCode);
+ return this.kmer;
+ }
+
+    /**
+     * Safely shift the kmer backward without changing the input kmer,
+     * e.g. AGCGC shifted with T => TAGCG
+     *
+     * @param kmer
+     *            : input kmer
+     * @param preCode
+     *            : input gene code
+     * @return a newly created kmer shifted by preCode; the k does not change
+     */
+ public KmerBytesWritable shiftKmerWithPreCode(final KmerBytesWritable kmer, byte preCode) {
+ this.kmer.set(kmer);
+ this.kmer.shiftKmerWithPreCode(preCode);
+ return this.kmer;
+ }
+
+ /**
+     * Get the reverse sequence of the given kmer
+ *
+ * @param kmer
+ */
+ public KmerBytesWritable reverse(final KmerBytesWritable kmer) {
+ this.kmer.reset(kmer.getKmerLength());
+
+ int curPosAtKmer = ((kmer.getKmerLength() - 1) % 4) << 1;
+ int curByteAtKmer = 0;
+
+ int curPosAtReverse = 0;
+ int curByteAtReverse = this.kmer.getLength() - 1;
+ this.kmer.getBytes()[curByteAtReverse] = 0;
+ for (int i = 0; i < kmer.getKmerLength(); i++) {
+ byte gene = (byte) ((kmer.getBytes()[curByteAtKmer] >> curPosAtKmer) & 0x03);
+ this.kmer.getBytes()[curByteAtReverse] |= gene << curPosAtReverse;
+ curPosAtReverse += 2;
+ if (curPosAtReverse >= 8) {
+ curPosAtReverse = 0;
+ this.kmer.getBytes()[--curByteAtReverse] = 0;
+ }
+ curPosAtKmer -= 2;
+ if (curPosAtKmer < 0) {
+ curPosAtKmer = 6;
+ curByteAtKmer++;
+ }
+ }
+ this.kmer.clearLeadBit();
+ return this.kmer;
+ }
+}
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/oldtype/NodeWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/velvet/oldtype/NodeWritable.java
similarity index 75%
rename from genomix/genomix-data/src/main/java/edu/uci/ics/genomix/oldtype/NodeWritable.java
rename to genomix/genomix-data/src/main/java/edu/uci/ics/genomix/velvet/oldtype/NodeWritable.java
index 3cb2216..128bf9f 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/oldtype/NodeWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/velvet/oldtype/NodeWritable.java
@@ -13,7 +13,7 @@
* limitations under the License.
*/
-package edu.uci.ics.genomix.oldtype;
+package edu.uci.ics.genomix.velvet.oldtype;
import java.io.DataInput;
import java.io.DataOutput;
@@ -22,30 +22,17 @@
import org.apache.hadoop.io.WritableComparable;
-import edu.uci.ics.genomix.type.KmerBytesWritable;
-
public class NodeWritable implements WritableComparable<NodeWritable>, Serializable {
/**
*
*/
private static final long serialVersionUID = 1L;
- public static final NodeWritable EMPTY_NODE = new NodeWritable(0);
-
- // merge/update directions
- public static class DirectionFlag {
- public static final byte DIR_FF = 0b00 << 0;
- public static final byte DIR_FR = 0b01 << 0;
- public static final byte DIR_RF = 0b10 << 0;
- public static final byte DIR_RR = 0b11 << 0;
- public static final byte DIR_MASK = 0b11 << 0;
- }
-
private PositionWritable nodeID;
private PositionListWritable forwardForwardList;
private PositionListWritable forwardReverseList;
private PositionListWritable reverseForwardList;
private PositionListWritable reverseReverseList;
- private VKmerBytesWritable kmer;
+ private KmerBytesWritable kmer;
public NodeWritable() {
this(21);
@@ -57,7 +44,7 @@
forwardReverseList = new PositionListWritable();
reverseForwardList = new PositionListWritable();
reverseReverseList = new PositionListWritable();
- kmer = new VKmerBytesWritable();
+ kmer = new KmerBytesWritable(kmerSize);
}
public NodeWritable(PositionWritable nodeID, PositionListWritable FFList, PositionListWritable FRList,
@@ -68,17 +55,7 @@
forwardReverseList.set(FRList);
reverseForwardList.set(RFList);
reverseReverseList.set(RRList);
- kmer.setAsCopy(kmer);
- }
-
- public void set(PositionWritable nodeID, PositionListWritable FFList, PositionListWritable FRList,
- PositionListWritable RFList, PositionListWritable RRList, KmerBytesWritable kmer) {
- this.nodeID.set(nodeID);
- this.forwardForwardList.set(FFList);
- this.forwardReverseList.set(FRList);
- this.reverseForwardList.set(RFList);
- this.reverseReverseList.set(RRList);
- this.kmer.setAsCopy(kmer);
+        this.kmer.set(kmer);
}
public void setNodeID(PositionWritable ref) {
@@ -90,7 +67,7 @@
}
public void setKmer(KmerBytesWritable right) {
- this.kmer.setAsCopy(right);
+ this.kmer.set(right);
}
public void reset(int kmerSize) {
@@ -118,21 +95,6 @@
return reverseReverseList;
}
- public PositionListWritable getListFromDir(byte dir) {
- switch (dir & DirectionFlag.DIR_MASK) {
- case DirectionFlag.DIR_FF:
- return getFFList();
- case DirectionFlag.DIR_FR:
- return getFRList();
- case DirectionFlag.DIR_RF:
- return getRFList();
- case DirectionFlag.DIR_RR:
- return getRRList();
- default:
- throw new RuntimeException("Unrecognized direction in getListFromDir: " + dir);
- }
- }
-
public PositionWritable getNodeID() {
return nodeID;
}
@@ -148,13 +110,13 @@
public void mergeForwardNext(NodeWritable nextNode, int initialKmerSize) {
this.forwardForwardList.set(nextNode.forwardForwardList);
this.forwardReverseList.set(nextNode.forwardReverseList);
- kmer.mergeWithFFKmer(initialKmerSize, nextNode.getKmer());
+ kmer.mergeNextKmer(initialKmerSize, nextNode.getKmer());
}
public void mergeForwardPre(NodeWritable preNode, int initialKmerSize) {
this.reverseForwardList.set(preNode.reverseForwardList);
this.reverseReverseList.set(preNode.reverseReverseList);
- kmer.mergeWithRRKmer(initialKmerSize, preNode.getKmer());
+ kmer.mergePreKmer(initialKmerSize, preNode.getKmer());
}
public void set(NodeWritable node) {
@@ -163,7 +125,7 @@
this.forwardReverseList.set(node.forwardReverseList);
this.reverseForwardList.set(node.reverseForwardList);
this.reverseReverseList.set(node.reverseReverseList);
- this.kmer.setAsCopy(node.kmer);
+ this.kmer.set(node.kmer);
}
@Override
@@ -211,13 +173,13 @@
@Override
public String toString() {
StringBuilder sbuilder = new StringBuilder();
- sbuilder.append('{');
+ sbuilder.append('(');
sbuilder.append(nodeID.toString()).append('\t');
sbuilder.append(forwardForwardList.toString()).append('\t');
sbuilder.append(forwardReverseList.toString()).append('\t');
sbuilder.append(reverseForwardList.toString()).append('\t');
sbuilder.append(reverseReverseList.toString()).append('\t');
- sbuilder.append(kmer.toString()).append('}');
+ sbuilder.append(kmer.toString()).append(')');
return sbuilder.toString();
}
@@ -236,8 +198,4 @@
return inDegree() == 1 && outDegree() == 1;
}
- public boolean isSimpleOrTerminalPath() {
- return isPathNode() || (inDegree() == 0 && outDegree() == 1) || (inDegree() == 1 && outDegree() == 0);
- }
-
}
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/velvet/oldtype/PositionListWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/velvet/oldtype/PositionListWritable.java
new file mode 100644
index 0000000..b6c42c2
--- /dev/null
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/velvet/oldtype/PositionListWritable.java
@@ -0,0 +1,229 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.genomix.velvet.oldtype;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.io.Writable;
+
+import edu.uci.ics.genomix.data.Marshal;
+
+public class PositionListWritable implements Writable, Iterable<PositionWritable>, Serializable {
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1L;
+ protected byte[] storage;
+ protected int offset;
+ protected int valueCount;
+ protected static final byte[] EMPTY = {};
+ public static final int INTBYTES = 4;
+
+ protected PositionWritable posIter = new PositionWritable();
+
+ public PositionListWritable() {
+ this.storage = EMPTY;
+ this.valueCount = 0;
+ this.offset = 0;
+ }
+
+ public PositionListWritable(int count, byte[] data, int offset) {
+ setNewReference(count, data, offset);
+ }
+
+ public PositionListWritable(List<PositionWritable> posns) {
+ this();
+ for (PositionWritable p : posns) {
+ append(p);
+ }
+ }
+
+ public void setNewReference(int count, byte[] data, int offset) {
+ this.valueCount = count;
+ this.storage = data;
+ this.offset = offset;
+ }
+
+ protected void setSize(int size) {
+ if (size > getCapacity()) {
+ setCapacity((size * 3 / 2));
+ }
+ }
+
+ protected int getCapacity() {
+ return storage.length - offset;
+ }
+
+ protected void setCapacity(int new_cap) {
+ if (new_cap > getCapacity()) {
+ byte[] new_data = new byte[new_cap];
+ if (storage.length - offset > 0) {
+ System.arraycopy(storage, offset, new_data, 0, storage.length - offset);
+ }
+ storage = new_data;
+ offset = 0;
+ }
+ }
+
+ public PositionWritable getPosition(int i) {
+ if (i >= valueCount) {
+ throw new ArrayIndexOutOfBoundsException("No such positions");
+ }
+ posIter.setNewReference(storage, offset + i * PositionWritable.LENGTH);
+ return posIter;
+ }
+
+ public void resetPosition(int i, int readID, byte posInRead) {
+ if (i >= valueCount) {
+ throw new ArrayIndexOutOfBoundsException("No such positions");
+ }
+ Marshal.putInt(readID, storage, offset + i * PositionWritable.LENGTH);
+        storage[offset + i * PositionWritable.LENGTH + INTBYTES] = posInRead;
+ }
+
+ @Override
+ public Iterator<PositionWritable> iterator() {
+ Iterator<PositionWritable> it = new Iterator<PositionWritable>() {
+
+ private int currentIndex = 0;
+
+ @Override
+ public boolean hasNext() {
+ return currentIndex < valueCount;
+ }
+
+ @Override
+ public PositionWritable next() {
+ return getPosition(currentIndex++);
+ }
+
+ @Override
+ public void remove() {
+ }
+ };
+ return it;
+ }
+
+ public void set(PositionListWritable list2) {
+ set(list2.valueCount, list2.storage, list2.offset);
+ }
+
+ public void set(int valueCount, byte[] newData, int offset) {
+ this.valueCount = valueCount;
+ setSize(valueCount * PositionWritable.LENGTH);
+ if (valueCount > 0) {
+ System.arraycopy(newData, offset, storage, this.offset, valueCount * PositionWritable.LENGTH);
+ }
+ }
+
+ public void reset() {
+ valueCount = 0;
+ }
+
+ public void append(PositionWritable pos) {
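+        // grow the backing array if needed, then copy the 5-byte (readID, posInRead) record onto the end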
+ setSize((1 + valueCount) * PositionWritable.LENGTH);
+ System.arraycopy(pos.getByteArray(), pos.getStartOffset(), storage, offset + valueCount
+ * PositionWritable.LENGTH, pos.getLength());
+ valueCount += 1;
+ }
+
+ public void append(int readID, byte posInRead) {
+ setSize((1 + valueCount) * PositionWritable.LENGTH);
+ Marshal.putInt(readID, storage, offset + valueCount * PositionWritable.LENGTH);
+ storage[offset + valueCount * PositionWritable.LENGTH + PositionWritable.INTBYTES] = posInRead;
+ valueCount += 1;
+ }
+
+ public static int getCountByDataLength(int length) {
+ if (length % PositionWritable.LENGTH != 0) {
+ throw new IllegalArgumentException("Length of positionlist is invalid");
+ }
+ return length / PositionWritable.LENGTH;
+ }
+
+ public int getCountOfPosition() {
+ return valueCount;
+ }
+
+ public byte[] getByteArray() {
+ return storage;
+ }
+
+ public int getStartOffset() {
+ return offset;
+ }
+
+ public int getLength() {
+ return valueCount * PositionWritable.LENGTH;
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ this.valueCount = in.readInt();
+ setSize(valueCount * PositionWritable.LENGTH);
+ in.readFully(storage, offset, valueCount * PositionWritable.LENGTH);
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeInt(valueCount);
+ out.write(storage, offset, valueCount * PositionWritable.LENGTH);
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sbuilder = new StringBuilder();
+ sbuilder.append('[');
+ for (PositionWritable pos : this) {
+ sbuilder.append(pos.toString());
+ sbuilder.append(',');
+ }
+ if (valueCount > 0) {
+ sbuilder.setCharAt(sbuilder.length() - 1, ']');
+ } else {
+ sbuilder.append(']');
+ }
+ return sbuilder.toString();
+ }
+
+ @Override
+ public int hashCode() {
+ return Marshal.hashBytes(getByteArray(), getStartOffset(), getLength());
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (!(o instanceof PositionListWritable))
+ return false;
+ PositionListWritable other = (PositionListWritable) o;
+ if (this.valueCount != other.valueCount)
+ return false;
+ for (int i=0; i < this.valueCount; i++) {
+ if (!this.getPosition(i).equals(other.getPosition(i)))
+ return false;
+ }
+ return true;
+ }
+}
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/velvet/oldtype/PositionWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/velvet/oldtype/PositionWritable.java
new file mode 100644
index 0000000..1d509bb
--- /dev/null
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/velvet/oldtype/PositionWritable.java
@@ -0,0 +1,170 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.genomix.velvet.oldtype;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.Serializable;
+
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+
+import edu.uci.ics.genomix.data.Marshal;
+
+public class PositionWritable implements WritableComparable<PositionWritable>, Serializable {
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1L;
+ protected byte[] storage;
+ protected int offset;
+ public static final int LENGTH = 5;
+ public static final int INTBYTES = 4;
+
+ public PositionWritable() {
+ storage = new byte[LENGTH];
+ offset = 0;
+ }
+
+ public PositionWritable(int readID, byte posInRead) {
+ this();
+ set(readID, posInRead);
+ }
+
+ public PositionWritable(byte[] storage, int offset) {
+ setNewReference(storage, offset);
+ }
+
+ public void setNewReference(byte[] storage, int offset) {
+ this.storage = storage;
+ this.offset = offset;
+ }
+
+ public void set(PositionWritable pos) {
+ set(pos.getReadID(), pos.getPosInRead());
+ }
+
+ public void set(int readID, byte posInRead) {
+ Marshal.putInt(readID, storage, offset);
+ storage[offset + INTBYTES] = posInRead;
+ }
+
+ public int getReadID() {
+ return Marshal.getInt(storage, offset);
+ }
+
+ public byte getPosInRead() {
+ return storage[offset + INTBYTES];
+ }
+
+ public byte[] getByteArray() {
+ return storage;
+ }
+
+ public int getStartOffset() {
+ return offset;
+ }
+
+ public int getLength() {
+ return LENGTH;
+ }
+
+ public boolean isSameReadID(PositionWritable other) {
+ return getReadID() == other.getReadID();
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ in.readFully(storage, offset, LENGTH);
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.write(storage, offset, LENGTH);
+ }
+
+ @Override
+ public int hashCode() {
+ return Marshal.hashBytes(getByteArray(), getStartOffset(), getLength());
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (!(o instanceof PositionWritable))
+ return false;
+ PositionWritable other = (PositionWritable) o;
+ return this.getReadID() == other.getReadID() && this.getPosInRead() == other.getPosInRead();
+ }
+
+ @Override
+ public int compareTo(PositionWritable other) {
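+        // order by readID, then by the magnitude of posInRead, with the signed value as the final tie-break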
+ int diff1 = this.getReadID() - other.getReadID();
+ if (diff1 == 0) {
+ int diff2 = Math.abs((int) this.getPosInRead()) - Math.abs((int) other.getPosInRead());
+ if (diff2 == 0) {
+ return this.getPosInRead() - other.getPosInRead();
+ }
+ return diff2;
+ }
+ return diff1;
+ }
+
+ @Override
+ public String toString() {
+ return "(" + Integer.toString(getReadID()) + "," + Integer.toString((int) getPosInRead()) + ")";
+ }
+
+ /** A Comparator optimized for IntWritable. */
+ public static class Comparator extends WritableComparator {
+ public Comparator() {
+ super(PositionWritable.class);
+ }
+
+ public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+ int thisValue = Marshal.getInt(b1, s1);
+ int thatValue = Marshal.getInt(b2, s2);
+ int diff1 = thisValue - thatValue;
+ if (diff1 == 0) {
+ int diff2 = Math.abs((int) b1[s1 + INTBYTES]) - Math.abs((int) b2[s2 + INTBYTES]);
+ if (diff2 == 0) {
+ return b1[s1 + INTBYTES] - b2[s2 + INTBYTES];
+ }
+ return diff2;
+ }
+ return diff1;
+ }
+ }
+
+ public static class FirstComparator implements RawComparator<PositionWritable> {
+ @Override
+ public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
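+            // compare all bytes except the trailing posInRead, i.e. only the readID prefix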
+ return WritableComparator.compareBytes(b1, s1, l1 - 1, b2, s2, l2 - 1);
+ }
+
+ @Override
+ public int compare(PositionWritable o1, PositionWritable o2) {
+ int l = o1.getReadID();
+ int r = o2.getReadID();
+ return l == r ? 0 : (l < r ? -1 : 1);
+ }
+ }
+
+ static { // register this comparator
+ WritableComparator.define(PositionWritable.class, new Comparator());
+ }
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/contrail/graph/AggregateKmerAggregateFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/contrail/graph/AggregateKmerAggregateFactory.java
deleted file mode 100644
index d14405e..0000000
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/contrail/graph/AggregateKmerAggregateFactory.java
+++ /dev/null
@@ -1,202 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package edu.uci.ics.genomix.hyracks.contrail.graph;
-
-import java.io.DataOutput;
-import java.io.IOException;
-
-import edu.uci.ics.genomix.data.Marshal;
-import edu.uci.ics.genomix.hyracks.data.primitive.PositionReference;
-import edu.uci.ics.genomix.hyracks.dataflow.MapKmerPositionToReadOperator;
-import edu.uci.ics.genomix.type.KmerBytesWritable;
-import edu.uci.ics.genomix.type.KmerListWritable;
-import edu.uci.ics.genomix.type.NodeWritable;
-import edu.uci.ics.genomix.type.PositionListWritable;
-import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
-import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
-import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
-
-public class AggregateKmerAggregateFactory implements IAggregatorDescriptorFactory {
-
- /**
- *
- */
- private static final long serialVersionUID = 1L;
- private PositionListWritable nodeIdList = new PositionListWritable();
-    private KmerListWritable forwardForwardList = new KmerListWritable(kmerSize);// how to get kmerSize here?
- private KmerListWritable forwardReverseList = new KmerListWritable(kmerSize);
- private KmerListWritable reverseForwardList = new KmerListWritable(kmerSize);
- private KmerListWritable reverseReverseList = new KmerListWritable(kmerSize);
- private KmerBytesWritable kmer = new KmerBytesWritable(kmerSize);
-
- @Override
- public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
- RecordDescriptor outRecordDescriptor, int[] keyFields, int[] keyFieldsInPartialResults)
- throws HyracksDataException {
- return new IAggregatorDescriptor() {
-            private NodeWritable nodeAggreter = new NodeWritable();// could this be declared outside?
-
- protected int getOffSet(IFrameTupleAccessor accessor, int tIndex, int fieldId) {
- int tupleOffset = accessor.getTupleStartOffset(tIndex);
- int fieldStart = accessor.getFieldStartOffset(tIndex, fieldId);
- int offset = tupleOffset + fieldStart + accessor.getFieldSlotsLength();
- return offset;
- }
-
- @Override
- public void reset() {
- }
-
- @Override
- public void close() {
-
- }
-
- @Override
- public AggregateState createAggregateStates() {
- return new AggregateState(new NodeWritable());
- }
-
- @Override
- public void init(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
- AggregateState state) throws HyracksDataException {
- NodeWritable inputVal = (NodeWritable) state.state;
- inputVal.reset(kmerSize);
- nodeIdList.reset();
- forwardForwardList.reset(kmerSize);
- forwardReverseList.reset(kmerSize);
- reverseForwardList.reset(kmerSize);
- reverseReverseList.reset(kmerSize);
-
-                kmer.setAsCopy(accessor.getBuffer().array(), getOffSet(accessor, tIndex, 1));//?? does field counting start from 1 ??
- nodeIdList.setNewReference(1, accessor.getBuffer().array(), getOffSet(accessor, tIndex, 2));
- int ffCount = Marshal.getInt(accessor.getBuffer().array(), getOffSet(accessor, tIndex, 3));//??
- forwardForwardList.setNewReference(ffCount, accessor.getBuffer().array(), getOffSet(accessor, tIndex, 4));
- int frCount = Marshal.getInt(accessor.getBuffer().array(), getOffSet(accessor, tIndex, 5));
- forwardReverseList.setNewReference(frCount, accessor.getBuffer().array(), getOffSet(accessor, tIndex, 6));
- int rfCount = Marshal.getInt(accessor.getBuffer().array(), getOffSet(accessor, tIndex, 7));
- reverseForwardList.setNewReference(frCount, accessor.getBuffer().array(), getOffSet(accessor, tIndex, 8));
- int rrCount = Marshal.getInt(accessor.getBuffer().array(), getOffSet(accessor, tIndex, 9));
- reverseForwardList.setNewReference(rrCount, accessor.getBuffer().array(), getOffSet(accessor, tIndex, 10));
- nodeAggreter.set(nodeIdList, forwardForwardList, forwardReverseList, reverseForwardList, reverseForwardList, kmer);
-
- inputVal.getKmer().setAsCopy(kmer);
- inputVal.getNodeIdList().appendList(nodeIdList);
- inputVal.getFFList().appendList(forwardForwardList);
- inputVal.getFRList().appendList(forwardReverseList);
- inputVal.getRFList().appendList(reverseForwardList);
- inputVal.getRRList().appendList(reverseReverseList);
-
- // make an empty field
-                tupleBuilder.addFieldEndOffset();///???? why?
- }
-
- @Override
- public void aggregate(IFrameTupleAccessor accessor, int tIndex, IFrameTupleAccessor stateAccessor,
- int stateTupleIndex, AggregateState state) throws HyracksDataException {
- NodeWritable inputVal = (NodeWritable) state.state;
-                kmer.setAsCopy(accessor.getBuffer().array(), getOffSet(accessor, tIndex, 1));//?? does field counting start from 1 ??
- nodeIdList.setNewReference(1, accessor.getBuffer().array(), getOffSet(accessor, tIndex, 2));
- int ffCount = Marshal.getInt(accessor.getBuffer().array(), getOffSet(accessor, tIndex, 3));//??
- forwardForwardList.setNewReference(ffCount, accessor.getBuffer().array(), getOffSet(accessor, tIndex, 4));
- int frCount = Marshal.getInt(accessor.getBuffer().array(), getOffSet(accessor, tIndex, 5));
- forwardReverseList.setNewReference(frCount, accessor.getBuffer().array(), getOffSet(accessor, tIndex, 6));
- int rfCount = Marshal.getInt(accessor.getBuffer().array(), getOffSet(accessor, tIndex, 7));
- reverseForwardList.setNewReference(frCount, accessor.getBuffer().array(), getOffSet(accessor, tIndex, 8));
- int rrCount = Marshal.getInt(accessor.getBuffer().array(), getOffSet(accessor, tIndex, 9));
- reverseForwardList.setNewReference(rrCount, accessor.getBuffer().array(), getOffSet(accessor, tIndex, 10));
- nodeAggreter.set(nodeIdList, forwardForwardList, forwardReverseList, reverseForwardList, reverseForwardList, kmer);
-
- inputVal.getKmer().setAsCopy(kmer);
- inputVal.getNodeIdList().appendList(nodeIdList);
- inputVal.getFFList().appendList(forwardForwardList);
- inputVal.getFRList().appendList(forwardReverseList);
- inputVal.getRFList().appendList(reverseForwardList);
- inputVal.getRRList().appendList(reverseReverseList);
- }
-
- @Override
- public void outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
- AggregateState state) throws HyracksDataException {
- throw new IllegalStateException("partial result method should not be called");
- }
-
- @Override
- public void outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
- AggregateState state) throws HyracksDataException {
- DataOutput fieldOutput = tupleBuilder.getDataOutput();
- NodeWritable inputVal = (NodeWritable) state.state;
- try {
-// fieldOutput.write(inputVal.getByteArray(), inputVal.getStartOffset(), inputVal.getLength());
-
-                    tupleBuilder.addFieldEndOffset();// -------------- why?
- //-------------------------------------------------------
-// tupleBuilder.reset();
- fieldOutput.write(inputVal.getKmer().getBytes(), inputVal.getKmer().getOffset(), inputVal.getKmer().getLength());
-
- tupleBuilder.addField(Node.getreadId().getByteArray(), Node.getreadId().getStartOffset(), Node.getreadId().getLength());
-
- tupleBuilder.getDataOutput().writeInt(Node.getFFList().getCountOfPosition());
- tupleBuilder.addFieldEndOffset();
-
- tupleBuilder.addField(Node.getFFList().getByteArray(), Node.getFFList().getStartOffset(), Node.getFFList()
- .getLength());
-
- tupleBuilder.getDataOutput().writeInt(Node.getFRList().getCountOfPosition());
- tupleBuilder.addFieldEndOffset();
-
- tupleBuilder.addField(Node.getFRList().getByteArray(), Node.getFRList().getStartOffset(), Node.getFRList()
- .getLength());
-
- tupleBuilder.getDataOutput().writeInt(Node.getRFList().getCountOfPosition());
- tupleBuilder.addFieldEndOffset();
-
- tupleBuilder.addField(Node.getRFList().getByteArray(), Node.getRFList().getStartOffset(), Node.getRFList()
- .getLength());
-
- tupleBuilder.getDataOutput().writeInt(Node.getRRList().getCountOfPosition());
- tupleBuilder.addFieldEndOffset();
-
- tupleBuilder.addField(Node.getRRList().getByteArray(), Node.getRRList().getStartOffset(), Node.getRRList()
- .getLength());
-
-/* if (!outputAppender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
- tupleBuilder.getSize())) {
- FrameUtils.flushFrame(outputBuffer, writer);
- outputAppender.reset(outputBuffer, true);
- if (!outputAppender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
- tupleBuilder.getSize())) {
- throw new IllegalStateException(
- "Failed to copy an record into a frame: the record kmerByteSize is too large.");
- }
- }*/
-
- } catch (IOException e) {
- throw new HyracksDataException("I/O exception when writing aggregation to the output buffer.");
- }
- }
-
- };
- }
-
-}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/contrail/graph/Driver.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/contrail/graph/Driver.java
deleted file mode 100644
index 76ca95a..0000000
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/contrail/graph/Driver.java
+++ /dev/null
@@ -1,172 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package edu.uci.ics.genomix.hyracks.contrail.graph;
-
-import java.net.URL;
-import java.util.EnumSet;
-import java.util.Map;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.util.GenericOptionsParser;
-
-import edu.uci.ics.genomix.hyracks.job.GenomixJobConf;
-import edu.uci.ics.genomix.hyracks.job.JobGen;
-import edu.uci.ics.genomix.hyracks.job.JobGenBrujinGraph;
-import edu.uci.ics.genomix.hyracks.job.JobGenCheckReader;
-import edu.uci.ics.genomix.hyracks.job.JobGenCreateKmerInfo;
-import edu.uci.ics.genomix.hyracks.job.JobGenGroupbyReadID;
-import edu.uci.ics.genomix.hyracks.job.JobGenMapKmerToRead;
-import edu.uci.ics.genomix.hyracks.job.JobGenUnMerged;
-import edu.uci.ics.hyracks.api.client.HyracksConnection;
-import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
-import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
-import edu.uci.ics.hyracks.api.exceptions.HyracksException;
-import edu.uci.ics.hyracks.api.job.JobFlag;
-import edu.uci.ics.hyracks.api.job.JobId;
-import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.hdfs.scheduler.Scheduler;
-
-public class Driver {
- public static enum Plan {
- CHECK_KMERREADER,
- OUTPUT_KMERHASHTABLE,
- OUTPUT_MAP_KMER_TO_READ,
- OUTPUT_GROUPBY_READID,
- BUILD_DEBRUJIN_GRAPH,
- BUILD_UNMERGED_GRAPH,
- }
-
- private static final String IS_PROFILING = "genomix.driver.profiling";
- private static final String CPARTITION_PER_MACHINE = "genomix.driver.duplicate.num";
- private static final Log LOG = LogFactory.getLog(Driver.class);
- private JobGen jobGen;
- private boolean profiling;
-
- private int numPartitionPerMachine;
-
- private IHyracksClientConnection hcc;
- private Scheduler scheduler;
-
- public Driver(String ipAddress, int port, int numPartitionPerMachine) throws HyracksException {
- try {
- hcc = new HyracksConnection(ipAddress, port);
- scheduler = new Scheduler(hcc.getNodeControllerInfos());
- } catch (Exception e) {
- throw new HyracksException(e);
- }
- this.numPartitionPerMachine = numPartitionPerMachine;
- }
-
- public void runJob(GenomixJobConf job) throws HyracksException {
- runJob(job, Plan.BUILD_DEBRUJIN_GRAPH, false);
- }
-
- public void runJob(GenomixJobConf job, Plan planChoice, boolean profiling) throws HyracksException {
- /** add hadoop configurations */
- URL hadoopCore = job.getClass().getClassLoader().getResource("core-site.xml");
- job.addResource(hadoopCore);
- URL hadoopMapRed = job.getClass().getClassLoader().getResource("mapred-site.xml");
- job.addResource(hadoopMapRed);
- URL hadoopHdfs = job.getClass().getClassLoader().getResource("hdfs-site.xml");
- job.addResource(hadoopHdfs);
-
- LOG.info("job started");
- long start = System.currentTimeMillis();
- long end = start;
- long time = 0;
-
- this.profiling = profiling;
- try {
- Map<String, NodeControllerInfo> ncMap = hcc.getNodeControllerInfos();
- LOG.info("ncmap:" + ncMap.size() + " " + ncMap.keySet().toString());
- switch (planChoice) {
- case BUILD_DEBRUJIN_GRAPH:
- default:
- jobGen = new JobGenBrujinGraph(job, scheduler, ncMap, numPartitionPerMachine);
- break;
- case OUTPUT_KMERHASHTABLE:
- jobGen = new JobGenCreateKmerInfo(job, scheduler, ncMap, numPartitionPerMachine);
- break;
- case OUTPUT_MAP_KMER_TO_READ:
- jobGen = new JobGenMapKmerToRead(job, scheduler, ncMap, numPartitionPerMachine);
- break;
- case OUTPUT_GROUPBY_READID:
- jobGen = new JobGenGroupbyReadID(job, scheduler, ncMap, numPartitionPerMachine);
- break;
- case CHECK_KMERREADER:
- jobGen = new JobGenCheckReader(job, scheduler, ncMap, numPartitionPerMachine);
- break;
- case BUILD_UNMERGED_GRAPH:
- jobGen = new JobGenUnMerged(job, scheduler, ncMap, numPartitionPerMachine);
- }
-
- start = System.currentTimeMillis();
- run(jobGen);
- end = System.currentTimeMillis();
- time = end - start;
- LOG.info("result writing finished " + time + "ms");
- LOG.info("job finished");
- } catch (Exception e) {
- throw new HyracksException(e);
- }
- }
-
- private void run(JobGen jobGen) throws Exception {
- try {
- JobSpecification createJob = jobGen.generateJob();
- execute(createJob);
- } catch (Exception e) {
- e.printStackTrace();
- throw e;
- }
- }
-
- private void execute(JobSpecification job) throws Exception {
- job.setUseConnectorPolicyForScheduling(false);
- JobId jobId = hcc
- .startJob(job, profiling ? EnumSet.of(JobFlag.PROFILE_RUNTIME) : EnumSet.noneOf(JobFlag.class));
- hcc.waitForCompletion(jobId);
- }
-
- public static void main(String[] args) throws Exception {
- GenomixJobConf jobConf = new GenomixJobConf();
- String[] otherArgs = new GenericOptionsParser(jobConf, args).getRemainingArgs();
- if (otherArgs.length < 4) {
- System.err.println("Need <serverIP> <port> <input> <output>");
- System.exit(-1);
- }
- String ipAddress = otherArgs[0];
- int port = Integer.parseInt(otherArgs[1]);
- int numOfDuplicate = jobConf.getInt(CPARTITION_PER_MACHINE, 2);
- boolean bProfiling = jobConf.getBoolean(IS_PROFILING, true);
- // FileInputFormat.setInputPaths(job, otherArgs[2]);
- {
- @SuppressWarnings("deprecation")
- Path path = new Path(jobConf.getWorkingDirectory(), otherArgs[2]);
- jobConf.set("mapred.input.dir", path.toString());
-
- @SuppressWarnings("deprecation")
- Path outputDir = new Path(jobConf.getWorkingDirectory(), otherArgs[3]);
- jobConf.set("mapred.output.dir", outputDir.toString());
- }
- // FileInputFormat.addInputPath(jobConf, new Path(otherArgs[2]));
- // FileOutputFormat.setOutputPath(job, new Path(otherArgs[3]));
- Driver driver = new Driver(ipAddress, port, numOfDuplicate);
- driver.runJob(jobConf, Plan.BUILD_DEBRUJIN_GRAPH, bProfiling);
- }
-}
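A note on the deleted Driver: runJob passed the results of getResource("core-site.xml") and friends straight to addResource with no null check, so a missing classpath file only surfaces indirectly later. A defensive variant of the same lookup, sketched as if it lived inside the old Driver (LOG is its existing logger); this is an illustration, not part of the patch:

    private static void addClasspathResources(org.apache.hadoop.conf.Configuration conf) {
        for (String name : new String[] { "core-site.xml", "mapred-site.xml", "hdfs-site.xml" }) {
            java.net.URL url = Driver.class.getClassLoader().getResource(name);
            if (url != null) {
                conf.addResource(url);
            } else {
                LOG.warn("classpath resource not found, skipping: " + name);
            }
        }
    }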
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/contrail/graph/JobGenBrujinGraph.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/contrail/graph/JobGenBrujinGraph.java
deleted file mode 100644
index e280a7c..0000000
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/contrail/graph/JobGenBrujinGraph.java
+++ /dev/null
@@ -1,384 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package edu.uci.ics.genomix.hyracks.contrail.graph;
-
-import java.io.IOException;
-import java.util.Map;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-
-import edu.uci.ics.genomix.hyracks.data.accessors.KmerHashPartitioncomputerFactory;
-import edu.uci.ics.genomix.hyracks.data.accessors.KmerNormarlizedComputerFactory;
-import edu.uci.ics.genomix.hyracks.data.accessors.ReadIDPartitionComputerFactory;
-import edu.uci.ics.genomix.hyracks.data.primitive.KmerPointable;
-import edu.uci.ics.genomix.hyracks.dataflow.ConnectorPolicyAssignmentPolicy;
-import edu.uci.ics.genomix.hyracks.dataflow.MapKmerPositionToReadOperator;
-import edu.uci.ics.genomix.hyracks.dataflow.MapReadToNodeOperator;
-import edu.uci.ics.genomix.hyracks.dataflow.ReadsKeyValueParserFactory;
-import edu.uci.ics.genomix.hyracks.dataflow.aggregators.AggregateKmerAggregateFactory;
-import edu.uci.ics.genomix.hyracks.dataflow.aggregators.AggregateReadIDAggregateFactory;
-import edu.uci.ics.genomix.hyracks.dataflow.aggregators.MergeKmerAggregateFactory;
-import edu.uci.ics.genomix.hyracks.dataflow.aggregators.MergeReadIDAggregateFactory;
-import edu.uci.ics.genomix.hyracks.dataflow.io.KMerSequenceWriterFactory;
-import edu.uci.ics.genomix.hyracks.dataflow.io.KMerTextWriterFactory;
-import edu.uci.ics.genomix.hyracks.dataflow.io.NodeSequenceWriterFactory;
-import edu.uci.ics.genomix.hyracks.dataflow.io.NodeTextWriterFactory;
-import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
-import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
-import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
-import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.exceptions.HyracksException;
-import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
-import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;
-import edu.uci.ics.hyracks.data.std.api.IPointableFactory;
-import edu.uci.ics.hyracks.data.std.primitive.IntegerPointable;
-import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningMergingConnectorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.group.HashSpillableTableFactory;
-import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
-import edu.uci.ics.hyracks.dataflow.std.group.external.ExternalGroupOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.group.preclustered.PreclusteredGroupOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
-import edu.uci.ics.hyracks.hdfs.api.ITupleWriterFactory;
-import edu.uci.ics.hyracks.hdfs.dataflow.ConfFactory;
-import edu.uci.ics.hyracks.hdfs.dataflow.HDFSReadOperatorDescriptor;
-import edu.uci.ics.hyracks.hdfs.dataflow.HDFSWriteOperatorDescriptor;
-import edu.uci.ics.hyracks.hdfs.scheduler.Scheduler;
-
-@SuppressWarnings("deprecation")
-public class JobGenBrujinGraph extends JobGen {
- /**
- *
- */
- private static final long serialVersionUID = 1L;
-
- public enum GroupbyType {
- EXTERNAL,
- PRECLUSTER,
- HYBRIDHASH,
- }
-
- public enum OutputFormat {
- TEXT,
- BINARY,
- }
-
- protected ConfFactory hadoopJobConfFactory;
- protected static final Log LOG = LogFactory.getLog(JobGenBrujinGraph.class);
- protected String[] ncNodeNames;
- protected String[] readSchedule;
-
- protected int readLength;
- protected int kmerSize;
- protected int frameLimits;
- protected int frameSize;
- protected int tableSize;
- protected GroupbyType groupbyType;
- protected OutputFormat outputFormat;
- protected boolean bGenerateReversedKmer;
-
- protected void logDebug(String status) {
- LOG.debug(status + " nc nodes:" + ncNodeNames.length);
- }
-
- public JobGenBrujinGraph(GenomixJobConf job, Scheduler scheduler, final Map<String, NodeControllerInfo> ncMap,
- int numPartitionPerMachine) throws HyracksDataException {
- super(job);
- String[] nodes = new String[ncMap.size()];
- ncMap.keySet().toArray(nodes);
- ncNodeNames = new String[nodes.length * numPartitionPerMachine];
- for (int i = 0; i < numPartitionPerMachine; i++) {
- System.arraycopy(nodes, 0, ncNodeNames, i * nodes.length, nodes.length);
- }
- initJobConfiguration(scheduler);
- }
-
- private ExternalGroupOperatorDescriptor newExternalGroupby(JobSpecification jobSpec, int[] keyFields,
- IAggregatorDescriptorFactory aggregator, IAggregatorDescriptorFactory merger,
- ITuplePartitionComputerFactory partition, INormalizedKeyComputerFactory normalizer,
- IPointableFactory pointable, RecordDescriptor outRec) {
- return new ExternalGroupOperatorDescriptor(jobSpec, keyFields, frameLimits,
- new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(pointable) }, normalizer,
- aggregator, merger, outRec, new HashSpillableTableFactory(new FieldHashPartitionComputerFactory(
- keyFields,
- new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(pointable) }),
- tableSize), true);
- }
-
- private Object[] generateAggregateDescriptorByType(JobSpecification jobSpec, int[] keyFields,
- IAggregatorDescriptorFactory aggregator, IAggregatorDescriptorFactory merger,
- ITuplePartitionComputerFactory partition, INormalizedKeyComputerFactory normalizer,
- IPointableFactory pointable, RecordDescriptor combineRed, RecordDescriptor finalRec)
- throws HyracksDataException {
-
- Object[] obj = new Object[3];
-
- switch (groupbyType) {
- case EXTERNAL:
- obj[0] = newExternalGroupby(jobSpec, keyFields, aggregator, merger, partition, normalizer, pointable,
- combineRed);
- obj[1] = new MToNPartitioningConnectorDescriptor(jobSpec, partition);
- obj[2] = newExternalGroupby(jobSpec, keyFields, merger, merger, partition, normalizer, pointable,
- finalRec);
- break;
- case PRECLUSTER:
- default:
-
- obj[0] = new PreclusteredGroupOperatorDescriptor(jobSpec, keyFields,
- new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(pointable) }, aggregator,
- combineRed);
- obj[1] = new MToNPartitioningMergingConnectorDescriptor(jobSpec, partition, keyFields,
- new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(pointable) });
- obj[2] = new PreclusteredGroupOperatorDescriptor(jobSpec, keyFields,
- new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(pointable) }, merger,
- finalRec);
- jobSpec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
- break;
- }
- return obj;
- }
-
- public HDFSReadOperatorDescriptor createHDFSReader(JobSpecification jobSpec) throws HyracksDataException {
- try {
- InputSplit[] splits = hadoopJobConfFactory.getConf().getInputFormat()
- .getSplits(hadoopJobConfFactory.getConf(), ncNodeNames.length);
-
- return new HDFSReadOperatorDescriptor(jobSpec, ReadsKeyValueParserFactory.readKmerOutputRec,
- hadoopJobConfFactory.getConf(), splits, readSchedule, new ReadsKeyValueParserFactory(readLength,
- kmerSize, bGenerateReversedKmer));
- } catch (Exception e) {
- throw new HyracksDataException(e);
- }
- }
-
- public static void connectOperators(JobSpecification jobSpec, IOperatorDescriptor preOp, String[] preNodes,
- IOperatorDescriptor nextOp, String[] nextNodes, IConnectorDescriptor conn) {
- PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, preOp, preNodes);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, nextOp, nextNodes);
- jobSpec.connect(conn, preOp, 0, nextOp, 0);
- }
-
- public AbstractOperatorDescriptor generateGroupbyKmerJob(JobSpecification jobSpec,
- AbstractOperatorDescriptor readOperator) throws HyracksDataException {
- int[] keyFields = new int[] { 0 }; // the id of grouped key
-
- ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(jobSpec, frameLimits, keyFields,
- new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(KmerPointable.FACTORY) },
- ReadsKeyValueParserFactory.readKmerOutputRec);
- connectOperators(jobSpec, readOperator, ncNodeNames, sorter, ncNodeNames, new OneToOneConnectorDescriptor(
- jobSpec));
-
- RecordDescriptor combineKmerOutputRec = new RecordDescriptor(new ISerializerDeserializer[] { null, null, null, null, null, null, null});
- jobSpec.setFrameSize(frameSize);
-
- Object[] objs = generateAggregateDescriptorByType(jobSpec, keyFields, new AggregateKmerAggregateFactory(),
- new MergeKmerAggregateFactory(), new KmerHashPartitioncomputerFactory(),
- new KmerNormarlizedComputerFactory(), KmerPointable.FACTORY, combineKmerOutputRec, combineKmerOutputRec);
- AbstractOperatorDescriptor kmerLocalAggregator = (AbstractOperatorDescriptor) objs[0];
- logDebug("LocalKmerGroupby Operator");
- connectOperators(jobSpec, sorter, ncNodeNames, kmerLocalAggregator, ncNodeNames,
- new OneToOneConnectorDescriptor(jobSpec));
-
- logDebug("CrossKmerGroupby Operator");
- IConnectorDescriptor kmerConnPartition = (IConnectorDescriptor) objs[1];
- AbstractOperatorDescriptor kmerCrossAggregator = (AbstractOperatorDescriptor) objs[2];
- connectOperators(jobSpec, kmerLocalAggregator, ncNodeNames, kmerCrossAggregator, ncNodeNames, kmerConnPartition);
- return kmerCrossAggregator;
- }
-
-/* public AbstractOperatorDescriptor generateMapperFromKmerToRead(JobSpecification jobSpec,
- AbstractOperatorDescriptor kmerCrossAggregator) {
- // Map (Kmer, {(ReadID,PosInRead),...}) into
- // (ReadID,PosInRead,{OtherPosition,...},Kmer)
-
- AbstractOperatorDescriptor mapKmerToRead = new MapKmerPositionToReadOperator(jobSpec,
- MapKmerPositionToReadOperator.readIDOutputRec, readLength, kmerSize);
- connectOperators(jobSpec, kmerCrossAggregator, ncNodeNames, mapKmerToRead, ncNodeNames,
- new OneToOneConnectorDescriptor(jobSpec));
- return mapKmerToRead;
- }*/
-
-/* public AbstractOperatorDescriptor generateGroupbyReadJob(JobSpecification jobSpec,
- AbstractOperatorDescriptor mapKmerToRead) throws HyracksDataException {
- int[] keyFields = new int[] { 0 }; // the id of grouped key
- // (ReadID, {(PosInRead,{OtherPositoin..},Kmer) ...}
- ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(jobSpec, frameLimits, keyFields,
- new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY) },
- MapKmerPositionToReadOperator.readIDOutputRec);
- connectOperators(jobSpec, mapKmerToRead, ncNodeNames, sorter, ncNodeNames, new OneToOneConnectorDescriptor(
- jobSpec));
-
- RecordDescriptor readIDFinalRec = new RecordDescriptor(
- new ISerializerDeserializer[1 + 2 * MergeReadIDAggregateFactory.getPositionCount(readLength, kmerSize)]);
- Object[] objs = generateAggregateDescriptorByType(jobSpec, keyFields, new AggregateReadIDAggregateFactory(),
- new MergeReadIDAggregateFactory(readLength, kmerSize), new ReadIDPartitionComputerFactory(), null,
- IntegerPointable.FACTORY, AggregateReadIDAggregateFactory.readIDAggregateRec, readIDFinalRec);
- AbstractOperatorDescriptor readLocalAggregator = (AbstractOperatorDescriptor) objs[0];
- connectOperators(jobSpec, sorter, ncNodeNames, readLocalAggregator, ncNodeNames,
- new OneToOneConnectorDescriptor(jobSpec));
-
- logDebug("Group by ReadID merger");
- IConnectorDescriptor readconn = (IConnectorDescriptor) objs[1];
- AbstractOperatorDescriptor readCrossAggregator = (AbstractOperatorDescriptor) objs[2];
- connectOperators(jobSpec, readLocalAggregator, ncNodeNames, readCrossAggregator, ncNodeNames, readconn);
- return readCrossAggregator;
- }*/
-
-/* public AbstractOperatorDescriptor generateMapperFromReadToNode(JobSpecification jobSpec,
- AbstractOperatorDescriptor readCrossAggregator) {
- // Map (ReadID, [(Poslist,Kmer) ... ]) to (Node, IncomingList,
- // OutgoingList, Kmer)
-
- AbstractOperatorDescriptor mapEachReadToNode = new MapReadToNodeOperator(jobSpec,
- MapReadToNodeOperator.nodeOutputRec, kmerSize, true);
- connectOperators(jobSpec, readCrossAggregator, ncNodeNames, mapEachReadToNode, ncNodeNames,
- new OneToOneConnectorDescriptor(jobSpec));
- return mapEachReadToNode;
- }
-
- public AbstractOperatorDescriptor generateKmerWritorOperator(JobSpecification jobSpec,
- AbstractOperatorDescriptor kmerCrossAggregator) throws HyracksException {
- // Output Kmer
- ITupleWriterFactory kmerWriter = null;
- switch (outputFormat) {
- case TEXT:
- kmerWriter = new KMerTextWriterFactory(kmerSize);
- break;
- case BINARY:
- default:
- kmerWriter = new KMerSequenceWriterFactory(hadoopJobConfFactory.getConf());
- break;
- }
- logDebug("WriteOperator");
- HDFSWriteOperatorDescriptor writeKmerOperator = new HDFSWriteOperatorDescriptor(jobSpec,
- hadoopJobConfFactory.getConf(), kmerWriter);
- connectOperators(jobSpec, kmerCrossAggregator, ncNodeNames, writeKmerOperator, ncNodeNames,
- new OneToOneConnectorDescriptor(jobSpec));
- return writeKmerOperator;
- }
-
- public AbstractOperatorDescriptor generateNodeWriterOperator(JobSpecification jobSpec,
- AbstractOperatorDescriptor mapEachReadToNode) throws HyracksException {
- ITupleWriterFactory nodeWriter = null;
- switch (outputFormat) {
- case TEXT:
- nodeWriter = new NodeTextWriterFactory(kmerSize);
- break;
- case BINARY:
- default:
- nodeWriter = new NodeSequenceWriterFactory(hadoopJobConfFactory.getConf());
- break;
- }
- logDebug("WriteOperator");
- // Output Node
- HDFSWriteOperatorDescriptor writeNodeOperator = new HDFSWriteOperatorDescriptor(jobSpec,
- hadoopJobConfFactory.getConf(), nodeWriter);
- connectOperators(jobSpec, mapEachReadToNode, ncNodeNames, writeNodeOperator, ncNodeNames,
- new OneToOneConnectorDescriptor(jobSpec));
- return writeNodeOperator;
- }
-*/
- @Override
- public JobSpecification generateJob() throws HyracksException {
-
- JobSpecification jobSpec = new JobSpecification();
- logDebug("ReadKmer Operator");
-
- HDFSReadOperatorDescriptor readOperator = createHDFSReader(jobSpec);
-
- logDebug("Group by Kmer");
- AbstractOperatorDescriptor lastOperator = generateGroupbyKmerJob(jobSpec, readOperator);
-
- // logDebug("Write kmer to result");
- // generateRootByWriteKmerGroupbyResult(jobSpec, lastOperator);
-
-// logDebug("Map Kmer to Read Operator");
-// lastOperator = generateMapperFromKmerToRead(jobSpec, lastOperator);
-
-// logDebug("Group by Read Operator");
-// lastOperator = generateGroupbyReadJob(jobSpec, lastOperator);
-
-/* logDebug("Generate final node");
- lastOperator = generateMapperFromReadToNode(jobSpec, lastOperator);
- logDebug("Write node to result");
- lastOperator = generateNodeWriterOperator(jobSpec, lastOperator);*/
-
- jobSpec.addRoot(lastOperator);
- return jobSpec;
- }
-
- protected void initJobConfiguration(Scheduler scheduler) throws HyracksDataException {
- Configuration conf = confFactory.getConf();
- readLength = conf.getInt(GenomixJobConf.READ_LENGTH, GenomixJobConf.DEFAULT_READLEN);
- kmerSize = conf.getInt(GenomixJobConf.KMER_LENGTH, GenomixJobConf.DEFAULT_KMERLEN);
- if (kmerSize % 2 == 0) {
- kmerSize--;
- conf.setInt(GenomixJobConf.KMER_LENGTH, kmerSize);
- }
- frameLimits = conf.getInt(GenomixJobConf.FRAME_LIMIT, GenomixJobConf.DEFAULT_FRAME_LIMIT);
- tableSize = conf.getInt(GenomixJobConf.TABLE_SIZE, GenomixJobConf.DEFAULT_TABLE_SIZE);
- frameSize = conf.getInt(GenomixJobConf.FRAME_SIZE, GenomixJobConf.DEFAULT_FRAME_SIZE);
-
- bGenerateReversedKmer = conf.getBoolean(GenomixJobConf.REVERSED_KMER, GenomixJobConf.DEFAULT_REVERSED);
-
- String type = conf.get(GenomixJobConf.GROUPBY_TYPE, GenomixJobConf.GROUPBY_TYPE_PRECLUSTER);
- if (type.equalsIgnoreCase(GenomixJobConf.GROUPBY_TYPE_EXTERNAL)) {
- groupbyType = GroupbyType.EXTERNAL;
- } else if (type.equalsIgnoreCase(GenomixJobConf.GROUPBY_TYPE_PRECLUSTER)) {
- groupbyType = GroupbyType.PRECLUSTER;
- } else {
- groupbyType = GroupbyType.HYBRIDHASH;
- }
-
- String output = conf.get(GenomixJobConf.OUTPUT_FORMAT, GenomixJobConf.OUTPUT_FORMAT_BINARY);
- if (output.equalsIgnoreCase("text")) {
- outputFormat = OutputFormat.TEXT;
- } else {
- outputFormat = OutputFormat.BINARY;
- }
- try {
- hadoopJobConfFactory = new ConfFactory(new JobConf(conf));
- InputSplit[] splits = hadoopJobConfFactory.getConf().getInputFormat()
- .getSplits(hadoopJobConfFactory.getConf(), ncNodeNames.length);
- readSchedule = scheduler.getLocationConstraints(splits);
- } catch (IOException ex) {
- throw new HyracksDataException(ex);
- }
-
- LOG.info("Genomix Graph Build Configuration");
- LOG.info("Kmer:" + kmerSize);
- LOG.info("Groupby type:" + type);
- LOG.info("Output format:" + output);
- LOG.info("Frame limit" + frameLimits);
- LOG.info("Frame kmerByteSize" + frameSize);
- }
-
-}
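Why initJobConfiguration above silently decrements an even kmerSize: with odd k, a k-mer can never equal its own reverse complement (the middle base would have to complement itself), so the canonical FORWARD/REVERSE choice made by the parsers is always unambiguous. A small illustration with plain strings; the helper names are ours, not the project's:

    final class OddKmerSketch {
        // For even k a k-mer can be its own reverse complement (k = 4: "ACGT"),
        // making the canonical-direction comparison a tie; odd k rules this out.
        static int forceOddKmerLength(int k) {
            return (k % 2 == 0) ? k - 1 : k;
        }

        static String reverseComplement(String kmer) {
            StringBuilder sb = new StringBuilder(kmer.length());
            for (int i = kmer.length() - 1; i >= 0; i--) {
                switch (kmer.charAt(i)) {
                    case 'A': sb.append('T'); break;
                    case 'T': sb.append('A'); break;
                    case 'C': sb.append('G'); break;
                    case 'G': sb.append('C'); break;
                    default: throw new IllegalArgumentException("non-AGCT symbol");
                }
            }
            return sb.toString();
        }
    }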
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/contrail/graph/ReadsKeyValueParserFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/contrail/graph/ReadsKeyValueParserFactory.java
deleted file mode 100644
index 3adf8a0..0000000
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/contrail/graph/ReadsKeyValueParserFactory.java
+++ /dev/null
@@ -1,359 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package edu.uci.ics.genomix.hyracks.contrail.graph;
-
-import java.nio.ByteBuffer;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-
-import edu.uci.ics.genomix.hyracks.data.primitive.PositionReference;
-import edu.uci.ics.genomix.type.GeneCode;
-import edu.uci.ics.genomix.type.KmerBytesWritable;
-import edu.uci.ics.genomix.type.KmerBytesWritableFactory;
-import edu.uci.ics.genomix.type.KmerListWritable;
-import edu.uci.ics.genomix.type.NodeWritable;
-import edu.uci.ics.genomix.type.PositionWritable;
-import edu.uci.ics.genomix.type.IntermediateNodeWritable; // used below but previously unimported; package assumed
-import edu.uci.ics.genomix.type.ReadIDWritable; // used below but previously unimported; package assumed
-import edu.uci.ics.hyracks.api.comm.IFrameWriter;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
-import edu.uci.ics.hyracks.hdfs.api.IKeyValueParser;
-import edu.uci.ics.hyracks.hdfs.api.IKeyValueParserFactory;
-
-public class ReadsKeyValueParserFactory implements IKeyValueParserFactory<LongWritable, Text> {
- private static final long serialVersionUID = 1L;
- private static final Log LOG = LogFactory.getLog(ReadsKeyValueParserFactory.class);
-
- public static final int OutputKmerField = 0;
- public static final int OutputPosition = 1;
-
- private final boolean bReversed;
- private final int readLength;
- private final int kmerSize;
-
- public static final RecordDescriptor readKmerOutputRec = new RecordDescriptor(new ISerializerDeserializer[] { null,
- null, null, null, null, null, null});
-
- public ReadsKeyValueParserFactory(int readlength, int k, boolean bGenerateReversed) {
- bReversed = bGenerateReversed;
- this.readLength = readlength;
- this.kmerSize = k;
- }
-
- public static enum KmerDir {
- FORWARD,
- REVERSE,
- }
-
- @Override
- public IKeyValueParser<LongWritable, Text> createKeyValueParser(final IHyracksTaskContext ctx) {
- final ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(2);
- final ByteBuffer outputBuffer = ctx.allocateFrame();
- final FrameTupleAppender outputAppender = new FrameTupleAppender(ctx.getFrameSize());
- outputAppender.reset(outputBuffer, true);
-
- return new IKeyValueParser<LongWritable, Text>() {
-
- private KmerBytesWritable kmer = new KmerBytesWritable(kmerSize);
- private PositionReference pos = new PositionReference();
-
- private KmerBytesWritable preForwardKmer = new KmerBytesWritable(kmerSize);
- private KmerBytesWritable preReverseKmer = new KmerBytesWritable(kmerSize);
- private KmerBytesWritable curForwardKmer = new KmerBytesWritable(kmerSize);
- private KmerBytesWritable curReverseKmer = new KmerBytesWritable(kmerSize);
- private KmerBytesWritable nextForwardKmer = new KmerBytesWritable(kmerSize);
- private KmerBytesWritable nextReverseKmer = new KmerBytesWritable(kmerSize);
- private IntermediateNodeWritable outputNode = new IntermediateNodeWritable();
- private ReadIDWritable readId = new ReadIDWritable();
- private KmerListWritable kmerList = new KmerListWritable();
-
- private KmerBytesWritableFactory kmerFactory = new KmerBytesWritableFactory(kmerSize);
- private KmerDir preKmerDir = KmerDir.FORWARD;
- private KmerDir curKmerDir = KmerDir.FORWARD;
- private KmerDir nextKmerDir = KmerDir.FORWARD;
-
- byte mateId = (byte) 0;
-
- @Override
- public void parse(LongWritable key, Text value, IFrameWriter writer) throws HyracksDataException {
- String[] geneLine = value.toString().split("\\t"); // Read the Real Gene Line
- if (geneLine.length != 2) {
- return;
- }
- int readID = 0;
- try {
- readID = Integer.parseInt(geneLine[0]);
- } catch (NumberFormatException e) {
- LOG.warn("Invalid data ");
- return;
- }
-
- Pattern genePattern = Pattern.compile("[AGCT]+");
- Matcher geneMatcher = genePattern.matcher(geneLine[1]);
- boolean isValid = geneMatcher.matches();
- if (isValid) {
- if (geneLine[1].length() != readLength) {
- LOG.warn("Invalid readlength at: " + readID);
- return;
- }
- SplitReads(readID, geneLine[1].getBytes(), writer);
- }
- }
-
- private void SplitReads(int readID, byte[] array, IFrameWriter writer) {
-/*
- if (kmerSize >= array.length) {
- return;
- }
- kmer.setByRead(array, 0);
- InsertToFrame(kmer, readID, 1, writer);
-
-
- for (int i = kmerSize; i < array.length; i++) {
- kmer.shiftKmerWithNextChar(array[i]);
- InsertToFrame(kmer, readID, i - kmerSize + 2, writer);
- }
-
- if (bReversed) {
- kmer.setByReadReverse(array, 0);
- InsertToFrame(kmer, readID, -1, writer);
- for (int i = kmerSize; i < array.length; i++) {
- kmer.shiftKmerWithPreCode(GeneCode.getPairedCodeFromSymbol(array[i]));
- InsertToFrame(kmer, readID, -(i - kmerSize + 2), writer);
- }
- }*/
- ///////////////////////////////////////
- if (kmerSize >= array.length) {
- return;
- }
- /** first kmer **/
- curForwardKmer.setByRead(array, 0);
- curReverseKmer.setAsCopy(kmerFactory.reverse(curForwardKmer));
- curKmerDir = curForwardKmer.compareTo(curReverseKmer) >= 0 ? KmerDir.FORWARD : KmerDir.REVERSE;
- setNextKmer(array[kmerSize]);
- readId.setAsCopy(mateId, readID);
- outputNode.setreadId(readId);
- setEdgeListForNextKmer();
- switch (curKmerDir) {
- case FORWARD:
- InsertToFrame(curForwardKmer, outputNode, writer);
- break;
- case REVERSE:
- InsertToFrame(curReverseKmer, outputNode, writer);
- break;
- }
- /** middle kmer **/
- for (int i = kmerSize + 1; i < array.length; i++) {
- setPreKmerByOldCurKmer();
- setCurKmerByOldNextKmer();
- setNextKmer(array[i]);
- //set value.readId
- readId.setAsCopy(mateId, readID);
- outputNode.setreadId(readId);
- //set value.edgeList
- setEdgeListForPreKmer();
- setEdgeListForNextKmer();
- //output mapper result
- switch (curKmerDir) {
- case FORWARD:
- InsertToFrame(curForwardKmer, outputNode, writer);
- break;
- case REVERSE:
- InsertToFrame(curReverseKmer, outputNode, writer);
- break;
- }
- }
- /** last kmer **/
- setPreKmerByOldCurKmer();
- setCurKmerByOldNextKmer();
- //set value.readId
- readId.setAsCopy(mateId, readID);
- outputNode.setreadId(readId);
- //set value.edgeList
- setEdgeListForPreKmer();
- //output mapper result
- switch (curKmerDir) {
- case FORWARD:
- InsertToFrame(curForwardKmer, outputNode, writer);
- break;
- case REVERSE:
- InsertToFrame(curReverseKmer, outputNode, writer);
- break;
- }
- }
-
- public void setPreKmer(byte preChar){
- preForwardKmer.setAsCopy(curForwardKmer);
- preForwardKmer.shiftKmerWithPreChar(preChar);
- preReverseKmer.setAsCopy(kmerFactory.reverse(preForwardKmer)); // reverse of the pre kmer, not the next one
- preKmerDir = preForwardKmer.compareTo(preReverseKmer) >= 0 ? KmerDir.FORWARD : KmerDir.REVERSE;
- }
-
- public void setNextKmer(byte nextChar) {
- nextForwardKmer.setAsCopy(curForwardKmer);
- nextForwardKmer.shiftKmerWithNextChar(nextChar);
- nextReverseKmer.setAsCopy(kmerFactory.reverse(nextForwardKmer));
- nextKmerDir = nextForwardKmer.compareTo(nextReverseKmer) >= 0 ? KmerDir.FORWARD : KmerDir.REVERSE;
- }
-
- public void setPreKmerByOldCurKmer() {
- preKmerDir = curKmerDir;
- preForwardKmer.setAsCopy(curForwardKmer);
- preReverseKmer.setAsCopy(curReverseKmer);
- }
-
- //old nextKmer becomes current curKmer
- public void setCurKmerByOldNextKmer() {
- curKmerDir = nextKmerDir;
- curForwardKmer.setAsCopy(nextForwardKmer);
- curReverseKmer.setAsCopy(nextReverseKmer);
- }
-
- public void setEdgeListForNextKmer() {
- switch (curKmerDir) {
- case FORWARD:
- switch (nextKmerDir) {
- case FORWARD:
- kmerList.reset();
- kmerList.append(nextForwardKmer);
- outputNode.setFFList(kmerList);
- break;
- case REVERSE:
- kmerList.reset();
- kmerList.append(nextReverseKmer);
- outputNode.setFRList(kmerList);
- break;
- }
- break;
- case REVERSE:
- switch (nextKmerDir) {
- case FORWARD:
- kmerList.reset();
- kmerList.append(nextForwardKmer);
- outputNode.setRFList(kmerList);
- break;
- case REVERSE:
- kmerList.reset();
- kmerList.append(nextReverseKmer);
- outputNode.setRRList(kmerList);
- break;
- }
- break;
- }
- }
-
- public void setEdgeListForPreKmer() {
- switch (curKmerDir) {
- case FORWARD:
- switch (preKmerDir) {
- case FORWARD:
- kmerList.reset();
- kmerList.append(preForwardKmer);
- outputNode.setRRList(kmerList);
- break;
- case REVERSE:
- kmerList.reset();
- kmerList.append(preReverseKmer);
- outputNode.setRFList(kmerList);
- break;
- }
- break;
- case REVERSE:
- switch (preKmerDir) {
- case FORWARD:
- kmerList.reset();
- kmerList.append(preForwardKmer);
- outputNode.setFRList(kmerList);
- break;
- case REVERSE:
- kmerList.reset();
- kmerList.append(preReverseKmer);
- outputNode.setFFList(kmerList);
- break;
- }
- break;
- }
- }
-
- private void InsertToFrame(KmerBytesWritable kmer, IntermediateNodeWritable node, IFrameWriter writer) {
- try {
- tupleBuilder.reset();
- tupleBuilder.addField(kmer.getBytes(), kmer.getOffset(), kmer.getLength());
-
- tupleBuilder.addField(node.getreadId().getByteArray(), node.getreadId().getStartOffset(), node.getreadId().getLength());
-
- tupleBuilder.getDataOutput().writeInt(node.getFFList().getCountOfPosition());
- tupleBuilder.addFieldEndOffset();
-
- tupleBuilder.addField(node.getFFList().getByteArray(), node.getFFList().getStartOffset(), node.getFFList()
- .getLength());
-
- tupleBuilder.getDataOutput().writeInt(node.getFRList().getCountOfPosition());
- tupleBuilder.addFieldEndOffset();
-
- tupleBuilder.addField(node.getFRList().getByteArray(), node.getFRList().getStartOffset(), node.getFRList()
- .getLength());
-
- tupleBuilder.getDataOutput().writeInt(node.getRFList().getCountOfPosition());
- tupleBuilder.addFieldEndOffset();
-
- tupleBuilder.addField(node.getRFList().getByteArray(), node.getRFList().getStartOffset(), node.getRFList()
- .getLength());
-
- tupleBuilder.getDataOutput().writeInt(node.getRRList().getCountOfPosition());
- tupleBuilder.addFieldEndOffset();
-
- tupleBuilder.addField(node.getRRList().getByteArray(), node.getRRList().getStartOffset(), node.getRRList()
- .getLength());
-
- if (!outputAppender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
- tupleBuilder.getSize())) {
- FrameUtils.flushFrame(outputBuffer, writer);
- outputAppender.reset(outputBuffer, true);
- if (!outputAppender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
- tupleBuilder.getSize())) {
- throw new IllegalStateException(
- "Failed to copy an record into a frame: the record kmerByteSize is too large.");
- }
- }
- } catch (Exception e) {
- throw new IllegalStateException(e);
- }
- }
-
- @Override
- public void open(IFrameWriter writer) throws HyracksDataException {
- }
-
- @Override
- public void close(IFrameWriter writer) throws HyracksDataException {
- FrameUtils.flushFrame(outputBuffer, writer);
- }
- };
- }
-
-}
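The setEdgeListForNextKmer / setEdgeListForPreKmer pair above implements the project's four edge classes: the two letters are the orientations of the current canonical k-mer and its neighbor. For the next neighbor the class is read off directly; for the previous neighbor both orientations flip, which is why a FORWARD/FORWARD pre-edge lands in the RR list. A compact sketch of the rule; the enum and method names are illustrative, not the project's API:

    final class EdgeClassSketch {
        enum Dir { FORWARD, REVERSE }

        // Edge toward the NEXT k-mer: the class is read directly from (cur, next).
        static String nextEdgeClass(Dir cur, Dir next) {
            return (cur == Dir.FORWARD ? "F" : "R") + (next == Dir.FORWARD ? "F" : "R");
        }

        // Edge toward the PREVIOUS k-mer: both orientations flip, matching the
        // branches of setEdgeListForPreKmer above.
        static String preEdgeClass(Dir cur, Dir pre) {
            return (cur == Dir.FORWARD ? "R" : "F") + (pre == Dir.FORWARD ? "R" : "F");
        }
    }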
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/NodeReference.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/NodeReference.java
index 8f7a69e..60c0682 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/NodeReference.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/NodeReference.java
@@ -15,7 +15,7 @@
package edu.uci.ics.genomix.hyracks.data.primitive;
-import edu.uci.ics.genomix.oldtype.NodeWritable;
+import edu.uci.ics.genomix.velvet.oldtype.NodeWritable;
public class NodeReference extends NodeWritable {
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/MapReadToNodeOperator.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/MapReadToNodeOperator.java
index f376f71..1827651 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/MapReadToNodeOperator.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/MapReadToNodeOperator.java
@@ -19,9 +19,11 @@
import java.nio.ByteBuffer;
import edu.uci.ics.genomix.hyracks.data.primitive.NodeReference;
-import edu.uci.ics.genomix.oldtype.PositionListWritable;
-import edu.uci.ics.genomix.oldtype.PositionWritable;
-import edu.uci.ics.genomix.type.KmerBytesWritable;
+
+import edu.uci.ics.genomix.velvet.oldtype.PositionListWritable;
+import edu.uci.ics.genomix.velvet.oldtype.PositionWritable;
+import edu.uci.ics.genomix.velvet.oldtype.KmerBytesWritable;
+
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
@@ -186,7 +188,7 @@
throw new IllegalArgumentException("kmer kmerByteSize is invalid");
}
offset += INT_LENGTH;
- kmer.setAsCopy(buffer.array(), offset);
+ kmer.set(buffer.array(), offset);
}
private void connect(NodeReference curNode, NodeReference nextNode) {
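The hunk above sits inside a length-prefixed field decoder: an int byte count is read and validated, the offset is advanced past it, and the k-mer is set from the bytes that follow. The same pattern in isolation, as a self-contained sketch with a hypothetical helper:

    import java.nio.ByteBuffer;

    final class LengthPrefixedFieldSketch {
        // Decodes one int-length-prefixed field starting at offset.
        static byte[] decode(ByteBuffer buffer, int offset) {
            int byteSize = buffer.getInt(offset); // 4-byte count prefix
            if (byteSize <= 0) {
                throw new IllegalArgumentException("field byte size is invalid");
            }
            offset += 4; // skip past the int prefix (INT_LENGTH in the original)
            byte[] field = new byte[byteSize];
            System.arraycopy(buffer.array(), offset, field, 0, byteSize);
            return field;
        }
    }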
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/ReadsKeyValueParserFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/ReadsKeyValueParserFactory.java
index 9aea9ad..2134177 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/ReadsKeyValueParserFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/ReadsKeyValueParserFactory.java
@@ -25,8 +25,8 @@
import org.apache.hadoop.io.Text;
import edu.uci.ics.genomix.hyracks.data.primitive.PositionReference;
-import edu.uci.ics.genomix.type.GeneCode;
-import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.velvet.oldtype.GeneCode;
+import edu.uci.ics.genomix.velvet.oldtype.KmerBytesWritable;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/KMerSequenceWriterFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/KMerSequenceWriterFactory.java
index aabdb3f..def046b 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/KMerSequenceWriterFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/KMerSequenceWriterFactory.java
@@ -25,9 +25,9 @@
import org.apache.hadoop.mapred.JobConf;
import edu.uci.ics.genomix.hyracks.job.GenomixJobConf;
-import edu.uci.ics.genomix.oldtype.PositionListWritable;
-import edu.uci.ics.genomix.oldtype.PositionWritable;
-import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.velvet.oldtype.PositionListWritable;
+import edu.uci.ics.genomix.velvet.oldtype.PositionWritable;
+import edu.uci.ics.genomix.velvet.oldtype.KmerBytesWritable;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
@@ -70,7 +70,7 @@
if (reEnterKey.getLength() > tuple.getFieldLength(InputKmerField)) {
throw new IllegalArgumentException("Not enough kmer bytes");
}
- reEnterKey.setAsReference(tuple.getFieldData(InputKmerField), tuple.getFieldStart(InputKmerField));
+ reEnterKey.setNewReference(tuple.getFieldData(InputKmerField), tuple.getFieldStart(InputKmerField));
int countOfPos = tuple.getFieldLength(InputPositionListField) / PositionWritable.LENGTH;
if (tuple.getFieldLength(InputPositionListField) % PositionWritable.LENGTH != 0) {
throw new IllegalArgumentException("Invalid count of position byte");
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/KMerTextWriterFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/KMerTextWriterFactory.java
index 91f10be..652a6f2 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/KMerTextWriterFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/KMerTextWriterFactory.java
@@ -17,9 +17,9 @@
import java.io.DataOutput;
import java.io.IOException;
-import edu.uci.ics.genomix.oldtype.PositionListWritable;
-import edu.uci.ics.genomix.oldtype.PositionWritable;
-import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.velvet.oldtype.PositionListWritable;
+import edu.uci.ics.genomix.velvet.oldtype.PositionWritable;
+import edu.uci.ics.genomix.velvet.oldtype.KmerBytesWritable;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
@@ -49,7 +49,7 @@
if (kmer.getLength() > tuple.getFieldLength(KMerSequenceWriterFactory.InputKmerField)) {
throw new IllegalArgumentException("Not enough kmer bytes");
}
- kmer.setAsReference(tuple.getFieldData(KMerSequenceWriterFactory.InputKmerField),
+ kmer.setNewReference(tuple.getFieldData(KMerSequenceWriterFactory.InputKmerField),
tuple.getFieldStart(KMerSequenceWriterFactory.InputKmerField));
int countOfPos = tuple.getFieldLength(KMerSequenceWriterFactory.InputPositionListField)
/ PositionWritable.LENGTH;
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/NodeSequenceWriterFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/NodeSequenceWriterFactory.java
index 1651d4a..e116ab9 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/NodeSequenceWriterFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/NodeSequenceWriterFactory.java
@@ -27,8 +27,8 @@
import edu.uci.ics.genomix.data.Marshal;
import edu.uci.ics.genomix.hyracks.dataflow.MapReadToNodeOperator;
import edu.uci.ics.genomix.hyracks.job.GenomixJobConf;
-import edu.uci.ics.genomix.oldtype.NodeWritable;
-import edu.uci.ics.genomix.oldtype.PositionWritable;
+import edu.uci.ics.genomix.velvet.oldtype.NodeWritable;
+import edu.uci.ics.genomix.velvet.oldtype.PositionWritable;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
@@ -94,7 +94,7 @@
node.getRRList().setNewReference(tuple.getFieldLength(InputRRField) / PositionWritable.LENGTH,
tuple.getFieldData(InputRRField), tuple.getFieldStart(InputRRField));
- node.getKmer().setAsReference(
+ node.getKmer().setNewReference(
Marshal.getInt(tuple.getFieldData(NodeSequenceWriterFactory.InputCountOfKmerField),
tuple.getFieldStart(NodeSequenceWriterFactory.InputCountOfKmerField)),
tuple.getFieldData(InputKmerBytesField), tuple.getFieldStart(InputKmerBytesField));
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/NodeTextWriterFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/NodeTextWriterFactory.java
index 3ca4a9d..bc00aa5 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/NodeTextWriterFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/NodeTextWriterFactory.java
@@ -18,8 +18,8 @@
import java.io.IOException;
import edu.uci.ics.genomix.data.Marshal;
-import edu.uci.ics.genomix.oldtype.NodeWritable;
-import edu.uci.ics.genomix.oldtype.PositionWritable;
+import edu.uci.ics.genomix.velvet.oldtype.NodeWritable;
+import edu.uci.ics.genomix.velvet.oldtype.PositionWritable;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
@@ -69,7 +69,7 @@
tuple.getFieldData(NodeSequenceWriterFactory.InputRRField),
tuple.getFieldStart(NodeSequenceWriterFactory.InputRRField));
- node.getKmer().setAsReference(
+ node.getKmer().setNewReference(
Marshal.getInt(tuple.getFieldData(NodeSequenceWriterFactory.InputCountOfKmerField),
tuple.getFieldStart(NodeSequenceWriterFactory.InputCountOfKmerField)),
tuple.getFieldData(NodeSequenceWriterFactory.InputKmerBytesField),
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenCheckReader.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenCheckReader.java
index 07d16f8..b4b1e73 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenCheckReader.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenCheckReader.java
@@ -19,8 +19,8 @@
import java.util.Map;
import edu.uci.ics.genomix.hyracks.dataflow.ReadsKeyValueParserFactory;
-import edu.uci.ics.genomix.oldtype.PositionWritable;
-import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.velvet.oldtype.PositionWritable;
+import edu.uci.ics.genomix.velvet.oldtype.KmerBytesWritable;
import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
@@ -89,7 +89,7 @@
.getFieldLength(ReadsKeyValueParserFactory.OutputKmerField)) {
throw new IllegalArgumentException("Not enough kmer bytes");
}
- kmer.setAsReference(
+ kmer.setNewReference(
tuple.getFieldData(ReadsKeyValueParserFactory.OutputKmerField),
tuple.getFieldStart(ReadsKeyValueParserFactory.OutputKmerField));
pos.setNewReference(tuple.getFieldData(ReadsKeyValueParserFactory.OutputPosition),
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenGroupbyReadID.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenGroupbyReadID.java
index fa5360c..1e78b79 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenGroupbyReadID.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenGroupbyReadID.java
@@ -19,8 +19,8 @@
import java.util.Map;
import edu.uci.ics.genomix.data.Marshal;
-import edu.uci.ics.genomix.oldtype.PositionListWritable;
-import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.velvet.oldtype.PositionListWritable;
+import edu.uci.ics.genomix.velvet.oldtype.KmerBytesWritable;
import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
@@ -121,7 +121,7 @@
throw new IllegalArgumentException("kmerlength is invalid");
}
fieldOffset += 4;
- kmer.setAsReference(buffer, fieldOffset);
+ kmer.setNewReference(buffer, fieldOffset);
fieldOffset += kmer.getLength();
kmerString = kmer.toString();
}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenMapKmerToRead.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenMapKmerToRead.java
index 09b1680..8e727959 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenMapKmerToRead.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenMapKmerToRead.java
@@ -20,8 +20,8 @@
import edu.uci.ics.genomix.data.Marshal;
import edu.uci.ics.genomix.hyracks.dataflow.MapKmerPositionToReadOperator;
-import edu.uci.ics.genomix.oldtype.PositionListWritable;
-import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.velvet.oldtype.PositionListWritable;
+import edu.uci.ics.genomix.velvet.oldtype.KmerBytesWritable;
import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
@@ -93,7 +93,7 @@
.getFieldLength(MapKmerPositionToReadOperator.OutputKmerField)) {
throw new IllegalArgumentException("Not enough kmer bytes");
}
- kmer.setAsReference(
+ kmer.setNewReference(
tuple.getFieldData(MapKmerPositionToReadOperator.OutputKmerField),
tuple.getFieldStart(MapKmerPositionToReadOperator.OutputKmerField));
kmerString = kmer.toString();
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/ConnectorPolicyAssignmentPolicy.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/ConnectorPolicyAssignmentPolicy.java
new file mode 100644
index 0000000..6919e76
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/ConnectorPolicyAssignmentPolicy.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.genomix.hyracks.newgraph.dataflow;
+
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
+import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicyAssignmentPolicy;
+import edu.uci.ics.hyracks.api.dataflow.connectors.PipeliningConnectorPolicy;
+import edu.uci.ics.hyracks.api.dataflow.connectors.SendSideMaterializedPipeliningConnectorPolicy;
+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningMergingConnectorDescriptor;
+
+/**
+ * used by precluster groupby
+ */
+public class ConnectorPolicyAssignmentPolicy implements IConnectorPolicyAssignmentPolicy {
+ private static final long serialVersionUID = 1L;
+ private IConnectorPolicy senderSideMaterializePolicy = new SendSideMaterializedPipeliningConnectorPolicy();
+ private IConnectorPolicy pipeliningPolicy = new PipeliningConnectorPolicy();
+
+ @Override
+ public IConnectorPolicy getConnectorPolicyAssignment(IConnectorDescriptor c, int nProducers, int nConsumers,
+ int[] fanouts) {
+ if (c instanceof MToNPartitioningMergingConnectorDescriptor) {
+ return senderSideMaterializePolicy;
+ } else {
+ return pipeliningPolicy;
+ }
+ }
+}
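This new ConnectorPolicyAssignmentPolicy reproduces for the newgraph jobs the behavior the deleted JobGenBrujinGraph relied on for its precluster group-by: M-to-N partitioning-merging connectors get a sender-side materialized pipelining policy (the usual guard against stalls in a sort-merge exchange), and every other connector stays purely pipelined. It is installed on a job specification exactly as before:

    jobSpec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());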
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/ReadsKeyValueParserFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/ReadsKeyValueParserFactory.java
index 22b2ae7..84f9fe0 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/ReadsKeyValueParserFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/ReadsKeyValueParserFactory.java
@@ -29,6 +29,7 @@
import edu.uci.ics.genomix.type.NodeWritable;
import edu.uci.ics.genomix.type.PositionListWritable;
import edu.uci.ics.genomix.type.PositionWritable;
+
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
@@ -45,21 +46,14 @@
private static final Log LOG = LogFactory.getLog(ReadsKeyValueParserFactory.class);
public static final int OutputKmerField = 0;
- public static final int outputNodeIdListField = 1;
- public static final int OutputForwardForwardField = 2;
- public static final int OutputFFListCountField = 3;
- public static final int OutputForwardReverseField = 4;
- public static final int OutputFRListCountField = 5;
- public static final int OutputReverseForwardField = 6;
- public static final int OutputRFListCountField = 7;
- public static final int OutputReverseReverseField = 8;
- public static final int OutputRRListCountField = 9;
+ public static final int OutputNodeField = 1;
+
private final int readLength;
private final int kmerSize;
public static final RecordDescriptor readKmerOutputRec = new RecordDescriptor(new ISerializerDeserializer[] { null,
- null, null, null, null, null, null, null, null});
+ null});
public ReadsKeyValueParserFactory(int readlength, int k) {
this.readLength = readlength;
@@ -77,21 +71,22 @@
final ByteBuffer outputBuffer = ctx.allocateFrame();
final FrameTupleAppender outputAppender = new FrameTupleAppender(ctx.getFrameSize());
outputAppender.reset(outputBuffer, true);
-
+ KmerBytesWritable.setGlobalKmerLength(kmerSize);
return new IKeyValueParser<LongWritable, Text>() {
-
+
private PositionWritable nodeId = new PositionWritable();
private PositionListWritable nodeIdList = new PositionListWritable();
- private KmerListWritable edgeListForPreKmer = new KmerListWritable(kmerSize);;
- private KmerListWritable edgeListForNextKmer = new KmerListWritable(kmerSize);;
- private NodeWritable outputNode = new NodeWritable(kmerSize);
+ private KmerListWritable edgeListForPreKmer = new KmerListWritable();
+ private KmerListWritable edgeListForNextKmer = new KmerListWritable();
+ private NodeWritable outputNode = new NodeWritable();
+// private NodeWritable outputNode2 = new NodeWritable();
- private KmerBytesWritable preForwardKmer = new KmerBytesWritable(kmerSize);
- private KmerBytesWritable preReverseKmer = new KmerBytesWritable(kmerSize);
- private KmerBytesWritable curForwardKmer = new KmerBytesWritable(kmerSize);
- private KmerBytesWritable curReverseKmer = new KmerBytesWritable(kmerSize);
- private KmerBytesWritable nextForwardKmer = new KmerBytesWritable(kmerSize);
- private KmerBytesWritable nextReverseKmer = new KmerBytesWritable(kmerSize);
+ private KmerBytesWritable preForwardKmer = new KmerBytesWritable();
+ private KmerBytesWritable preReverseKmer = new KmerBytesWritable();
+ private KmerBytesWritable curForwardKmer = new KmerBytesWritable();
+ private KmerBytesWritable curReverseKmer = new KmerBytesWritable();
+ private KmerBytesWritable nextForwardKmer = new KmerBytesWritable();
+ private KmerBytesWritable nextReverseKmer = new KmerBytesWritable();
private KmerDir preKmerDir = KmerDir.FORWARD;
private KmerDir curKmerDir = KmerDir.FORWARD;
@@ -126,37 +121,37 @@
}
private void SplitReads(int readID, byte[] array, IFrameWriter writer) {
- /** first kmer */
+ /*first kmer*/
if (kmerSize >= array.length) {
return;
}
- outputNode.reset(kmerSize);
+ outputNode.reset();
curForwardKmer.setByRead(array, 0);
curReverseKmer.setByReadReverse(array, 0);
curKmerDir = curForwardKmer.compareTo(curReverseKmer) <= 0 ? KmerDir.FORWARD : KmerDir.REVERSE;
setNextKmer(array[kmerSize]);
- setnodeId(mateId, readID, 1);
+ setnodeId(mateId, readID, 0);
setEdgeListForNextKmer();
writeToFrame(writer);
- /** middle kmer */
+ /*middle kmer*/
int i = kmerSize;
for (; i < array.length - 1; i++) {
- outputNode.reset(kmerSize);
+ outputNode.reset();
setPreKmerByOldCurKmer();
setCurKmerByOldNextKmer();
setNextKmer(array[i]);
- setnodeId(mateId, readID, i - kmerSize + 1);
+ setnodeId(mateId, readID, 0); // TODO restore read offset: i - kmerSize + 1
setEdgeListForPreKmer();
setEdgeListForNextKmer();
writeToFrame(writer);
}
- /** last kmer */
- outputNode.reset(kmerSize);
+ /* last kmer */
+ outputNode.reset();
setPreKmerByOldCurKmer();
setCurKmerByOldNextKmer();
- setnodeId(mateId, readID, array.length - kmerSize + 1);
+ setnodeId(mateId, readID, 0); // TODO restore read offset: array.length - kmerSize + 1
setEdgeListForPreKmer();
writeToFrame(writer);
}
@@ -202,12 +197,12 @@
case FORWARD:
switch(preKmerDir){
case FORWARD:
- edgeListForPreKmer.reset(kmerSize);
+ edgeListForPreKmer.reset();
edgeListForPreKmer.append(preForwardKmer);
outputNode.setRRList(edgeListForPreKmer);
break;
case REVERSE:
- edgeListForPreKmer.reset(kmerSize);
+ edgeListForPreKmer.reset();
edgeListForPreKmer.append(preReverseKmer);
outputNode.setRFList(edgeListForPreKmer);
break;
@@ -216,12 +211,12 @@
case REVERSE:
switch(preKmerDir){
case FORWARD:
- edgeListForPreKmer.reset(kmerSize);
+ edgeListForPreKmer.reset();
edgeListForPreKmer.append(preForwardKmer);
outputNode.setFRList(edgeListForPreKmer);
break;
case REVERSE:
- edgeListForPreKmer.reset(kmerSize);
+ edgeListForPreKmer.reset();
edgeListForPreKmer.append(preReverseKmer);
outputNode.setFFList(edgeListForPreKmer);
break;
@@ -235,12 +230,12 @@
case FORWARD:
switch(nextKmerDir){
case FORWARD:
- edgeListForNextKmer.reset(kmerSize);
+ edgeListForNextKmer.reset();
edgeListForNextKmer.append(nextForwardKmer);
outputNode.setFFList(edgeListForNextKmer);
break;
case REVERSE:
- edgeListForNextKmer.reset(kmerSize);
+ edgeListForNextKmer.reset();
edgeListForNextKmer.append(nextReverseKmer);
outputNode.setFRList(edgeListForNextKmer);
break;
@@ -249,12 +244,12 @@
case REVERSE:
switch(nextKmerDir){
case FORWARD:
- edgeListForNextKmer.reset(kmerSize);
+ edgeListForNextKmer.reset();
edgeListForNextKmer.append(nextForwardKmer);
outputNode.setRFList(edgeListForNextKmer);
break;
case REVERSE:
- edgeListForNextKmer.reset(kmerSize);
+ edgeListForNextKmer.reset();
edgeListForNextKmer.append(nextReverseKmer);
outputNode.setRRList(edgeListForNextKmer);
break;
@@ -267,30 +262,7 @@
try {
tupleBuilder.reset();
tupleBuilder.addField(kmer.getBytes(), kmer.getOffset(), kmer.getLength());
-
- //tupleBuilder.addField(node.getnodeId().getByteArray(), node.getnodeId().getStartOffset(), node.getnodeId().getLength());
-// tupleBuilder.addField(node.getFFList().getByteArray(), node.getFFList().getStartOffset(), node.getFFList().getLength());
-// tupleBuilder.addField(node.getFRList().getByteArray(), node.getFRList().getStartOffset(), node.getFRList().getLength());
-// tupleBuilder.addField(node.getRFList().getByteArray(), node.getRFList().getStartOffset(), node.getRFList().getLength());
-// tupleBuilder.addField(node.getRRList().getByteArray(), node.getRRList().getStartOffset(), node.getRRList().getLength());
-
- tupleBuilder.addField(node.getNodeIdList().getByteArray(), node.getNodeIdList().getStartOffset(), node.getNodeIdList().getLength());
-
- tupleBuilder.addField(node.getFFList().getByteArray(), node.getFFList().getStartOffset(), node.getFFList().getLength());
- tupleBuilder.getDataOutput().writeInt(node.getFFList().getCountOfPosition());
- tupleBuilder.addFieldEndOffset();
-
- tupleBuilder.addField(node.getFRList().getByteArray(), node.getFRList().getStartOffset(), node.getFRList().getLength());
- tupleBuilder.getDataOutput().writeInt(node.getFRList().getCountOfPosition());
- tupleBuilder.addFieldEndOffset();
-
- tupleBuilder.addField(node.getRFList().getByteArray(), node.getRFList().getStartOffset(), node.getRFList().getLength());
- tupleBuilder.getDataOutput().writeInt(node.getRFList().getCountOfPosition());
- tupleBuilder.addFieldEndOffset();
-
- tupleBuilder.addField(node.getRRList().getByteArray(), node.getRRList().getStartOffset(), node.getRRList().getLength());
- tupleBuilder.getDataOutput().writeInt(node.getRRList().getCountOfPosition());
- tupleBuilder.addFieldEndOffset();
+ tupleBuilder.addField(node.marshalToByteArray(), 0, node.getSerializedLength());
if (!outputAppender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
tupleBuilder.getSize())) {
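
Note: the parser now emits a two-field tuple, with field 0 holding the kmer bytes and field 1 the NodeWritable serialized via marshalToByteArray(). A minimal consumer sketch, assuming the frame was produced by this parser and the global kmer length has been set; the offset arithmetic mirrors the aggregators' getOffSet() helper further below:

    private static NodeWritable readNodeField(IFrameTupleAccessor accessor, int tIndex) {
        // field 1 starts after the tuple header and the field-slot area
        int offset = accessor.getTupleStartOffset(tIndex)
                + accessor.getFieldStartOffset(tIndex, 1)
                + accessor.getFieldSlotsLength();
        NodeWritable node = new NodeWritable();
        node.setAsReference(accessor.getBuffer().array(), offset);
        return node;
    }
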
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/aggregators/AggregateKmerAggregateFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/aggregators/AggregateKmerAggregateFactory.java
new file mode 100644
index 0000000..46fdd0e
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/aggregators/AggregateKmerAggregateFactory.java
@@ -0,0 +1,124 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.genomix.hyracks.newgraph.dataflow.aggregators;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.NodeWritable;
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+
+public class AggregateKmerAggregateFactory implements IAggregatorDescriptorFactory {
+
+ /**
+ * local Aggregate
+ */
+ private static final long serialVersionUID = 1L;
+ private final int kmerSize;
+
+ public AggregateKmerAggregateFactory(int k) {
+ this.kmerSize = k;
+ }
+
+ @Override
+ public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
+ RecordDescriptor outRecordDescriptor, int[] keyFields, int[] keyFieldsInPartialResults)
+ throws HyracksDataException {
+ KmerBytesWritable.setGlobalKmerLength(kmerSize);
+ return new IAggregatorDescriptor() {
+
+ private NodeWritable readNode = new NodeWritable();
+
+ protected int getOffSet(IFrameTupleAccessor accessor, int tIndex, int fieldId) {
+ int tupleOffset = accessor.getTupleStartOffset(tIndex);
+ int fieldStart = accessor.getFieldStartOffset(tIndex, fieldId);
+ int offset = tupleOffset + fieldStart + accessor.getFieldSlotsLength();
+ return offset;
+ }
+
+ @Override
+ public void reset() {
+ }
+
+ @Override
+ public void close() {
+
+ }
+
+ @Override
+ public AggregateState createAggregateStates() {
+ return new AggregateState(new NodeWritable());
+ }
+
+ @Override
+ public void init(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+ AggregateState state) throws HyracksDataException {
+ NodeWritable localUniNode = (NodeWritable) state.state;
+ localUniNode.reset();
+ readNode.setAsReference(accessor.getBuffer().array(), getOffSet(accessor, tIndex, 1));
+ localUniNode.getNodeIdList().appendList(readNode.getNodeIdList());
+ localUniNode.getFFList().appendList(readNode.getFFList());
+ localUniNode.getFRList().appendList(readNode.getFRList());
+ localUniNode.getRFList().appendList(readNode.getRFList());
+ localUniNode.getRRList().appendList(readNode.getRRList());
+
+ // add an empty value field so the tuple shape matches what the group-by caller expects
+ tupleBuilder.addFieldEndOffset(); // TODO confirm this placeholder field is still required
+ }
+
+ @Override
+ public void aggregate(IFrameTupleAccessor accessor, int tIndex, IFrameTupleAccessor stateAccessor,
+ int stateTupleIndex, AggregateState state) throws HyracksDataException {
+ NodeWritable localUniNode = (NodeWritable) state.state;
+ readNode.setAsReference(accessor.getBuffer().array(), getOffSet(accessor, tIndex, 1));
+ localUniNode.getNodeIdList().appendList(readNode.getNodeIdList());
+ localUniNode.getFFList().appendList(readNode.getFFList());
+ localUniNode.getFRList().appendList(readNode.getFRList());
+ localUniNode.getRFList().appendList(readNode.getRFList());
+ localUniNode.getRRList().appendList(readNode.getRRList());
+ }
+
+ @Override
+ public void outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+ AggregateState state) throws HyracksDataException {
+ throw new IllegalStateException("partial result method should not be called");
+ }
+
+ @Override
+ public void outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+ AggregateState state) throws HyracksDataException {
+ DataOutput fieldOutput = tupleBuilder.getDataOutput();
+ NodeWritable localUniNode = (NodeWritable) state.state;
+ try {
+ fieldOutput.write(localUniNode.marshalToByteArray(), 0, localUniNode.getSerializedLength());
+
+ tupleBuilder.addFieldEndOffset();
+ } catch (IOException e) {
+ throw new HyracksDataException("I/O exception when writing aggregation to the output buffer.");
+ }
+ }
+
+ };
+ }
+
+}
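
Note: this local aggregator concatenates the per-read adjacency lists with appendList(); duplicate edges are tolerated here and collapsed later by MergeKmerAggregateFactory, which combines with unionUpdate(). A hypothetical side-by-side of the two phases, reusing localUniNode and readNode as named in the code above:

    // local combine: plain concatenation, duplicates across tuples possible
    localUniNode.getFFList().appendList(readNode.getFFList());
    // global merge: set-style union, duplicates collapsed
    localUniNode.getFFList().unionUpdate(readNode.getFFList());
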
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/contrail/graph/MergeKmerAggregateFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/aggregators/MergeKmerAggregateFactory.java
similarity index 60%
rename from genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/contrail/graph/MergeKmerAggregateFactory.java
rename to genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/aggregators/MergeKmerAggregateFactory.java
index f0f72b9..1ee6cae 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/contrail/graph/MergeKmerAggregateFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/aggregators/MergeKmerAggregateFactory.java
@@ -13,20 +13,19 @@
* limitations under the License.
*/
-package edu.uci.ics.genomix.hyracks.contrail.graph;
+package edu.uci.ics.genomix.hyracks.newgraph.dataflow.aggregators;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-
-import edu.uci.ics.genomix.hyracks.data.primitive.PositionReference;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.NodeWritable;
import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
@@ -35,32 +34,47 @@
public class MergeKmerAggregateFactory implements IAggregatorDescriptorFactory {
private static final long serialVersionUID = 1L;
private static final Log LOG = LogFactory.getLog(MergeKmerAggregateFactory.class);
-
+
+ private final int kmerSize;
+
+ public MergeKmerAggregateFactory(int k) {
+ this.kmerSize = k;
+ }
+
@Override
public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
RecordDescriptor outRecordDescriptor, int[] keyFields, int[] keyFieldsInPartialResults)
throws HyracksDataException {
final int frameSize = ctx.getFrameSize();
+ KmerBytesWritable.setGlobalKmerLength(kmerSize);
return new IAggregatorDescriptor() {
- private PositionReference position = new PositionReference();
-
+ private NodeWritable readNode = new NodeWritable();
+
+ protected int getOffSet(IFrameTupleAccessor accessor, int tIndex, int fieldId) {
+ int tupleOffset = accessor.getTupleStartOffset(tIndex);
+ int fieldStart = accessor.getFieldStartOffset(tIndex, fieldId);
+ int offset = tupleOffset + fieldStart + accessor.getFieldSlotsLength();
+ return offset;
+ }
+
@Override
public AggregateState createAggregateStates() {
- return new AggregateState(new ArrayBackedValueStorage());
+ return new AggregateState(new NodeWritable());
}
@Override
public void init(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
AggregateState state) throws HyracksDataException {
- ArrayBackedValueStorage inputVal = (ArrayBackedValueStorage) state.state;
- inputVal.reset();
- int leadOffset = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength();
- for (int offset = accessor.getFieldStartOffset(tIndex, 1); offset < accessor.getFieldEndOffset(tIndex,
- 1); offset += PositionReference.LENGTH) {
- position.setNewReference(accessor.getBuffer().array(), leadOffset + offset);
- inputVal.append(position);
- }
+ NodeWritable localUniNode = (NodeWritable) state.state;
+ localUniNode.reset();
+ readNode.setAsReference(accessor.getBuffer().array(), getOffSet(accessor, tIndex, 1));
+ localUniNode.getNodeIdList().unionUpdate(readNode.getNodeIdList());
+ localUniNode.getFFList().unionUpdate(readNode.getFFList());
+ localUniNode.getFRList().unionUpdate(readNode.getFRList());
+ localUniNode.getRFList().unionUpdate(readNode.getRFList());
+ localUniNode.getRRList().unionUpdate(readNode.getRRList());
+
//add a placeholder field to satisfy the caller
tupleBuilder.addFieldEndOffset();
}
@@ -73,13 +87,13 @@
@Override
public void aggregate(IFrameTupleAccessor accessor, int tIndex, IFrameTupleAccessor stateAccessor,
int stateTupleIndex, AggregateState state) throws HyracksDataException {
- ArrayBackedValueStorage inputVal = (ArrayBackedValueStorage) state.state;
- int leadOffset = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength();
- for (int offset = accessor.getFieldStartOffset(tIndex, 1); offset < accessor.getFieldEndOffset(tIndex,
- 1); offset += PositionReference.LENGTH) {
- position.setNewReference(accessor.getBuffer().array(), leadOffset + offset);
- inputVal.append(position);
- }
+ NodeWritable localUniNode = (NodeWritable) state.state;
+ readNode.setAsReference(accessor.getBuffer().array(), getOffSet(accessor, tIndex, 1));
+ localUniNode.getNodeIdList().unionUpdate(readNode.getNodeIdList());
+ localUniNode.getFFList().unionUpdate(readNode.getFFList());
+ localUniNode.getFRList().unionUpdate(readNode.getFRList());
+ localUniNode.getRFList().unionUpdate(readNode.getRFList());
+ localUniNode.getRRList().unionUpdate(readNode.getRRList());
}
@Override
@@ -92,12 +106,12 @@
public void outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
AggregateState state) throws HyracksDataException {
DataOutput fieldOutput = tupleBuilder.getDataOutput();
- ArrayBackedValueStorage inputVal = (ArrayBackedValueStorage) state.state;
+ NodeWritable localUniNode = (NodeWritable) state.state;
try {
- if (inputVal.getLength() > frameSize / 2) {
- LOG.warn("MergeKmer: output data kmerByteSize is too big: " + inputVal.getLength());
+ if (localUniNode.getSerializedLength() > frameSize / 2) {
+ LOG.warn("MergeKmer: output data kmerByteSize is too big: " + localUniNode.getSerializedLength());
}
- fieldOutput.write(inputVal.getByteArray(), inputVal.getStartOffset(), inputVal.getLength());
+ fieldOutput.write(localUniNode.marshalToByteArray(), 0, localUniNode.getSerializedLength());
tupleBuilder.addFieldEndOffset();
} catch (IOException e) {
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/driver/Driver.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/driver/Driver.java
index 6d6e1e6..4602ed2 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/driver/Driver.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/driver/Driver.java
@@ -1,3 +1,18 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package edu.uci.ics.genomix.hyracks.newgraph.driver;
import java.net.URL;
@@ -6,10 +21,12 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.GenericOptionsParser;
-import edu.uci.ics.genomix.hyracks.driver.Driver.Plan;
-import edu.uci.ics.genomix.hyracks.job.GenomixJobConf;
-import edu.uci.ics.genomix.hyracks.job.JobGen;
+import edu.uci.ics.genomix.hyracks.newgraph.job.GenomixJobConf;
+import edu.uci.ics.genomix.hyracks.newgraph.job.JobGen;
+import edu.uci.ics.genomix.hyracks.newgraph.job.JobGenBrujinGraph;
import edu.uci.ics.genomix.hyracks.newgraph.job.JobGenCheckReader;
import edu.uci.ics.hyracks.api.client.HyracksConnection;
@@ -23,19 +40,21 @@
public class Driver {
public static enum Plan {
+ BUILD_DEBRUJIN_GRAPH,
CHECK_KMERREADER,
}
-
+
private static final String IS_PROFILING = "genomix.driver.profiling";
private static final String CPARTITION_PER_MACHINE = "genomix.driver.duplicate.num";
private static final Log LOG = LogFactory.getLog(Driver.class);
private JobGen jobGen;
private boolean profiling;
+
private int numPartitionPerMachine;
private IHyracksClientConnection hcc;
private Scheduler scheduler;
-
+
public Driver(String ipAddress, int port, int numPartitionPerMachine) throws HyracksException {
try {
hcc = new HyracksConnection(ipAddress, port);
@@ -45,9 +64,9 @@
}
this.numPartitionPerMachine = numPartitionPerMachine;
}
-
+
public void runJob(GenomixJobConf job) throws HyracksException {
- runJob(job, Plan.CHECK_KMERREADER, false);
+ runJob(job, Plan.BUILD_DEBRUJIN_GRAPH, false);
}
public void runJob(GenomixJobConf job, Plan planChoice, boolean profiling) throws HyracksException {
@@ -69,8 +88,11 @@
Map<String, NodeControllerInfo> ncMap = hcc.getNodeControllerInfos();
LOG.info("ncmap:" + ncMap.size() + " " + ncMap.keySet().toString());
switch (planChoice) {
- case CHECK_KMERREADER:
+ case BUILD_DEBRUJIN_GRAPH:
default:
+ jobGen = new JobGenBrujinGraph(job, scheduler, ncMap, numPartitionPerMachine);
+ break;
+ case CHECK_KMERREADER:
jobGen = new JobGenCheckReader(job, scheduler, ncMap, numPartitionPerMachine);
break;
}
@@ -85,7 +107,7 @@
throw new HyracksException(e);
}
}
-
+
private void run(JobGen jobGen) throws Exception {
try {
JobSpecification createJob = jobGen.generateJob();
@@ -95,11 +117,37 @@
throw e;
}
}
-
+
private void execute(JobSpecification job) throws Exception {
job.setUseConnectorPolicyForScheduling(false);
- JobId jobId = hcc
- .startJob(job, profiling ? EnumSet.of(JobFlag.PROFILE_RUNTIME) : EnumSet.noneOf(JobFlag.class));
+ JobId jobId = hcc.startJob(job, profiling ? EnumSet.of(JobFlag.PROFILE_RUNTIME) : EnumSet.noneOf(JobFlag.class));
hcc.waitForCompletion(jobId);
}
+
+ public static void main(String[] args) throws Exception {
+ GenomixJobConf jobConf = new GenomixJobConf();
+ String[] otherArgs = new GenericOptionsParser(jobConf, args).getRemainingArgs();
+ if (otherArgs.length < 4) {
+ System.err.println("Need <serverIP> <port> <input> <output>");
+ System.exit(-1);
+ }
+ String ipAddress = otherArgs[0];
+ int port = Integer.parseInt(otherArgs[1]);
+ int numOfDuplicate = jobConf.getInt(CPARTITION_PER_MACHINE, 2);
+ boolean bProfiling = jobConf.getBoolean(IS_PROFILING, true);
+ // FileInputFormat.setInputPaths(job, otherArgs[2]);
+ {
+ @SuppressWarnings("deprecation")
+ Path path = new Path(jobConf.getWorkingDirectory(), otherArgs[2]);
+ jobConf.set("mapred.input.dir", path.toString());
+
+ @SuppressWarnings("deprecation")
+ Path outputDir = new Path(jobConf.getWorkingDirectory(), otherArgs[3]);
+ jobConf.set("mapred.output.dir", outputDir.toString());
+ }
+ // FileInputFormat.addInputPath(jobConf, new Path(otherArgs[2]));
+ // FileOutputFormat.setOutputPath(job, new Path(otherArgs[3]));
+ Driver driver = new Driver(ipAddress, port, numOfDuplicate);
+ driver.runJob(jobConf, Plan.BUILD_DEBRUJIN_GRAPH, bProfiling);
+ }
}
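
Note: the new main() makes the driver runnable from the command line. A programmatic equivalent is sketched below; the host, port, and paths are hypothetical placeholders, not values fixed by this change:

    GenomixJobConf jobConf = new GenomixJobConf();
    jobConf.set("mapred.input.dir", "/webmap");          // hypothetical input path
    jobConf.set("mapred.output.dir", "/webmap_result");  // hypothetical output path
    Driver driver = new Driver("127.0.0.1", 3099, 2);    // hypothetical CC host/port
    driver.runJob(jobConf, Plan.BUILD_DEBRUJIN_GRAPH, false);
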
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/io/NodeTextWriterFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/io/NodeTextWriterFactory.java
new file mode 100644
index 0000000..b7b7054
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/io/NodeTextWriterFactory.java
@@ -0,0 +1,75 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.genomix.hyracks.newgraph.io;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.genomix.hyracks.newgraph.dataflow.ReadsKeyValueParserFactory;
+import edu.uci.ics.genomix.type.NodeWritable;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.hdfs.api.ITupleWriter;
+import edu.uci.ics.hyracks.hdfs.api.ITupleWriterFactory;
+
+public class NodeTextWriterFactory implements ITupleWriterFactory {
+
+ /**
+ * Write the node to Text
+ */
+ private static final long serialVersionUID = 1L;
+ private final int kmerSize;
+ public static final int OutputKmerField = ReadsKeyValueParserFactory.OutputKmerField;
+ public static final int outputNodeField = ReadsKeyValueParserFactory.OutputNodeField;
+
+ public NodeTextWriterFactory(int k) {
+ this.kmerSize = k;
+ }
+
+ @Override
+ public ITupleWriter getTupleWriter(IHyracksTaskContext ctx) throws HyracksDataException {
+ KmerBytesWritable.setGlobalKmerLength(kmerSize);
+ return new ITupleWriter() {
+ NodeWritable node = new NodeWritable();
+
+ @Override
+ public void open(DataOutput output) throws HyracksDataException {
+
+ }
+
+ @Override
+ public void write(DataOutput output, ITupleReference tuple) throws HyracksDataException {
+ node.setAsReference(tuple.getFieldData(outputNodeField), tuple.getFieldStart(outputNodeField));
+ node.getKmer().reset(kmerSize);
+ node.getKmer().setAsReference(tuple.getFieldData(OutputKmerField), tuple.getFieldStart(OutputKmerField));
+ try {
+ output.write(node.toString().getBytes());
+ output.writeByte('\n');
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ @Override
+ public void close(DataOutput output) throws HyracksDataException {
+
+ }
+
+ };
+ }
+
+}
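
Note: this factory is consumed by an HDFSWriteOperatorDescriptor, exactly as generateNodeWriterOpertator wires it later in this diff:

    ITupleWriterFactory nodeWriter = new NodeTextWriterFactory(kmerSize);
    HDFSWriteOperatorDescriptor writeNodeOperator =
            new HDFSWriteOperatorDescriptor(jobSpec, hadoopJobConfFactory.getConf(), nodeWriter);
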
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/contrail/graph/GenomixJobConf.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/job/GenomixJobConf.java
similarity index 98%
rename from genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/contrail/graph/GenomixJobConf.java
rename to genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/job/GenomixJobConf.java
index ecd95d5..b0edf77 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/contrail/graph/GenomixJobConf.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/job/GenomixJobConf.java
@@ -13,7 +13,7 @@
* limitations under the License.
*/
-package edu.uci.ics.genomix.hyracks.contrail.graph;
+package edu.uci.ics.genomix.hyracks.newgraph.job;
import java.io.IOException;
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/contrail/graph/JobGen.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/job/JobGen.java
similarity index 94%
rename from genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/contrail/graph/JobGen.java
rename to genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/job/JobGen.java
index 0ffdef2..9649566 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/contrail/graph/JobGen.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/job/JobGen.java
@@ -13,7 +13,7 @@
* limitations under the License.
*/
-package edu.uci.ics.genomix.hyracks.contrail.graph;
+package edu.uci.ics.genomix.hyracks.newgraph.job;
import java.io.Serializable;
import java.util.UUID;
@@ -26,7 +26,7 @@
public abstract class JobGen implements Serializable {
/**
- *
+ * generate the jobId
*/
private static final long serialVersionUID = 1L;
protected final ConfFactory confFactory;
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/job/JobGenBrujinGraph.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/job/JobGenBrujinGraph.java
index abfff00..afc1cf7 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/job/JobGenBrujinGraph.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/job/JobGenBrujinGraph.java
@@ -24,18 +24,39 @@
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
-import edu.uci.ics.genomix.hyracks.job.GenomixJobConf;
-import edu.uci.ics.genomix.hyracks.job.JobGen;
+import edu.uci.ics.genomix.hyracks.data.accessors.KmerHashPartitioncomputerFactory;
+import edu.uci.ics.genomix.hyracks.data.accessors.KmerNormarlizedComputerFactory;
+import edu.uci.ics.genomix.hyracks.data.primitive.KmerPointable;
+import edu.uci.ics.genomix.hyracks.newgraph.dataflow.ConnectorPolicyAssignmentPolicy;
import edu.uci.ics.genomix.hyracks.newgraph.dataflow.ReadsKeyValueParserFactory;
+import edu.uci.ics.genomix.hyracks.newgraph.dataflow.aggregators.AggregateKmerAggregateFactory;
+import edu.uci.ics.genomix.hyracks.newgraph.dataflow.aggregators.MergeKmerAggregateFactory;
+import edu.uci.ics.genomix.hyracks.newgraph.io.NodeTextWriterFactory;
+
import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
+import edu.uci.ics.hyracks.data.std.api.IPointableFactory;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningMergingConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.preclustered.PreclusteredGroupOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
+import edu.uci.ics.hyracks.hdfs.api.ITupleWriterFactory;
import edu.uci.ics.hyracks.hdfs.dataflow.ConfFactory;
import edu.uci.ics.hyracks.hdfs.dataflow.HDFSReadOperatorDescriptor;
+import edu.uci.ics.hyracks.hdfs.dataflow.HDFSWriteOperatorDescriptor;
import edu.uci.ics.hyracks.hdfs.scheduler.Scheduler;
@SuppressWarnings("deprecation")
@@ -68,7 +89,7 @@
protected int tableSize;
protected GroupbyType groupbyType;
protected OutputFormat outputFormat;
- protected boolean bGenerateReversedKmer;
+
protected void logDebug(String status) {
LOG.debug(status + " nc nodes:" + ncNodeNames.length);
@@ -85,7 +106,116 @@
}
initJobConfiguration(scheduler);
}
-
+
+ private Object[] generateAggeragateDescriptorbyType(JobSpecification jobSpec, int[] keyFields,
+ IAggregatorDescriptorFactory aggregator, IAggregatorDescriptorFactory merger,
+ ITuplePartitionComputerFactory partition, INormalizedKeyComputerFactory normalizer,
+ IPointableFactory pointable, RecordDescriptor combineRed, RecordDescriptor finalRec)
+ throws HyracksDataException {
+
+ Object[] obj = new Object[3];
+
+ switch (groupbyType) {
+ case PRECLUSTER:
+ default:
+ obj[0] = new PreclusteredGroupOperatorDescriptor(jobSpec, keyFields,
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(pointable) }, aggregator,
+ combineRed);
+ obj[1] = new MToNPartitioningMergingConnectorDescriptor(jobSpec, partition, keyFields,
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(pointable) });
+ obj[2] = new PreclusteredGroupOperatorDescriptor(jobSpec, keyFields,
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(pointable) }, merger,
+ finalRec);
+ jobSpec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
+ break;
+ }
+ return obj;
+ }
+
+ public HDFSReadOperatorDescriptor createHDFSReader(JobSpecification jobSpec) throws HyracksDataException {
+ try {
+ InputSplit[] splits = hadoopJobConfFactory.getConf().getInputFormat()
+ .getSplits(hadoopJobConfFactory.getConf(), ncNodeNames.length);
+
+ return new HDFSReadOperatorDescriptor(jobSpec, ReadsKeyValueParserFactory.readKmerOutputRec,
+ hadoopJobConfFactory.getConf(), splits, readSchedule, new ReadsKeyValueParserFactory(readLength,
+ kmerSize));
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ public static void connectOperators(JobSpecification jobSpec, IOperatorDescriptor preOp, String[] preNodes,
+ IOperatorDescriptor nextOp, String[] nextNodes, IConnectorDescriptor conn) {
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, preOp, preNodes);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, nextOp, nextNodes);
+ jobSpec.connect(conn, preOp, 0, nextOp, 0);
+ }
+
+ public AbstractOperatorDescriptor generateGroupbyKmerJob(JobSpecification jobSpec,
+ AbstractOperatorDescriptor readOperator) throws HyracksDataException {
+ int[] keyFields = new int[] { 0 }; // position of the group-by key field
+
+ ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(jobSpec, frameLimits, keyFields,
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(KmerPointable.FACTORY) },
+ ReadsKeyValueParserFactory.readKmerOutputRec);
+
+ connectOperators(jobSpec, readOperator, ncNodeNames, sorter, ncNodeNames, new OneToOneConnectorDescriptor(
+ jobSpec));
+
+ RecordDescriptor combineKmerOutputRec = new RecordDescriptor(new ISerializerDeserializer[] { null, null });
+ jobSpec.setFrameSize(frameSize);
+
+ Object[] objs = generateAggeragateDescriptorbyType(jobSpec, keyFields, new AggregateKmerAggregateFactory(kmerSize),
+ new MergeKmerAggregateFactory(kmerSize), new KmerHashPartitioncomputerFactory(),
+ new KmerNormarlizedComputerFactory(), KmerPointable.FACTORY, combineKmerOutputRec, combineKmerOutputRec);
+ AbstractOperatorDescriptor kmerLocalAggregator = (AbstractOperatorDescriptor) objs[0];
+ logDebug("LocalKmerGroupby Operator");
+ connectOperators(jobSpec, sorter, ncNodeNames, kmerLocalAggregator, ncNodeNames,
+ new OneToOneConnectorDescriptor(jobSpec));
+
+ logDebug("CrossKmerGroupby Operator");
+ IConnectorDescriptor kmerConnPartition = (IConnectorDescriptor) objs[1];
+ AbstractOperatorDescriptor kmerCrossAggregator = (AbstractOperatorDescriptor) objs[2];
+ connectOperators(jobSpec, kmerLocalAggregator, ncNodeNames, kmerCrossAggregator, ncNodeNames, kmerConnPartition);
+ return kmerCrossAggregator;
+ }
+
+ public AbstractOperatorDescriptor generateNodeWriterOpertator(JobSpecification jobSpec,
+ AbstractOperatorDescriptor mapEachReadToNode) throws HyracksException {
+ ITupleWriterFactory nodeWriter = null;
+ switch (outputFormat) {
+ case TEXT:
+ nodeWriter = new NodeTextWriterFactory(kmerSize);
+ break;
+ }
+ logDebug("WriteOperator");
+ // Output Node
+ HDFSWriteOperatorDescriptor writeNodeOperator = new HDFSWriteOperatorDescriptor(jobSpec,
+ hadoopJobConfFactory.getConf(), nodeWriter);
+ connectOperators(jobSpec, mapEachReadToNode, ncNodeNames, writeNodeOperator, ncNodeNames,
+ new OneToOneConnectorDescriptor(jobSpec));
+ return writeNodeOperator;
+ }
+
+ @Override
+ public JobSpecification generateJob() throws HyracksException {
+
+ JobSpecification jobSpec = new JobSpecification();
+ logDebug("ReadKmer Operator");
+
+ HDFSReadOperatorDescriptor readOperator = createHDFSReader(jobSpec);
+
+ logDebug("Group by Kmer");
+ AbstractOperatorDescriptor lastOperator = generateGroupbyKmerJob(jobSpec, readOperator);
+
+ logDebug("Write node to result");
+ lastOperator = generateNodeWriterOpertator(jobSpec, lastOperator);
+
+ jobSpec.addRoot(readOperator); // TODO why is addRoot required here but not in JobGenCheckReader?
+ return jobSpec;
+ }
+
protected void initJobConfiguration(Scheduler scheduler) throws HyracksDataException {
Configuration conf = confFactory.getConf();
readLength = conf.getInt(GenomixJobConf.READ_LENGTH, GenomixJobConf.DEFAULT_READLEN);
@@ -98,18 +228,11 @@
tableSize = conf.getInt(GenomixJobConf.TABLE_SIZE, GenomixJobConf.DEFAULT_TABLE_SIZE);
frameSize = conf.getInt(GenomixJobConf.FRAME_SIZE, GenomixJobConf.DEFAULT_FRAME_SIZE);
- bGenerateReversedKmer = conf.getBoolean(GenomixJobConf.REVERSED_KMER, GenomixJobConf.DEFAULT_REVERSED);
-
String type = conf.get(GenomixJobConf.GROUPBY_TYPE, GenomixJobConf.GROUPBY_TYPE_PRECLUSTER);
- if (type.equalsIgnoreCase(GenomixJobConf.GROUPBY_TYPE_EXTERNAL)) {
- groupbyType = GroupbyType.EXTERNAL;
- } else if (type.equalsIgnoreCase(GenomixJobConf.GROUPBY_TYPE_PRECLUSTER)) {
- groupbyType = GroupbyType.PRECLUSTER;
- } else {
- groupbyType = GroupbyType.HYBRIDHASH;
- }
+ groupbyType = GroupbyType.PRECLUSTER;
- String output = conf.get(GenomixJobConf.OUTPUT_FORMAT, GenomixJobConf.OUTPUT_FORMAT_BINARY);
+ String output = conf.get(GenomixJobConf.OUTPUT_FORMAT, GenomixJobConf.OUTPUT_FORMAT_TEXT);
+
if (output.equalsIgnoreCase("text")) {
outputFormat = OutputFormat.TEXT;
} else {
@@ -131,30 +254,5 @@
LOG.info("Frame limit" + frameLimits);
LOG.info("Frame kmerByteSize" + frameSize);
}
-
- public HDFSReadOperatorDescriptor createHDFSReader(JobSpecification jobSpec) throws HyracksDataException {
- try {
- InputSplit[] splits = hadoopJobConfFactory.getConf().getInputFormat()
- .getSplits(hadoopJobConfFactory.getConf(), ncNodeNames.length);
- return new HDFSReadOperatorDescriptor(jobSpec, ReadsKeyValueParserFactory.readKmerOutputRec,
- hadoopJobConfFactory.getConf(), splits, readSchedule, new ReadsKeyValueParserFactory(readLength,
- kmerSize, bGenerateReversedKmer));
- } catch (Exception e) {
- throw new HyracksDataException(e);
- }
- }
-
- public static void connectOperators(JobSpecification jobSpec, IOperatorDescriptor preOp, String[] preNodes,
- IOperatorDescriptor nextOp, String[] nextNodes, IConnectorDescriptor conn) {
- PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, preOp, preNodes);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, nextOp, nextNodes);
- jobSpec.connect(conn, preOp, 0, nextOp, 0);
- }
-
- @Override
- public JobSpecification generateJob() throws HyracksException {
- // TODO Auto-generated method stub
- return null;
- }
}
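
Note: taken together, generateJob() now assembles the full graph-building pipeline. The conceptual operator order, using only names that appear in this file, is:

    // HDFSReadOperatorDescriptor               parse reads into <kmer, node> tuples
    //   -> ExternalSortOperatorDescriptor      sort on field 0 (the kmer)
    //   -> PreclusteredGroup(AggregateKmerAggregateFactory)    local combine
    //   -> MToNPartitioningMergingConnector    repartition by kmer hash
    //   -> PreclusteredGroup(MergeKmerAggregateFactory)        global union
    //   -> HDFSWriteOperatorDescriptor(NodeTextWriterFactory)  text output
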
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/job/JobGenCheckReader.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/job/JobGenCheckReader.java
index 6d64cfd..f512f43 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/job/JobGenCheckReader.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/job/JobGenCheckReader.java
@@ -18,12 +18,11 @@
import java.io.IOException;
import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
import edu.uci.ics.genomix.hyracks.newgraph.dataflow.ReadsKeyValueParserFactory;
-import edu.uci.ics.genomix.hyracks.job.GenomixJobConf;
-import edu.uci.ics.genomix.oldtype.IntermediateNodeWritable;
-import edu.uci.ics.genomix.oldtype.PositionWritable;
+import edu.uci.ics.genomix.type.NodeWritable;
import edu.uci.ics.genomix.type.KmerBytesWritable;
-import edu.uci.ics.genomix.type.KmerListWritable;
+
import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
@@ -34,12 +33,16 @@
import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
import edu.uci.ics.hyracks.hdfs.api.ITupleWriter;
import edu.uci.ics.hyracks.hdfs.api.ITupleWriterFactory;
+import edu.uci.ics.hyracks.hdfs.dataflow.ConfFactory;
import edu.uci.ics.hyracks.hdfs.dataflow.HDFSReadOperatorDescriptor;
import edu.uci.ics.hyracks.hdfs.dataflow.HDFSWriteOperatorDescriptor;
import edu.uci.ics.hyracks.hdfs.scheduler.Scheduler;
public class JobGenCheckReader extends JobGenBrujinGraph {
+ /**
+ *
+ */
private static final long serialVersionUID = 1L;
public JobGenCheckReader(GenomixJobConf job, Scheduler scheduler, Map<String, NodeControllerInfo> ncMap,
@@ -62,7 +65,7 @@
public AbstractSingleActivityOperatorDescriptor generateRootByWriteKmerReader(JobSpecification jobSpec,
HDFSReadOperatorDescriptor readOperator) throws HyracksException {
- // Output Kmer
+
HDFSWriteOperatorDescriptor writeKmerOperator = new HDFSWriteOperatorDescriptor(jobSpec,
hadoopJobConfFactory.getConf(), new ITupleWriterFactory() {
@@ -70,11 +73,11 @@
@Override
public ITupleWriter getTupleWriter(IHyracksTaskContext ctx) throws HyracksDataException {
+ KmerBytesWritable.setGlobalKmerLength(kmerSize);
return new ITupleWriter() {
- private KmerBytesWritable kmer = new KmerBytesWritable(kmerSize);
- private KmerListWritable kmerList = new KmerListWritable();
- //private IntermediateNodeWritable intermediateNode = new IntermediateNodeWritable();
+ private NodeWritable outputNode = new NodeWritable();
+ private KmerBytesWritable outputKmer = new KmerBytesWritable();
@Override
public void open(DataOutput output) throws HyracksDataException {
@@ -83,36 +86,20 @@
@Override
public void write(DataOutput output, ITupleReference tuple) throws HyracksDataException {
try {
- if (kmer.getLength() > tuple
+ if (outputKmer.getLength() > tuple
.getFieldLength(ReadsKeyValueParserFactory.OutputKmerField)) {
throw new IllegalArgumentException("Not enough kmer bytes");
}
- //kemr
- kmer.setAsReference(
+ outputKmer.setAsReference(
tuple.getFieldData(ReadsKeyValueParserFactory.OutputKmerField),
tuple.getFieldStart(ReadsKeyValueParserFactory.OutputKmerField));
- kmerList.setAsReference(tuple.getFieldLength(ReadsKeyValueParserFactory.OutputNodeIdField),
- tuple.getFieldData(ReadsKeyValueParserFactory.OutputNodeIdField),
- tuple.getFieldStart(ReadsKeyValueParserFactory.OutputNodeIdField));
-// //nodeId
-// intermediateNode.getNodeId().setNewReference(tuple.getFieldData(ReadsKeyValueParserFactory.OutputNodeIdField),
-// tuple.getFieldStart(ReadsKeyValueParserFactory.OutputNodeIdField));
- //FF list
-// intermediateNode.getFFList().setNewReference(tuple.getFieldLength(ReadsKeyValueParserFactory.OutputForwardForwardField) / 2 ,
-// tuple.getFieldData(ReadsKeyValueParserFactory.OutputForwardForwardField), tuple.getFieldStart(ReadsKeyValueParserFactory.OutputForwardForwardField));
-// //FR list
-// intermediateNode.getFRList().setNewReference(tuple.getFieldLength(ReadsKeyValueParserFactory.OutputForwardReverseField / kmer.getLength()),
-// tuple.getFieldData(ReadsKeyValueParserFactory.OutputForwardReverseField), tuple.getFieldStart(ReadsKeyValueParserFactory.OutputForwardReverseField));
-// //RF list
-// intermediateNode.getRFList().setNewReference(tuple.getFieldLength(ReadsKeyValueParserFactory.OutputReverseForwardField / kmer.getLength()),
-// tuple.getFieldData(ReadsKeyValueParserFactory.OutputReverseForwardField), tuple.getFieldStart(ReadsKeyValueParserFactory.OutputReverseForwardField));
-// //RR list
-// intermediateNode.getRRList().setNewReference(tuple.getFieldLength(ReadsKeyValueParserFactory.OutputReverseReverseField / kmer.getLength()),
-// tuple.getFieldData(ReadsKeyValueParserFactory.OutputReverseReverseField), tuple.getFieldStart(ReadsKeyValueParserFactory.OutputReverseReverseField));
-//
- output.write(kmer.toString().getBytes());
+ outputNode.setAsReference(
+ tuple.getFieldData(ReadsKeyValueParserFactory.OutputNodeField),
+ tuple.getFieldStart(ReadsKeyValueParserFactory.OutputNodeField));
+
+ output.write(outputKmer.toString().getBytes());
output.writeByte('\t');
- output.write(kmerList.toString().getBytes());
+ output.write(outputNode.toString().getBytes());
output.writeByte('\n');
} catch (IOException e) {
throw new HyracksDataException(e);
diff --git a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/newgraph/test/JobRun.java b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/newgraph/test/JobRun.java
index dfae011..25915aa 100644
--- a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/newgraph/test/JobRun.java
+++ b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/newgraph/test/JobRun.java
@@ -8,7 +8,6 @@
import java.io.IOException;
import junit.framework.Assert;
-
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -24,16 +23,16 @@
import org.junit.Before;
import org.junit.Test;
-import edu.uci.ics.genomix.hyracks.job.GenomixJobConf;
+import edu.uci.ics.genomix.hyracks.newgraph.job.GenomixJobConf;
import edu.uci.ics.genomix.hyracks.newgraph.driver.Driver;
import edu.uci.ics.genomix.hyracks.newgraph.driver.Driver.Plan;
import edu.uci.ics.genomix.hyracks.test.TestUtils;
-import edu.uci.ics.genomix.oldtype.NodeWritable;
+//import edu.uci.ics.genomix.oldtype.NodeWritable;
@SuppressWarnings("deprecation")
public class JobRun {
private static final int KmerSize = 5;
- private static final int ReadLength = 8;
+ private static final int ReadLength = 6;
private static final String ACTUAL_RESULT_DIR = "actual";
private static final String PATH_TO_HADOOP_CONF = "src/test/resources/hadoop/conf";
@@ -41,16 +40,7 @@
private static final String HDFS_INPUT_PATH = "/webmap";
private static final String HDFS_OUTPUT_PATH = "/webmap_result";
- private static final String EXPECTED_DIR = "src/test/resources/expected/";
- private static final String EXPECTED_READER_RESULT = EXPECTED_DIR + "result_after_initial_read";
-// private static final String EXPECTED_OUPUT_KMER = EXPECTED_DIR + "result_after_kmerAggregate";
-// private static final String EXPECTED_KMER_TO_READID = EXPECTED_DIR + "result_after_kmer2readId";
-// private static final String EXPECTED_GROUPBYREADID = EXPECTED_DIR + "result_after_readIDAggreage";
-// private static final String EXPECTED_OUPUT_NODE = EXPECTED_DIR + "result_after_generateNode";
-// private static final String EXPECTED_UNMERGED = EXPECTED_DIR + "result_unmerged";
-
private static final String DUMPED_RESULT = ACTUAL_RESULT_DIR + HDFS_OUTPUT_PATH + "/merged.txt";
- private static final String CONVERT_RESULT = DUMPED_RESULT + ".txt";
private static final String HADOOP_CONF_PATH = ACTUAL_RESULT_DIR + File.separator + "conf.xml";
private MiniDFSCluster dfsCluster;
@@ -63,13 +53,22 @@
@Test
public void TestAll() throws Exception {
TestReader();
+// TestGroupby();
}
public void TestReader() throws Exception {
cleanUpReEntry();
conf.set(GenomixJobConf.OUTPUT_FORMAT, GenomixJobConf.OUTPUT_FORMAT_TEXT);
driver.runJob(new GenomixJobConf(conf), Plan.CHECK_KMERREADER, true);
- Assert.assertEquals(true, checkResults(EXPECTED_READER_RESULT, null));
+ dumpResult();
+ }
+
+ public void TestGroupby() throws Exception {
+ conf.set(GenomixJobConf.OUTPUT_FORMAT, GenomixJobConf.OUTPUT_FORMAT_TEXT);
+ cleanUpReEntry();
+ conf.set(GenomixJobConf.GROUPBY_TYPE, GenomixJobConf.GROUPBY_TYPE_PRECLUSTER);
+ driver.runJob(new GenomixJobConf(conf), Plan.BUILD_DEBRUJIN_GRAPH, true);
+// dumpResult();
}
@Before
@@ -129,54 +128,12 @@
}
}
- private boolean checkResults(String expectedPath, int[] poslistField) throws Exception {
- File dumped = null;
+ private void dumpResult() throws Exception {
String format = conf.get(GenomixJobConf.OUTPUT_FORMAT);
if (GenomixJobConf.OUTPUT_FORMAT_TEXT.equalsIgnoreCase(format)) {
FileUtil.copyMerge(FileSystem.get(conf), new Path(HDFS_OUTPUT_PATH),
FileSystem.getLocal(new Configuration()), new Path(DUMPED_RESULT), false, conf, null);
- dumped = new File(DUMPED_RESULT);
- } else {
-
- FileSystem.getLocal(new Configuration()).mkdirs(new Path(ACTUAL_RESULT_DIR + HDFS_OUTPUT_PATH));
- File filePathTo = new File(CONVERT_RESULT);
- BufferedWriter bw = new BufferedWriter(new FileWriter(filePathTo));
- for (int i = 0; i < numPartitionPerMachine * numberOfNC; i++) {
- String partname = "/part-" + i;
- // FileUtil.copy(FileSystem.get(conf), new Path(HDFS_OUTPUT_PATH
- // + partname), FileSystem.getLocal(new Configuration()),
- // new Path(ACTUAL_RESULT_DIR + HDFS_OUTPUT_PATH + partname),
- // false, conf);
-
- Path path = new Path(HDFS_OUTPUT_PATH + partname);
- FileSystem dfs = FileSystem.get(conf);
- if (dfs.getFileStatus(path).getLen() == 0) {
- continue;
- }
- SequenceFile.Reader reader = new SequenceFile.Reader(dfs, path, conf);
-
- NodeWritable node = new NodeWritable(conf.getInt(GenomixJobConf.KMER_LENGTH, KmerSize));
- NullWritable value = NullWritable.get();
- while (reader.next(node, value)) {
- if (node == null) {
- break;
- }
- bw.write(node.toString());
- System.out.println(node.toString());
- bw.newLine();
- }
- reader.close();
- }
- bw.close();
- dumped = new File(CONVERT_RESULT);
- }
-
- if (poslistField != null) {
- TestUtils.compareWithUnSortedPosition(new File(expectedPath), dumped, poslistField);
- } else {
- TestUtils.compareWithSortedResult(new File(expectedPath), dumped);
- }
- return true;
+ }
}
@After
diff --git a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunStepByStepTest.java b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunStepByStepTest.java
index 4ce59a0..51a0d15 100644
--- a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunStepByStepTest.java
+++ b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunStepByStepTest.java
@@ -41,7 +41,7 @@
import edu.uci.ics.genomix.hyracks.driver.Driver;
import edu.uci.ics.genomix.hyracks.driver.Driver.Plan;
import edu.uci.ics.genomix.hyracks.job.GenomixJobConf;
-import edu.uci.ics.genomix.oldtype.NodeWritable;
+import edu.uci.ics.genomix.velvet.oldtype.NodeWritable;
@SuppressWarnings("deprecation")
public class JobRunStepByStepTest {
@@ -76,10 +76,10 @@
@Test
public void TestAll() throws Exception {
// TestReader();
- TestGroupbyKmer();
+// TestGroupbyKmer();
// TestMapKmerToRead();
// TestGroupByReadID();
-// TestEndToEnd();
+ TestEndToEnd();
// TestUnMergedNode();
}
diff --git a/genomix/genomix-hyracks/src/test/resources/data/webmap/test1.txt b/genomix/genomix-hyracks/src/test/resources/data/webmap/test1.txt
index 17770fa..3f1cd5c 100644
--- a/genomix/genomix-hyracks/src/test/resources/data/webmap/test1.txt
+++ b/genomix/genomix-hyracks/src/test/resources/data/webmap/test1.txt
@@ -1 +1 @@
-1 AATAGAAG
+1 AATAGA
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/InitialGraphCleanInputFormat.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/InitialGraphCleanInputFormat.java
index 6695abe..4dfff11 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/InitialGraphCleanInputFormat.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/InitialGraphCleanInputFormat.java
@@ -11,6 +11,7 @@
import edu.uci.ics.pregelix.api.io.VertexReader;
import edu.uci.ics.pregelix.api.util.BspUtils;
import edu.uci.ics.genomix.type.NodeWritable;
+import edu.uci.ics.genomix.type.VKmerBytesWritable;
import edu.uci.ics.genomix.pregelix.io.MessageWritable;
import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
import edu.uci.ics.genomix.pregelix.io.VertexValueWritable.State;
@@ -76,7 +77,8 @@
vertexValue.setFRList(node.getFRList());
vertexValue.setRFList(node.getRFList());
vertexValue.setRRList(node.getRRList());
- vertexValue.setKmer(getRecordReader().getCurrentKey());
+ // TODO make this more efficient (don't use toString)
+ vertexValue.setKmer(new VKmerBytesWritable(getRecordReader().getCurrentKey().toString()));
vertexValue.setState(State.IS_NON);
vertex.setVertexValue(vertexValue);
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/P2PathMergeOutputFormat.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/P2PathMergeOutputFormat.java
new file mode 100644
index 0000000..0308913
--- /dev/null
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/P2PathMergeOutputFormat.java
@@ -0,0 +1,44 @@
+package edu.uci.ics.genomix.pregelix.format;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import edu.uci.ics.genomix.pregelix.api.io.binary.BinaryDataCleanVertexOutputFormat;
+import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
+import edu.uci.ics.genomix.pregelix.io.VertexValueWritable.State;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.io.VertexWriter;
+
+public class P2PathMergeOutputFormat extends
+ BinaryDataCleanVertexOutputFormat<KmerBytesWritable, VertexValueWritable, NullWritable> {
+
+ @Override
+ public VertexWriter<KmerBytesWritable, VertexValueWritable, NullWritable> createVertexWriter(
+ TaskAttemptContext context) throws IOException, InterruptedException {
+ @SuppressWarnings("unchecked")
+ RecordWriter<KmerBytesWritable, VertexValueWritable> recordWriter = binaryOutputFormat.getRecordWriter(context);
+ return new BinaryLoadGraphVertexWriter(recordWriter);
+ }
+
+ /**
+ * Simple VertexWriter that supports {@link BinaryLoadGraphVertex}
+ */
+ public static class BinaryLoadGraphVertexWriter extends
+ BinaryVertexWriter<KmerBytesWritable, VertexValueWritable, NullWritable> {
+ public BinaryLoadGraphVertexWriter(RecordWriter<KmerBytesWritable, VertexValueWritable> lineRecordWriter) {
+ super(lineRecordWriter);
+ }
+
+ @Override
+ public void writeVertex(Vertex<KmerBytesWritable, VertexValueWritable, NullWritable, ?> vertex)
+ throws IOException, InterruptedException {
+ byte selfFlag = (byte)(vertex.getVertexValue().getState() & State.VERTEX_MASK);
+ if(selfFlag == State.IS_FINAL)
+ getRecordWriter().write(vertex.getVertexId(), vertex.getVertexValue());
+ }
+ }
+}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/AdjacencyListWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/AdjacencyListWritable.java
index 29872fa..b19a0cf 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/AdjacencyListWritable.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/AdjacencyListWritable.java
@@ -19,8 +19,8 @@
}
public AdjacencyListWritable(int kmerSize){
- forwardList = new KmerListWritable(kmerSize);
- reverseList = new KmerListWritable(kmerSize);
+ forwardList = new KmerListWritable();
+ reverseList = new KmerListWritable();
}
public void set(AdjacencyListWritable adjacencyList){
@@ -34,8 +34,8 @@
}
public void reset(int kmerSize){
- forwardList.reset(kmerSize);
- reverseList.reset(kmerSize);
+ forwardList.reset();
+ reverseList.reset();
}
public int getCountOfPosition(){
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MergeBubbleMessageWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MergeBubbleMessageWritable.java
index c3e02a0..9fd15dd 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MergeBubbleMessageWritable.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MergeBubbleMessageWritable.java
@@ -27,7 +27,7 @@
public MergeBubbleMessageWritable() {
sourceVertexId = new PositionWritable();
- chainVertexId = new KmerBytesWritable(0);
+ chainVertexId = new KmerBytesWritable();
neighberNode = new AdjacencyListWritable();
startVertexId = new PositionWritable();
message = Message.NON;
@@ -78,7 +78,7 @@
public void reset() {
checkMessage = 0;
- chainVertexId.reset(1);
+// chainVertexId.reset();
neighberNode.reset();
message = Message.NON;
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MessageWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MessageWritable.java
index ea91144..d9177e2 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MessageWritable.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MessageWritable.java
@@ -22,12 +22,13 @@
private byte flag;
private boolean isFlip;
private int kmerlength = 0;
+ private boolean updateMsg = false;
private byte checkMessage;
public MessageWritable() {
sourceVertexId = new KmerBytesWritable();
- kmer = new KmerBytesWritable(0);
+ kmer = new KmerBytesWritable();
neighberNode = new AdjacencyListWritable();
flag = Message.NON;
isFlip = false;
@@ -37,7 +38,7 @@
public MessageWritable(int kmerSize) {
kmerlength = kmerSize;
sourceVertexId = new KmerBytesWritable();
- kmer = new KmerBytesWritable(0);
+ kmer = new KmerBytesWritable();
neighberNode = new AdjacencyListWritable(kmerSize);
flag = Message.NON;
isFlip = false;
@@ -61,6 +62,7 @@
}
checkMessage |= CheckMessage.ADJMSG;
this.flag = msg.getFlag();
+ updateMsg = msg.isUpdateMsg();
}
public void set(int kmerlength, KmerBytesWritable sourceVertexId, KmerBytesWritable chainVertexId, AdjacencyListWritable neighberNode, byte message) {
@@ -82,17 +84,16 @@
}
public void reset() {
- checkMessage = 0;
- kmer.reset(1);
- neighberNode.reset();
- flag = Message.NON;
+ reset(0);
}
public void reset(int kmerSize) {
- checkMessage = 0;
- kmer.reset(1);
+ checkMessage = (byte) 0;
+ kmerlength = kmerSize;
+// kmer.reset();
neighberNode.reset(kmerSize);
flag = Message.NON;
+ isFlip = false;
}
public KmerBytesWritable getSourceVertexId() {
@@ -148,6 +149,15 @@
this.isFlip = isFlip;
}
+
+ public boolean isUpdateMsg() {
+ return updateMsg;
+ }
+
+ public void setUpdateMsg(boolean updateMsg) {
+ this.updateMsg = updateMsg;
+ }
+
@Override
public void write(DataOutput out) throws IOException {
out.writeInt(kmerlength);
@@ -160,6 +170,7 @@
neighberNode.write(out);
out.writeBoolean(isFlip);
out.writeByte(flag);
+ out.writeBoolean(updateMsg);
}
@Override
@@ -175,6 +186,7 @@
neighberNode.readFields(in);
isFlip = in.readBoolean();
flag = in.readByte();
+ updateMsg = in.readBoolean();
}
@Override
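
Note: write() and readFields() must stay in field-for-field agreement now that updateMsg is appended to the stream. A round-trip sketch using java.io stream classes, assuming a default-constructed message serializes cleanly:

    MessageWritable sent = new MessageWritable();
    sent.setUpdateMsg(true);
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    sent.write(new DataOutputStream(bos));             // updateMsg written last

    MessageWritable received = new MessageWritable();
    received.readFields(new DataInputStream(new ByteArrayInputStream(bos.toByteArray())));
    assert received.isUpdateMsg();                     // flag survives the round trip
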
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/VertexValueWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/VertexValueWritable.java
index d472fd1..60ad003 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/VertexValueWritable.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/VertexValueWritable.java
@@ -7,6 +7,7 @@
import edu.uci.ics.genomix.type.PositionListWritable;
import edu.uci.ics.genomix.pregelix.type.MessageFlag;
import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.VKmerBytesWritable;
import edu.uci.ics.genomix.type.KmerListWritable;
public class VertexValueWritable implements WritableComparable<VertexValueWritable> {
@@ -32,15 +33,15 @@
public static final byte SHOULD_MERGEWITHNEXT = 0b01 << 3;
public static final byte SHOULD_MERGEWITHPREV = 0b10 << 3;
public static final byte SHOULD_MERGE_MASK = 0b11 << 3;
- public static final byte SHOULD_MERGE_CLEAR = 0b1110011;
+ public static final byte SHOULD_MERGE_CLEAR = 0b1100111;
}
private PositionListWritable nodeIdList;
private AdjacencyListWritable incomingList;
private AdjacencyListWritable outgoingList;
private byte state;
- private KmerBytesWritable kmer;
- private KmerBytesWritable mergeDest;
+ private VKmerBytesWritable kmer;
+ private VKmerBytesWritable mergeDest;
private int kmerlength = 0;
public VertexValueWritable() {
@@ -53,13 +54,13 @@
incomingList = new AdjacencyListWritable();
outgoingList = new AdjacencyListWritable();
state = State.IS_NON;
- kmer = new KmerBytesWritable(kmerSize);
- mergeDest = new KmerBytesWritable(kmerSize);
+ kmer = new VKmerBytesWritable();
+ mergeDest = new VKmerBytesWritable();
}
public VertexValueWritable(PositionListWritable nodeIdList, KmerListWritable forwardForwardList, KmerListWritable forwardReverseList,
KmerListWritable reverseForwardList, KmerListWritable reverseReverseList,
- byte state, KmerBytesWritable kmer) {
+ byte state, VKmerBytesWritable kmer) {
set(nodeIdList, forwardForwardList, forwardReverseList,
reverseForwardList, reverseReverseList,
state, kmer);
@@ -67,8 +68,8 @@
public void set(PositionListWritable nodeIdList, KmerListWritable forwardForwardList, KmerListWritable forwardReverseList,
KmerListWritable reverseForwardList, KmerListWritable reverseReverseList,
- byte state, KmerBytesWritable kmer) {
- this.kmerlength = kmer.kmerByteSize;
+ byte state, VKmerBytesWritable kmer) {
+ this.kmerlength = kmer.getKmerLetterLength();
this.incomingList.setForwardList(reverseForwardList);
this.incomingList.setReverseList(reverseReverseList);
this.outgoingList.setForwardList(forwardForwardList);
@@ -149,22 +150,22 @@
}
public int getLengthOfKmer() {
- return kmer.getKmerLength();
+ return kmer.getKmerLetterLength();
}
- public KmerBytesWritable getKmer() {
+ public VKmerBytesWritable getKmer() {
return kmer;
}
- public void setKmer(KmerBytesWritable kmer) {
+ public void setKmer(VKmerBytesWritable kmer) {
this.kmer.setAsCopy(kmer);
}
- public KmerBytesWritable getMergeDest() {
+ public VKmerBytesWritable getMergeDest() {
return mergeDest;
}
- public void setMergeDest(KmerBytesWritable mergeDest) {
+ public void setMergeDest(VKmerBytesWritable mergeDest) {
this.mergeDest = mergeDest;
}
@@ -180,11 +181,11 @@
public void reset(int kmerSize) {
this.kmerlength = kmerSize;
this.nodeIdList.reset();
- this.incomingList.getForwardList().reset(kmerSize);
- this.incomingList.getReverseList().reset(kmerSize);
- this.outgoingList.getForwardList().reset(kmerSize);
- this.outgoingList.getReverseList().reset(kmerSize);
- this.kmer.reset(0);
+ this.incomingList.getForwardList().reset();
+ this.incomingList.getReverseList().reset();
+ this.outgoingList.getForwardList().reset();
+ this.outgoingList.getReverseList().reset();
+// this.kmer.reset(0);
}
@Override
@@ -237,6 +238,26 @@
}
/*
+ * Delete the corresponding edge
+ */
+ public void processDelete(byte neighborToDeleteDir, KmerBytesWritable nodeToDelete){
+ switch (neighborToDeleteDir & MessageFlag.DIR_MASK) {
+ case MessageFlag.DIR_FF:
+ this.getFFList().remove(nodeToDelete);
+ break;
+ case MessageFlag.DIR_FR:
+ this.getFRList().remove(nodeToDelete);
+ break;
+ case MessageFlag.DIR_RF:
+ this.getRFList().remove(nodeToDelete);
+ break;
+ case MessageFlag.DIR_RR:
+ this.getRRList().remove(nodeToDelete);
+ break;
+ }
+ }
+
+ /*
* Process any changes to value. This is for edge updates
*/
public void processUpdates(byte neighborToDeleteDir, KmerBytesWritable nodeToDelete,
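
An easy-to-miss detail in this file: the corrected SHOULD_MERGE_CLEAR (0b1100111) is the complement of SHOULD_MERGE_MASK (0b11 << 3) over the flag's seven bits, so a vertex can clear any pending merge request before setting a new one. The old value 0b1110011 would have left the SHOULD_MERGEWITHPREV bit set and wiped an unrelated bit instead. A minimal sketch verifying the arithmetic (the demo class is hypothetical; the constants are copied from State above):

public class MergeMaskDemo {
    static final byte SHOULD_MERGEWITHNEXT = 0b01 << 3; // 0b0001000
    static final byte SHOULD_MERGEWITHPREV = 0b10 << 3; // 0b0010000
    static final byte SHOULD_MERGE_CLEAR   = 0b1100111; // ~(0b11 << 3) within 7 bits

    public static void main(String[] args) {
        byte state = (byte) (0b1000111 | SHOULD_MERGEWITHPREV); // prev-merge pending
        state &= SHOULD_MERGE_CLEAR;    // drop both merge bits, keep everything else
        state |= SHOULD_MERGEWITHNEXT;  // request the new merge
        System.out.println(Integer.toBinaryString(state & 0x7F)); // 1001111
    }
}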
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/log/DataLoadLogFormatter.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/log/DataLoadLogFormatter.java
index bb60a25..95e070f 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/log/DataLoadLogFormatter.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/log/DataLoadLogFormatter.java
@@ -4,7 +4,7 @@
import java.util.logging.Handler;
import java.util.logging.LogRecord;
-import edu.uci.ics.genomix.oldtype.NodeWritable;
+import edu.uci.ics.genomix.type.NodeWritable;
public class DataLoadLogFormatter extends Formatter {
private NodeWritable key;
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/log/LogAlgorithmLogFormatter.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/log/LogAlgorithmLogFormatter.java
index 27e2683..7d6a1b9 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/log/LogAlgorithmLogFormatter.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/log/LogAlgorithmLogFormatter.java
@@ -12,11 +12,11 @@
// Create a DateFormat to format the logger timestamp.
//
private long step;
- private KmerBytesWritable sourceVertexId = new KmerBytesWritable(1);
- private KmerBytesWritable destVertexId = new KmerBytesWritable(1);
+ private KmerBytesWritable sourceVertexId = new KmerBytesWritable();
+ private KmerBytesWritable destVertexId = new KmerBytesWritable();
private MessageWritable msg = new MessageWritable();
private byte state;
- private KmerBytesWritable mergeChain = new KmerBytesWritable(1);;
+ private KmerBytesWritable mergeChain = new KmerBytesWritable();
//private boolean testDelete = false;
/**
* 0: general operation
@@ -55,11 +55,11 @@
}
public void reset() {
- this.sourceVertexId = new KmerBytesWritable(1);
- this.destVertexId = new KmerBytesWritable(1);
+ this.sourceVertexId = new KmerBytesWritable();
+ this.destVertexId = new KmerBytesWritable();
this.msg = new MessageWritable();
this.state = 0;
- this.mergeChain = new KmerBytesWritable(1);
+ this.mergeChain = new KmerBytesWritable();
}
public String format(LogRecord record) {
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicPathMergeVertex.java
index 9acba97..1beee03 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicPathMergeVertex.java
@@ -13,6 +13,7 @@
import edu.uci.ics.genomix.pregelix.type.AdjMessage;
import edu.uci.ics.genomix.pregelix.type.MessageFlag;
import edu.uci.ics.genomix.pregelix.util.VertexUtil;
+import edu.uci.ics.genomix.type.GeneCode;
import edu.uci.ics.genomix.type.KmerBytesWritable;
/**
@@ -25,8 +26,8 @@
public static int kmerSize = -1;
protected int maxIteration = -1;
- protected MessageWritable incomingMsg = null; // = new MessageWritable();
- protected MessageWritable outgoingMsg = null; // = new MessageWritable();
+ protected MessageWritable incomingMsg = null;
+ protected MessageWritable outgoingMsg = null;
protected KmerBytesWritable destVertexId = new KmerBytesWritable();
protected Iterator<KmerBytesWritable> posIterator;
private KmerBytesWritable kmer = new KmerBytesWritable();
@@ -96,10 +97,12 @@
public KmerBytesWritable getNextDestVertexIdAndSetFlag(VertexValueWritable value) {
if (value.getFFList().getCountOfPosition() > 0){ // #FFList() > 0
posIterator = value.getFFList().iterator();
+ outFlag &= MessageFlag.DIR_CLEAR;
outFlag |= MessageFlag.DIR_FF;
return posIterator.next();
} else if (value.getFRList().getCountOfPosition() > 0){ // #FRList() > 0
posIterator = value.getFRList().iterator();
+ outFlag &= MessageFlag.DIR_CLEAR;
outFlag |= MessageFlag.DIR_FR;
return posIterator.next();
} else {
@@ -111,10 +114,12 @@
public KmerBytesWritable getPreDestVertexIdAndSetFlag(VertexValueWritable value) {
if (value.getRFList().getCountOfPosition() > 0){ // #RFList() > 0
posIterator = value.getRFList().iterator();
+ outFlag &= MessageFlag.DIR_CLEAR;
outFlag |= MessageFlag.DIR_RF;
return posIterator.next();
} else if (value.getRRList().getCountOfPosition() > 0){ // #RRList() > 0
posIterator = value.getRRList().iterator();
+ outFlag &= MessageFlag.DIR_CLEAR;
outFlag |= MessageFlag.DIR_RR;
return posIterator.next();
} else {
@@ -290,6 +295,7 @@
* set adjMessage to successor(from predecessor)
*/
public void setSuccessorAdjMsg(){
+ outFlag &= MessageFlag.DIR_CLEAR;
if(getVertexValue().getFFList().getLength() > 0)
outFlag |= MessageFlag.DIR_FF;
else if(getVertexValue().getFRList().getLength() > 0)
@@ -302,6 +308,7 @@
* set adjMessage to predecessor(from successor)
*/
public void setPredecessorAdjMsg(){
+ outFlag &= MessageFlag.DIR_CLEAR;
if(getVertexValue().getRFList().getLength() > 0)
outFlag |= MessageFlag.DIR_RF;
else if(getVertexValue().getRRList().getLength() > 0)
@@ -342,18 +349,38 @@
}
/**
+ * send update message to neighbor for P2
+ * @throws IOException
+ */
+ public void sendUpdateMsg(){
+ outgoingMsg.setUpdateMsg(true);
+ byte meToNeighborDir = (byte) (incomingMsg.getFlag() & MessageFlag.DIR_MASK);
+ switch(meToNeighborDir){
+ case MessageFlag.DIR_FF:
+ case MessageFlag.DIR_FR:
+ sendUpdateMsgToPredecessor();
+ break;
+ case MessageFlag.DIR_RF:
+ case MessageFlag.DIR_RR:
+ sendUpdateMsgToSuccessor();
+ break;
+ }
+ }
+
+ /**
 * send merge message to neighbor for P2
* @throws IOException
*/
public void sendMergeMsg(){
- if(selfFlag == MessageFlag.IS_HEAD){
+ outgoingMsg.setUpdateMsg(false);
+ if(selfFlag == State.IS_HEAD){
byte newState = getVertexValue().getState();
newState &= ~State.IS_HEAD;
newState |= State.IS_OLDHEAD;
getVertexValue().setState(newState);
resetSelfFlag();
- outFlag |= MessageFlag.IS_HEAD;
- } else if(selfFlag == MessageFlag.IS_OLDHEAD){
+ outFlag |= MessageFlag.IS_HEAD;
+ } else if(selfFlag == State.IS_OLDHEAD){
outFlag |= MessageFlag.IS_OLDHEAD;
voteToHalt();
}
@@ -370,7 +397,7 @@
outgoingMsg.setFlag(outFlag);
outgoingMsg.setNeighberNode(getVertexValue().getIncomingList());
outgoingMsg.setSourceVertexId(getVertexId());
- outgoingMsg.setChainVertexId(getVertexValue().getKmer());
+ outgoingMsg.setChainVertexId(getVertexValue().getKmer().asFixedLengthKmer());
sendMsg(incomingMsg.getSourceVertexId(), outgoingMsg); //getNextDestVertexId(getVertexValue())
break;
case MessageFlag.DIR_RF:
@@ -383,7 +410,47 @@
outgoingMsg.setFlag(outFlag);
outgoingMsg.setNeighberNode(getVertexValue().getOutgoingList());
outgoingMsg.setSourceVertexId(getVertexId());
- outgoingMsg.setChainVertexId(getVertexValue().getKmer());
+ outgoingMsg.setChainVertexId(getVertexValue().getKmer().asFixedLengthKmer());
+ sendMsg(incomingMsg.getSourceVertexId(), outgoingMsg); //getPreDestVertexId(getVertexValue())
+ break;
+ }
+// if(headBecomeOldHead)
+// getVertexValue().processDelete(neighborToMeDir, incomingMsg.getSourceVertexId());
+ }
+
+ /**
+ * send final merge message to neighbor for P2
+ * @throws IOException
+ */
+ public void sendFinalMergeMsg(){
+ outFlag |= MessageFlag.IS_FINAL;
+ byte meToNeighborDir = (byte) (incomingMsg.getFlag() & MessageFlag.DIR_MASK);
+ byte neighborToMeDir = mirrorDirection(meToNeighborDir);
+ switch(neighborToMeDir){
+ case MessageFlag.DIR_FF:
+ case MessageFlag.DIR_FR:
+ setSuccessorAdjMsg();
+ if(ifFlipWithPredecessor())
+ outgoingMsg.setFlip(true);
+ else
+ outgoingMsg.setFlip(false);
+ outgoingMsg.setFlag(outFlag);
+ outgoingMsg.setNeighberNode(getVertexValue().getIncomingList());
+ outgoingMsg.setSourceVertexId(getVertexId());
+ outgoingMsg.setChainVertexId(getVertexValue().getKmer().asFixedLengthKmer());
+ sendMsg(incomingMsg.getSourceVertexId(), outgoingMsg); //getNextDestVertexId(getVertexValue())
+ break;
+ case MessageFlag.DIR_RF:
+ case MessageFlag.DIR_RR:
+ setPredecessorAdjMsg();
+ if(ifFilpWithSuccessor())
+ outgoingMsg.setFlip(true);
+ else
+ outgoingMsg.setFlip(false);
+ outgoingMsg.setFlag(outFlag);
+ outgoingMsg.setNeighberNode(getVertexValue().getOutgoingList());
+ outgoingMsg.setSourceVertexId(getVertexId());
+ outgoingMsg.setChainVertexId(getVertexValue().getKmer().asFixedLengthKmer());
sendMsg(incomingMsg.getSourceVertexId(), outgoingMsg); //getPreDestVertexId(getVertexValue())
break;
}
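
The merge paths above lean on mirrorDirection(meToNeighborDir) to re-express an edge from the receiving vertex's point of view; its body is not part of this hunk. A hedged sketch of the assumed semantics only (FF and RR swap when mirrored, FR and RF are self-symmetric; DIR_FF/DIR_FR values are assumptions consistent with DirectionFlag below):

public class MirrorDirectionSketch {
    static final byte DIR_FF = 0b001, DIR_FR = 0b010, DIR_RF = 0b011, DIR_RR = 0b100;

    // Assumed mapping: my-forward-to-their-forward (FF) seen from the other
    // side is their-reverse-to-my-reverse (RR), and vice versa; FR and RF
    // read the same from either end.
    static byte mirrorDirection(byte dir) {
        switch (dir) {
            case DIR_FF: return DIR_RR;
            case DIR_RR: return DIR_FF;
            case DIR_FR: return DIR_FR;
            case DIR_RF: return DIR_RF;
            default: throw new IllegalStateException("Direction not recognized: " + dir);
        }
    }

    public static void main(String[] args) {
        System.out.println(mirrorDirection(DIR_FF) == DIR_RR); // true
    }
}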
@@ -406,7 +473,7 @@
outgoingMsg.setFlag(outFlag);
outgoingMsg.setNeighberNode(getVertexValue().getIncomingList());
outgoingMsg.setSourceVertexId(getVertexId());
- outgoingMsg.setChainVertexId(getVertexValue().getKmer());
+ outgoingMsg.setChainVertexId(getVertexValue().getKmer().asFixedLengthKmer());
sendMsg(getNextDestVertexId(getVertexValue()), outgoingMsg);
deleteVertex(getVertexId());
break;
@@ -419,7 +486,7 @@
outgoingMsg.setFlag(outFlag);
outgoingMsg.setNeighberNode(getVertexValue().getOutgoingList());
outgoingMsg.setSourceVertexId(getVertexId());
- outgoingMsg.setChainVertexId(getVertexValue().getKmer());
+ outgoingMsg.setChainVertexId(getVertexValue().getKmer().asFixedLengthKmer());
sendMsg(getPreDestVertexId(getVertexValue()), outgoingMsg);
deleteVertex(getVertexId());
break;
@@ -428,6 +495,7 @@
public void setStateAsMergeWithNext(){
byte state = getVertexValue().getState();
+ state &= State.SHOULD_MERGE_CLEAR;
state |= State.SHOULD_MERGEWITHNEXT;
getVertexValue().setState(state);
}
@@ -445,6 +513,7 @@
public void setStateAsMergeWithPrev(){
byte state = getVertexValue().getState();
+ state &= State.SHOULD_MERGE_CLEAR;
state |= State.SHOULD_MERGEWITHPREV;
getVertexValue().setState(state);
}
@@ -551,7 +620,7 @@
byte meToNeighborDir = (byte) (msg.getFlag() & MessageFlag.DIR_MASK);
byte neighborToMeDir = mirrorDirection(meToNeighborDir);
- byte neighborToMergeDir = flipDirection(neighborToMeDir, incomingMsg.isFlip());
+ byte neighborToMergeDir = flipDirection(neighborToMeDir, msg.isFlip());
getVertexValue().processMerges(neighborToMeDir, msg.getSourceVertexId(),
neighborToMergeDir, VertexUtil.getNodeIdFromAdjacencyList(msg.getNeighberNode()),
@@ -559,6 +628,60 @@
}
/**
+ * final merge and adjacency-list update (parameterized variant of processMerge) for P2
+ */
+ public void processFinalMerge(MessageWritable msg){
+ byte meToNeighborDir = (byte) (msg.getFlag() & MessageFlag.DIR_MASK);
+ byte neighborToMeDir = mirrorDirection(meToNeighborDir);
+
+ byte neighborToMergeDir = flipDirection(neighborToMeDir, msg.isFlip());
+
+ String selfString;
+ String match;
+ String msgString;
+ int index;
+ switch(neighborToMeDir){
+ case MessageFlag.DIR_FF:
+ selfString = getVertexValue().getKmer().toString();
+ match = selfString.substring(selfString.length() - kmerSize + 1,selfString.length());
+ msgString = msg.getKmer().toString();
+ index = msgString.indexOf(match);
+// kmer.reset(msgString.length() - index);
+ kmer.setByRead(msgString.substring(index).getBytes(), 0);
+ break;
+ case MessageFlag.DIR_FR:
+ selfString = getVertexValue().getKmer().toString();
+ match = selfString.substring(selfString.length() - kmerSize + 1,selfString.length());
+ msgString = GeneCode.reverseComplement(msg.getKmer().toString());
+ index = msgString.indexOf(match);
+// kmer.reset(msgString.length() - index);
+ kmer.setByReadReverse(msgString.substring(index).getBytes(), 0);
+ break;
+ case MessageFlag.DIR_RF:
+ selfString = getVertexValue().getKmer().toString();
+ match = selfString.substring(0,kmerSize - 1);
+ msgString = GeneCode.reverseComplement(msg.getKmer().toString());
+ index = msgString.lastIndexOf(match) + kmerSize - 2;
+// kmer.reset(index + 1);
+ kmer.setByReadReverse(msgString.substring(0, index + 1).getBytes(), 0);
+ break;
+ case MessageFlag.DIR_RR:
+ selfString = getVertexValue().getKmer().toString();
+ match = selfString.substring(0,kmerSize - 1);
+ msgString = msg.getKmer().toString();
+ index = msgString.lastIndexOf(match) + kmerSize - 2;
+// kmer.reset(index + 1);
+ kmer.setByRead(msgString.substring(0, index + 1).getBytes(), 0);
+ System.out.println(kmer.toString());
+ break;
+ }
+
+ getVertexValue().processMerges(neighborToMeDir, msg.getSourceVertexId(),
+ neighborToMergeDir, VertexUtil.getNodeIdFromAdjacencyList(msg.getNeighberNode()),
+ kmerSize, kmer);
+ }
+
+ /**
* set head state
*/
public void setHeadState(){
@@ -579,12 +702,12 @@
}
/**
- * set final state
+ * set stop flag
*/
public void setStopFlag(){
- byte state = incomingMsg.getFlag();
+ byte state = getVertexValue().getState();
state &= State.VERTEX_CLEAR;
- state |= State.IS_STOP;
+ state |= State.IS_FINAL;
getVertexValue().setState(state);
}
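
To make the overlap arithmetic in processFinalMerge concrete, here is a worked example of the DIR_FF branch with hypothetical sequences and kmerSize = 3: the last k-1 letters of the local chain locate the alignment point in the message's chain, and everything from that point on becomes the suffix fed to setByRead.

public class FinalMergeOverlapDemo {
    public static void main(String[] args) {
        int kmerSize = 3;
        String selfString = "AACAACC"; // this vertex's current chain (hypothetical)
        String msgString  = "ACCGT";   // neighbor's chain from the message (hypothetical)
        // last k-1 letters of self locate the overlap in the message string
        String match = selfString.substring(selfString.length() - kmerSize + 1); // "CC"
        int index = msgString.indexOf(match);                                    // 1
        System.out.println(msgString.substring(index)); // "CCGT" -> merge suffix
    }
}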
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/LogAlgorithmForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/LogAlgorithmForPathMergeVertex.java
index feb22aa..b6f2164 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/LogAlgorithmForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/LogAlgorithmForPathMergeVertex.java
@@ -4,14 +4,15 @@
import java.util.Iterator;
import edu.uci.ics.pregelix.api.job.PregelixJob;
-import edu.uci.ics.genomix.oldtype.PositionWritable;
import edu.uci.ics.genomix.pregelix.client.Client;
-import edu.uci.ics.genomix.pregelix.format.LogAlgorithmForPathMergeInputFormat;
-import edu.uci.ics.genomix.pregelix.format.LogAlgorithmForPathMergeOutputFormat;
+import edu.uci.ics.genomix.pregelix.format.InitialGraphCleanInputFormat;
+import edu.uci.ics.genomix.pregelix.format.P2PathMergeOutputFormat;
import edu.uci.ics.genomix.pregelix.io.MessageWritable;
import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
+import edu.uci.ics.genomix.pregelix.io.VertexValueWritable.State;
import edu.uci.ics.genomix.pregelix.type.MessageFlag;
import edu.uci.ics.genomix.pregelix.type.MessageFromHead;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
/*
* vertexId: BytesWritable
* vertexValue: VertexValueWritable
@@ -44,7 +45,7 @@
BasicPathMergeVertex {
private ArrayList<MessageWritable> receivedMsgList = new ArrayList<MessageWritable>();
- PositionWritable tempPostition = new PositionWritable();
+ KmerBytesWritable tmpKmer = new KmerBytesWritable();
/**
* initiate kmerSize, maxIteration
@@ -54,9 +55,14 @@
kmerSize = getContext().getConfiguration().getInt(KMER_SIZE, 5);
if (maxIteration < 0)
maxIteration = getContext().getConfiguration().getInt(ITERATIONS, 1000000);
- headFlag = (byte)(getVertexValue().getState() & MessageFlag.IS_HEAD);
- selfFlag =(byte)(getVertexValue().getState() & MessageFlag.VERTEX_MASK);
- outgoingMsg.reset();
+ headFlag = (byte)(getVertexValue().getState() & State.IS_HEAD);
+ selfFlag = (byte)(getVertexValue().getState() & State.VERTEX_MASK);
+ if(incomingMsg == null)
+ incomingMsg = new MessageWritable(kmerSize);
+ if(outgoingMsg == null)
+ outgoingMsg = new MessageWritable(kmerSize);
+ else
+ outgoingMsg.reset(kmerSize);
receivedMsgList.clear();
}
@@ -65,18 +71,18 @@
*/
public void sendOutMsg() {
//send wantToMerge to next
- tempPostition = getNextDestVertexIdAndSetFlag(getVertexValue());
- if(tempPostition != null){
- destVertexId.setAsCopy(tempPostition);
+ tmpKmer = getNextDestVertexIdAndSetFlag(getVertexValue());
+ if(tmpKmer != null){
+ destVertexId.setAsCopy(tmpKmer);
outgoingMsg.setFlag(outFlag);
outgoingMsg.setSourceVertexId(getVertexId());
sendMsg(destVertexId, outgoingMsg);
}
- ////send wantToMerge to prev
- tempPostition = getPreDestVertexIdAndSetFlag(getVertexValue());
- if(tempPostition != null){
- destVertexId.setAsCopy(tempPostition);
+ //send wantToMerge to prev
+ tmpKmer = getPreDestVertexIdAndSetFlag(getVertexValue());
+ if(tmpKmer != null){
+ destVertexId.setAsCopy(tmpKmer);
outgoingMsg.setFlag(outFlag);
outgoingMsg.setSourceVertexId(getVertexId());
sendMsg(destVertexId, outgoingMsg);
@@ -117,7 +123,7 @@
*/
public void sendMsgToPathVertex(Iterator<MessageWritable> msgIterator) {
//send out wantToMerge msg
- if(selfFlag != MessageFlag.IS_HEAD){
+ if(selfFlag != State.IS_HEAD && selfFlag != State.IS_OLDHEAD){
sendOutMsg();
}
}
@@ -126,17 +132,19 @@
* path response message to head
*/
public void responseMsgToHeadVertex(Iterator<MessageWritable> msgIterator) {
- if(!msgIterator.hasNext() && selfFlag == MessageFlag.IS_HEAD){
- getVertexValue().setState(MessageFlag.IS_STOP);
+ if(!msgIterator.hasNext() && selfFlag == State.IS_HEAD){
+ outFlag |= MessageFlag.IS_FINAL;
sendOutMsg();
}
while (msgIterator.hasNext()) {
incomingMsg = msgIterator.next();
if(getMsgFlag() == MessageFlag.IS_FINAL){
processMerge(incomingMsg);
- getVertexValue().setState(MessageFlag.IS_FINAL);
- }else
+ getVertexValue().setState(State.IS_FINAL);
+ }else{
+ sendUpdateMsg();
sendMergeMsg();
+ }
}
}
@@ -148,11 +156,13 @@
while (msgIterator.hasNext()) {
incomingMsg = msgIterator.next();
if(getMsgFlag() == MessageFlag.IS_FINAL){
- setStopFlag();
- sendMergeMsg();
+ sendFinalMergeMsg();
break;
}
- receivedMsgList.add(incomingMsg);
+ if(incomingMsg.isUpdateMsg() && selfFlag == State.IS_OLDHEAD)
+ processUpdate();
+ else if(!incomingMsg.isUpdateMsg())
+ receivedMsgList.add(incomingMsg);
}
if(receivedMsgList.size() != 0){
byte numOfMsgsFromHead = checkNumOfMsgsFromHead();
@@ -160,22 +170,22 @@
case MessageFromHead.BothMsgsFromHead:
case MessageFromHead.OneMsgFromOldHeadAndOneFromHead:
for(int i = 0; i < 2; i++)
- processMerge(receivedMsgList.get(i));
- getVertexValue().setState(MessageFlag.IS_FINAL);
+ processFinalMerge(receivedMsgList.get(i)); //formerly processMerge()
+ getVertexValue().setState(State.IS_FINAL);
voteToHalt();
break;
case MessageFromHead.OneMsgFromHeadAndOneFromNonHead:
for(int i = 0; i < 2; i++)
- processMerge(receivedMsgList.get(i));
- getVertexValue().setState(MessageFlag.IS_HEAD);
+ processFinalMerge(receivedMsgList.get(i));
+ getVertexValue().setState(State.IS_HEAD);
break;
case MessageFromHead.BothMsgsFromNonHead:
for(int i = 0; i < 2; i++)
- processMerge(receivedMsgList.get(i));
+ processFinalMerge(receivedMsgList.get(i));
break;
case MessageFromHead.NO_MSG:
//halt
- deleteVertex(getVertexId());
+ voteToHalt(); //deleteVertex(getVertexId());
break;
}
}
@@ -188,12 +198,21 @@
else if (getSuperstep() == 2)
initState(msgIterator);
else if (getSuperstep() % 3 == 0 && getSuperstep() <= maxIteration) {
- sendMsgToPathVertex(msgIterator);
- if(selfFlag != MessageFlag.IS_HEAD)
- voteToHalt();
+ if(msgIterator.hasNext()){ //for processing final merge
+ incomingMsg = msgIterator.next();
+ if(getMsgFlag() == MessageFlag.IS_FINAL){
+ setFinalState();
+ processFinalMerge(incomingMsg);
+ }
+ }
+ else{
+ sendMsgToPathVertex(msgIterator);
+ if(selfFlag != State.IS_HEAD)
+ voteToHalt();
+ }
} else if (getSuperstep() % 3 == 1 && getSuperstep() <= maxIteration) {
responseMsgToHeadVertex(msgIterator);
- if(selfFlag != MessageFlag.IS_HEAD)
+ if(selfFlag != State.IS_HEAD)
voteToHalt();
} else if (getSuperstep() % 3 == 2 && getSuperstep() <= maxIteration){
processMergeInHeadVertex(msgIterator);
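
For orientation, the compute loop above cycles through a three-superstep protocol after the two setup supersteps. A condensed, comment-only sketch of the dispatch (phase bodies elided; the superstep-1 seeding branch sits outside this hunk and is assumed):

public class PhaseDispatchSketch {
    // mod-3 == 0: path vertices send wantToMerge (or absorb a final merge),
    // mod-3 == 1: paths answer heads with update/merge messages,
    // mod-3 == 2: heads apply the merges they collected.
    void compute(long superstep, int maxIteration) {
        if (superstep == 1) {
            // seed the protocol (branch not shown in this hunk)
        } else if (superstep == 2) {
            // initState(msgIterator)
        } else if (superstep % 3 == 0 && superstep <= maxIteration) {
            // sendMsgToPathVertex(msgIterator) / processFinalMerge on IS_FINAL
        } else if (superstep % 3 == 1 && superstep <= maxIteration) {
            // responseMsgToHeadVertex(msgIterator)
        } else if (superstep % 3 == 2 && superstep <= maxIteration) {
            // processMergeInHeadVertex(msgIterator)
        }
    }
}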
@@ -207,9 +226,9 @@
/**
* BinaryInput and BinaryOutput~/
*/
- job.setVertexInputFormatClass(LogAlgorithmForPathMergeInputFormat.class);
- job.setVertexOutputFormatClass(LogAlgorithmForPathMergeOutputFormat.class);
- job.setOutputKeyClass(PositionWritable.class);
+ job.setVertexInputFormatClass(InitialGraphCleanInputFormat.class);
+ job.setVertexOutputFormatClass(P2PathMergeOutputFormat.class);
+ job.setOutputKeyClass(KmerBytesWritable.class);
job.setOutputValueClass(VertexValueWritable.class);
job.setDynamicVertexValueSize(true);
Client.run(args, job);
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/NaiveAlgorithmForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/NaiveAlgorithmForPathMergeVertex.java
index ac43c62..53e46af 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/NaiveAlgorithmForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/NaiveAlgorithmForPathMergeVertex.java
@@ -60,7 +60,7 @@
private MessageWritable outgoingMsg = new MessageWritable();
private KmerBytesWritableFactory kmerFactory = new KmerBytesWritableFactory(1);
- private KmerBytesWritable lastKmer = new KmerBytesWritable(1);
+ private KmerBytesWritable lastKmer = new KmerBytesWritable();
private PositionWritable destVertexId = new PositionWritable();
private Iterator<PositionWritable> posIterator;
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java
index e6c5f34..7a22d25 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java
@@ -78,8 +78,8 @@
incomingMsg = new MessageWritable(kmerSize);
if(outgoingMsg == null)
outgoingMsg = new MessageWritable(kmerSize);
- //if (randSeed < 0)
- // randSeed = getContext().getConfiguration().getLong("randomSeed", 0);
+ else
+ outgoingMsg.reset(kmerSize);
randSeed = getSuperstep();
randGenerator = new Random(randSeed);
if (probBeingRandomHead < 0)
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/GenerateSmallFile.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/GenerateSmallFile.java
index 6b9eb4e..d3180c8 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/GenerateSmallFile.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/GenerateSmallFile.java
@@ -28,7 +28,8 @@
SequenceFile.Reader reader = new SequenceFile.Reader(fileSys, inFile, conf);
SequenceFile.Writer writer = SequenceFile.createWriter(fileSys, conf, outFile, KmerBytesWritable.class,
NullWritable.class, CompressionType.NONE);
- KmerBytesWritable outKey = new KmerBytesWritable(55);
+ KmerBytesWritable.setGlobalKmerLength(55);
+ KmerBytesWritable outKey = new KmerBytesWritable();
int i = 0;
for (i = 0; i < numOfLines; i++) {
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/GenerateTextFile.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/GenerateTextFile.java
index bc08600..8618237 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/GenerateTextFile.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/GenerateTextFile.java
@@ -19,6 +19,7 @@
public class GenerateTextFile {
public static void generateFromPathmergeResult(int kmerSize, String strSrcDir, String outPutDir) throws IOException {
+ KmerBytesWritable.setGlobalKmerLength(kmerSize);
Configuration conf = new Configuration();
FileSystem fileSys = FileSystem.getLocal(conf);
@@ -44,13 +45,14 @@
}
public static void generateSpecificLengthChainFromNaivePathmergeResult(int maxLength) throws IOException {
+ KmerBytesWritable.setGlobalKmerLength(55);
BufferedWriter bw = new BufferedWriter(new FileWriter("naive_text_" + maxLength));
Configuration conf = new Configuration();
FileSystem fileSys = FileSystem.get(conf);
for (int i = 0; i < 2; i++) {
Path path = new Path("/home/anbangx/genomix_result/final_naive/part-" + i);
SequenceFile.Reader reader = new SequenceFile.Reader(fileSys, path, conf);
- KmerBytesWritable key = new KmerBytesWritable(55);
+ KmerBytesWritable key = new KmerBytesWritable();
VertexValueWritable value = new VertexValueWritable();
while (reader.next(key, value)) {
@@ -68,13 +70,14 @@
}
public static void generateSpecificLengthChainFromLogPathmergeResult(int maxLength) throws IOException {
+ KmerBytesWritable.setGlobalKmerLength(55);
BufferedWriter bw = new BufferedWriter(new FileWriter("log_text_" + maxLength));
Configuration conf = new Configuration();
FileSystem fileSys = FileSystem.get(conf);
for (int i = 0; i < 2; i++) {
Path path = new Path("/home/anbangx/genomix_result/improvelog2/part-" + i);
SequenceFile.Reader reader = new SequenceFile.Reader(fileSys, path, conf);
- KmerBytesWritable key = new KmerBytesWritable(55);
+ KmerBytesWritable key = new KmerBytesWritable();
VertexValueWritable value = new VertexValueWritable();
while (reader.next(key, value)) {
@@ -93,12 +96,13 @@
}
public static void generateFromGraphbuildResult() throws IOException {
+ KmerBytesWritable.setGlobalKmerLength(55);
BufferedWriter bw = new BufferedWriter(new FileWriter("textfile"));
Configuration conf = new Configuration();
FileSystem fileSys = FileSystem.get(conf);
Path path = new Path("data/input/part-0-out-3000000");
SequenceFile.Reader reader = new SequenceFile.Reader(fileSys, path, conf);
- KmerBytesWritable key = new KmerBytesWritable(55);
+ KmerBytesWritable key = new KmerBytesWritable();
while (reader.next(key, null)) {
if (key == null) {
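
The common thread in these utility changes is that KmerBytesWritable no longer takes a per-instance length: the process-wide k must be installed once via setGlobalKmerLength before any default construction. A minimal sketch of the assumed ordering (55 is the value used above; getKmerLength is assumed to report the global value, as in the type's own code):

import edu.uci.ics.genomix.type.KmerBytesWritable;

public class GlobalKmerSetup {
    public static void main(String[] args) {
        KmerBytesWritable.setGlobalKmerLength(55);       // must precede construction
        KmerBytesWritable key = new KmerBytesWritable(); // length now implicit
        System.out.println(KmerBytesWritable.getKmerLength()); // 55
    }
}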
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/DirectionFlag.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/DirectionFlag.java
index 6e1bc4b..bee1dd8 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/DirectionFlag.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/DirectionFlag.java
@@ -9,4 +9,5 @@
public static final byte DIR_RF = 0b011 << 0;
public static final byte DIR_RR = 0b100 << 0;
public static final byte DIR_MASK = 0b111 << 0;
+ public static final byte DIR_CLEAR = 0b1111000 << 0;
}
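
DIR_CLEAR is the seven-bit complement of DIR_MASK, enabling the clear-then-set pattern that getNextDestVertexIdAndSetFlag and setSuccessorAdjMsg adopt in BasicPathMergeVertex: wipe only the low three direction bits, then install the new direction, leaving higher flag bits intact. A minimal sketch (DIR_FF's value and the IS_HEAD bit are assumptions for illustration):

public class DirClearDemo {
    static final byte DIR_FF    = 0b001;      // assumed, by analogy with DIR_RF/DIR_RR
    static final byte DIR_RR    = 0b100;
    static final byte DIR_CLEAR = 0b1111000;  // ~DIR_MASK within 7 bits
    static final byte IS_HEAD   = 0b0001000;  // hypothetical non-direction bit

    public static void main(String[] args) {
        byte outFlag = (byte) (IS_HEAD | DIR_RR); // head flag + stale direction
        outFlag &= DIR_CLEAR;                     // wipe only the direction bits
        outFlag |= DIR_FF;                        // install the new direction
        // IS_HEAD survives; direction is now FF
        System.out.println(Integer.toBinaryString(outFlag & 0x7F)); // 1001
    }
}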
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
index 0e798fe..e18c31b 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
@@ -7,6 +7,7 @@
import edu.uci.ics.genomix.pregelix.format.GraphCleanInputFormat;
import edu.uci.ics.genomix.pregelix.format.GraphCleanOutputFormat;
import edu.uci.ics.genomix.pregelix.format.InitialGraphCleanInputFormat;
+import edu.uci.ics.genomix.pregelix.format.P2PathMergeOutputFormat;
import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
import edu.uci.ics.genomix.pregelix.operator.bridgeremove.BridgeAddVertex;
import edu.uci.ics.genomix.pregelix.operator.bridgeremove.BridgeRemoveVertex;
@@ -42,21 +43,21 @@
// + "NaiveAlgorithmForMergeGraph.xml");
// }
-// private static void generateLogAlgorithmForMergeGraphJob(String jobName, String outputPath) throws IOException {
-// PregelixJob job = new PregelixJob(jobName);
-// job.setVertexClass(LogAlgorithmForPathMergeVertex.class);
-// job.setVertexInputFormatClass(LogAlgorithmForPathMergeInputFormat.class);
-// job.setVertexOutputFormatClass(GraphCleanOutputFormat.class); //LogAlgorithmForPathMergeOutputFormat
-// job.setDynamicVertexValueSize(true);
-// job.setOutputKeyClass(PositionWritable.class);
-// job.setOutputValueClass(VertexValueWritable.class);
-// job.getConfiguration().setInt(LogAlgorithmForPathMergeVertex.KMER_SIZE, 3);
-// job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
-// }
-//
-// private static void genLogAlgorithmForMergeGraph() throws IOException {
-// generateLogAlgorithmForMergeGraphJob("LogAlgorithmForMergeGraph", outputBase + "LogAlgorithmForMergeGraph.xml");
-// }
+ private static void generateLogAlgorithmForMergeGraphJob(String jobName, String outputPath) throws IOException {
+ PregelixJob job = new PregelixJob(jobName);
+ job.setVertexClass(LogAlgorithmForPathMergeVertex.class);
+ job.setVertexInputFormatClass(InitialGraphCleanInputFormat.class);
+ job.setVertexOutputFormatClass(P2PathMergeOutputFormat.class);
+ job.setDynamicVertexValueSize(true);
+ job.setOutputKeyClass(KmerBytesWritable.class);
+ job.setOutputValueClass(VertexValueWritable.class);
+ job.getConfiguration().setInt(LogAlgorithmForPathMergeVertex.KMER_SIZE, 3);
+ job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
+ }
+
+ private static void genLogAlgorithmForMergeGraph() throws IOException {
+ generateLogAlgorithmForMergeGraphJob("LogAlgorithmForMergeGraph", outputBase + "LogAlgorithmForMergeGraph.xml");
+ }
//
// private static void generateP3ForMergeGraphJob(String jobName, String outputPath) throws IOException {
// PregelixJob job = new PregelixJob(jobName);
@@ -77,22 +78,22 @@
// + "P3ForMergeGraph.xml");
// }
- private static void generateP4ForMergeGraphJob(String jobName, String outputPath) throws IOException {
- PregelixJob job = new PregelixJob(jobName);
- job.setVertexClass(P4ForPathMergeVertex.class);
- job.setVertexInputFormatClass(InitialGraphCleanInputFormat.class);
- job.setVertexOutputFormatClass(GraphCleanOutputFormat.class);
- job.setDynamicVertexValueSize(true);
- job.setOutputKeyClass(KmerBytesWritable.class);
- job.setOutputValueClass(VertexValueWritable.class);
- job.getConfiguration().setInt(P4ForPathMergeVertex.KMER_SIZE, 3);
- job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
- }
-
- private static void genP4ForMergeGraph() throws IOException {
- generateP4ForMergeGraphJob("P4ForMergeGraph", outputBase
- + "P4ForMergeGraph.xml");
- }
+// private static void generateP4ForMergeGraphJob(String jobName, String outputPath) throws IOException {
+// PregelixJob job = new PregelixJob(jobName);
+// job.setVertexClass(P4ForPathMergeVertex.class);
+// job.setVertexInputFormatClass(InitialGraphCleanInputFormat.class);
+// job.setVertexOutputFormatClass(GraphCleanOutputFormat.class);
+// job.setDynamicVertexValueSize(true);
+// job.setOutputKeyClass(KmerBytesWritable.class);
+// job.setOutputValueClass(VertexValueWritable.class);
+// job.getConfiguration().setInt(P4ForPathMergeVertex.KMER_SIZE, 3);
+// job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
+// }
+//
+// private static void genP4ForMergeGraph() throws IOException {
+// generateP4ForMergeGraphJob("P4ForMergeGraph", outputBase
+// + "P4ForMergeGraph.xml");
+// }
// private static void generateTipAddGraphJob(String jobName, String outputPath) throws IOException {
// PregelixJob job = new PregelixJob(jobName);
@@ -198,7 +199,7 @@
public static void main(String[] args) throws IOException {
//genNaiveAlgorithmForMergeGraph();
- //genLogAlgorithmForMergeGraph();
+ genLogAlgorithmForMergeGraph();
//genP3ForMergeGraph();
//genTipAddGraph();
// genTipRemoveGraph();
@@ -206,7 +207,7 @@
// genBridgeRemoveGraph();
// genBubbleAddGraph();
// genBubbleMergeGraph();
- genP4ForMergeGraph();
+// genP4ForMergeGraph();
}
}
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/PathMergeSmallTestSuite.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/PathMergeSmallTestSuite.java
index 5aedeb7..1578dfc 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/PathMergeSmallTestSuite.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/PathMergeSmallTestSuite.java
@@ -52,7 +52,7 @@
// + "6", PreFix + File.separator
// + "7", PreFix + File.separator
// + "8", PreFix + File.separator
- + "4"};
+ + "9"};
private static final String ACTUAL_RESULT_DIR = "data/actual/pathmerge";
private static final String PATH_TO_HADOOP_CONF = "src/test/resources/hadoop/conf";
private static final String PATH_TO_CLUSTER_STORE = "src/test/resources/cluster/stores.properties";
diff --git a/genomix/genomix-pregelix/src/test/resources/jobs/P4ForMergeGraph.xml b/genomix/genomix-pregelix/src/test/resources/jobs/P4ForMergeGraph.xml
deleted file mode 100644
index 597e5c3..0000000
--- a/genomix/genomix-pregelix/src/test/resources/jobs/P4ForMergeGraph.xml
+++ /dev/null
@@ -1,142 +0,0 @@
-<?xml version="1.0" encoding="UTF-8" standalone="no"?><configuration>
-<property><name>mapred.tasktracker.dns.nameserver</name><value>default</value></property>
-<property><name>mapred.queue.default.acl-administer-jobs</name><value>*</value></property>
-<property><name>mapred.skip.map.auto.incr.proc.count</name><value>true</value></property>
-<property><name>mapred.jobtracker.instrumentation</name><value>org.apache.hadoop.mapred.JobTrackerMetricsInst</value></property>
-<property><name>mapred.skip.reduce.auto.incr.proc.count</name><value>true</value></property>
-<property><name>fs.hsftp.impl</name><value>org.apache.hadoop.hdfs.HsftpFileSystem</value></property>
-<property><name>mapred.submit.replication</name><value>10</value></property>
-<property><name>ipc.server.tcpnodelay</name><value>false</value></property>
-<property><name>fs.checkpoint.dir</name><value>${hadoop.tmp.dir}/dfs/namesecondary</value></property>
-<property><name>mapred.output.compression.type</name><value>RECORD</value></property>
-<property><name>mapred.job.shuffle.merge.percent</name><value>0.66</value></property>
-<property><name>mapred.child.java.opts</name><value>-Xmx200m</value></property>
-<property><name>mapred.queue.default.acl-submit-job</name><value>*</value></property>
-<property><name>keep.failed.task.files</name><value>false</value></property>
-<property><name>mapred.jobtracker.job.history.block.size</name><value>3145728</value></property>
-<property><name>mapred.output.value.class</name><value>edu.uci.ics.genomix.pregelix.io.VertexValueWritable</value></property>
-<property><name>io.bytes.per.checksum</name><value>512</value></property>
-<property><name>mapred.task.tracker.report.address</name><value>127.0.0.1:0</value></property>
-<property><name>hadoop.util.hash.type</name><value>murmur</value></property>
-<property><name>fs.hdfs.impl</name><value>org.apache.hadoop.hdfs.DistributedFileSystem</value></property>
-<property><name>fs.ramfs.impl</name><value>org.apache.hadoop.fs.InMemoryFileSystem</value></property>
-<property><name>mapred.jobtracker.restart.recover</name><value>false</value></property>
-<property><name>fs.hftp.impl</name><value>org.apache.hadoop.hdfs.HftpFileSystem</value></property>
-<property><name>fs.checkpoint.period</name><value>3600</value></property>
-<property><name>mapred.child.tmp</name><value>./tmp</value></property>
-<property><name>mapred.local.dir.minspacekill</name><value>0</value></property>
-<property><name>map.sort.class</name><value>org.apache.hadoop.util.QuickSort</value></property>
-<property><name>hadoop.logfile.count</name><value>10</value></property>
-<property><name>ipc.client.connection.maxidletime</name><value>10000</value></property>
-<property><name>io.map.index.skip</name><value>0</value></property>
-<property><name>mapred.tasktracker.expiry.interval</name><value>600000</value></property>
-<property><name>mapred.output.compress</name><value>false</value></property>
-<property><name>io.seqfile.lazydecompress</name><value>true</value></property>
-<property><name>mapred.reduce.parallel.copies</name><value>5</value></property>
-<property><name>fs.checkpoint.size</name><value>67108864</value></property>
-<property><name>mapred.job.reduce.input.buffer.percent</name><value>0.0</value></property>
-<property><name>mapred.job.name</name><value>P4ForMergeGraph</value></property>
-<property><name>local.cache.size</name><value>10737418240</value></property>
-<property><name>fs.s3n.impl</name><value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value></property>
-<property><name>mapred.userlog.limit.kb</name><value>0</value></property>
-<property><name>fs.file.impl</name><value>org.apache.hadoop.fs.LocalFileSystem</value></property>
-<property><name>mapred.task.tracker.http.address</name><value>0.0.0.0:50060</value></property>
-<property><name>mapred.task.timeout</name><value>600000</value></property>
-<property><name>fs.kfs.impl</name><value>org.apache.hadoop.fs.kfs.KosmosFileSystem</value></property>
-<property><name>mapred.max.tracker.blacklists</name><value>4</value></property>
-<property><name>fs.s3.buffer.dir</name><value>${hadoop.tmp.dir}/s3</value></property>
-<property><name>mapred.job.tracker.persist.jobstatus.dir</name><value>/jobtracker/jobsInfo</value></property>
-<property><name>ipc.client.kill.max</name><value>10</value></property>
-<property><name>mapred.tasktracker.instrumentation</name><value>org.apache.hadoop.mapred.TaskTrackerMetricsInst</value></property>
-<property><name>mapred.reduce.tasks.speculative.execution</name><value>true</value></property>
-<property><name>io.sort.record.percent</name><value>0.05</value></property>
-<property><name>hadoop.security.authorization</name><value>false</value></property>
-<property><name>mapred.max.tracker.failures</name><value>4</value></property>
-<property><name>mapred.jobtracker.taskScheduler</name><value>org.apache.hadoop.mapred.JobQueueTaskScheduler</value></property>
-<property><name>mapred.tasktracker.dns.interface</name><value>default</value></property>
-<property><name>mapred.map.tasks</name><value>2</value></property>
-<property><name>mapred.job.tracker.persist.jobstatus.hours</name><value>0</value></property>
-<property><name>fs.s3.sleepTimeSeconds</name><value>10</value></property>
-<property><name>fs.default.name</name><value>file:///</value></property>
-<property><name>mapred.output.key.class</name><value>edu.uci.ics.genomix.type.KmerBytesWritable</value></property>
-<property><name>tasktracker.http.threads</name><value>40</value></property>
-<property><name>mapred.tasktracker.taskmemorymanager.monitoring-interval</name><value>5000</value></property>
-<property><name>hadoop.rpc.socket.factory.class.default</name><value>org.apache.hadoop.net.StandardSocketFactory</value></property>
-<property><name>mapred.reduce.tasks</name><value>1</value></property>
-<property><name>topology.node.switch.mapping.impl</name><value>org.apache.hadoop.net.ScriptBasedMapping</value></property>
-<property><name>pregelix.vertexClass</name><value>edu.uci.ics.genomix.pregelix.operator.pathmerge.P4ForPathMergeVertex</value></property>
-<property><name>mapred.skip.reduce.max.skip.groups</name><value>0</value></property>
-<property><name>io.file.buffer.size</name><value>4096</value></property>
-<property><name>mapred.jobtracker.maxtasks.per.job</name><value>-1</value></property>
-<property><name>mapred.tasktracker.indexcache.mb</name><value>10</value></property>
-<property><name>mapred.tasktracker.map.tasks.maximum</name><value>2</value></property>
-<property><name>fs.har.impl.disable.cache</name><value>true</value></property>
-<property><name>mapred.task.profile.maps</name><value>0-2</value></property>
-<property><name>hadoop.native.lib</name><value>true</value></property>
-<property><name>fs.s3.block.size</name><value>67108864</value></property>
-<property><name>mapred.job.reuse.jvm.num.tasks</name><value>1</value></property>
-<property><name>mapred.job.tracker.http.address</name><value>0.0.0.0:50030</value></property>
-<property><name>mapred.tasktracker.reduce.tasks.maximum</name><value>2</value></property>
-<property><name>io.compression.codecs</name><value>org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec</value></property>
-<property><name>mapred.job.shuffle.input.buffer.percent</name><value>0.70</value></property>
-<property><name>io.seqfile.compress.blocksize</name><value>1000000</value></property>
-<property><name>mapred.queue.names</name><value>default</value></property>
-<property><name>fs.har.impl</name><value>org.apache.hadoop.fs.HarFileSystem</value></property>
-<property><name>io.mapfile.bloom.error.rate</name><value>0.005</value></property>
-<property><name>mapred.job.tracker</name><value>local</value></property>
-<property><name>io.skip.checksum.errors</name><value>false</value></property>
-<property><name>mapred.reduce.max.attempts</name><value>4</value></property>
-<property><name>fs.s3.maxRetries</name><value>4</value></property>
-<property><name>ipc.server.listen.queue.size</name><value>128</value></property>
-<property><name>fs.trash.interval</name><value>0</value></property>
-<property><name>mapred.local.dir.minspacestart</name><value>0</value></property>
-<property><name>fs.s3.impl</name><value>org.apache.hadoop.fs.s3.S3FileSystem</value></property>
-<property><name>io.seqfile.sorter.recordlimit</name><value>1000000</value></property>
-<property><name>io.mapfile.bloom.size</name><value>1048576</value></property>
-<property><name>io.sort.mb</name><value>100</value></property>
-<property><name>mapred.local.dir</name><value>${hadoop.tmp.dir}/mapred/local</value></property>
-<property><name>io.sort.factor</name><value>10</value></property>
-<property><name>BasicPathMergeVertex.kmerSize</name><value>3</value></property>
-<property><name>mapred.task.profile</name><value>false</value></property>
-<property><name>job.end.retry.interval</name><value>30000</value></property>
-<property><name>mapred.tasktracker.procfsbasedprocesstree.sleeptime-before-sigkill</name><value>5000</value></property>
-<property><name>mapred.jobtracker.completeuserjobs.maximum</name><value>100</value></property>
-<property><name>mapred.task.profile.reduces</name><value>0-2</value></property>
-<property><name>webinterface.private.actions</name><value>false</value></property>
-<property><name>hadoop.tmp.dir</name><value>/tmp/hadoop-${user.name}</value></property>
-<property><name>mapred.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
-<property><name>mapred.skip.attempts.to.start.skipping</name><value>2</value></property>
-<property><name>mapred.temp.dir</name><value>${hadoop.tmp.dir}/mapred/temp</value></property>
-<property><name>mapred.merge.recordsBeforeProgress</name><value>10000</value></property>
-<property><name>mapred.map.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
-<property><name>mapred.compress.map.output</name><value>false</value></property>
-<property><name>io.sort.spill.percent</name><value>0.80</value></property>
-<property><name>fs.checkpoint.edits.dir</name><value>${fs.checkpoint.dir}</value></property>
-<property><name>mapred.userlog.retain.hours</name><value>24</value></property>
-<property><name>mapred.system.dir</name><value>${hadoop.tmp.dir}/mapred/system</value></property>
-<property><name>mapred.line.input.format.linespermap</name><value>1</value></property>
-<property><name>job.end.retry.attempts</name><value>0</value></property>
-<property><name>ipc.client.idlethreshold</name><value>4000</value></property>
-<property><name>pregelix.vertexOutputFormatClass</name><value>edu.uci.ics.genomix.pregelix.format.GraphCleanOutputFormat</value></property>
-<property><name>mapred.reduce.copy.backoff</name><value>300</value></property>
-<property><name>mapred.map.tasks.speculative.execution</name><value>true</value></property>
-<property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
-<property><name>hadoop.logfile.size</name><value>10000000</value></property>
-<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.genomix.pregelix.format.InitialGraphCleanInputFormat</value></property>
-<property><name>mapred.job.queue.name</name><value>default</value></property>
-<property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
-<property><name>pregelix.incStateLength</name><value>true</value></property>
-<property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
-<property><name>topology.script.number.args</name><value>100</value></property>
-<property><name>mapred.skip.map.max.skip.records</name><value>0</value></property>
-<property><name>fs.ftp.impl</name><value>org.apache.hadoop.fs.ftp.FTPFileSystem</value></property>
-<property><name>mapred.task.cache.levels</name><value>2</value></property>
-<property><name>mapred.job.tracker.handler.count</name><value>10</value></property>
-<property><name>io.serializations</name><value>org.apache.hadoop.io.serializer.WritableSerialization</value></property>
-<property><name>ipc.client.connect.max.retries</name><value>10</value></property>
-<property><name>mapred.min.split.size</name><value>0</value></property>
-<property><name>mapred.map.max.attempts</name><value>4</value></property>
-<property><name>jobclient.output.filter</name><value>FAILED</value></property>
-<property><name>ipc.client.tcpnodelay</name><value>false</value></property>
-<property><name>mapred.acls.enabled</name><value>false</value></property>
-</configuration>
\ No newline at end of file
diff --git a/genomix/genomix-pregelix/src/test/resources/only_pathmerge.txt b/genomix/genomix-pregelix/src/test/resources/only_pathmerge.txt
index 3d007d2..5a15ca0 100644
--- a/genomix/genomix-pregelix/src/test/resources/only_pathmerge.txt
+++ b/genomix/genomix-pregelix/src/test/resources/only_pathmerge.txt
@@ -1 +1 @@
-P4ForMergeGraph.xml
+LogAlgorithmForMergeGraph.xml