refactor graph building in hyracks using new genomix-data code
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/velvet/oldtype/GeneCode.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/velvet/oldtype/GeneCode.java
new file mode 100644
index 0000000..c3d8a98
--- /dev/null
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/velvet/oldtype/GeneCode.java
@@ -0,0 +1,69 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.genomix.velvet.oldtype;
+
+public class GeneCode {
+ public final static byte[] GENE_SYMBOL = { 'A', 'C', 'G', 'T' };
+ /**
+ * make sure this 4 ids equal to the sequence id of char in {@GENE_SYMBOL
+ * }
+ */
+ public static final byte A = 0;
+ public static final byte C = 1;
+ public static final byte G = 2;
+ public static final byte T = 3;
+
+ public static byte getCodeFromSymbol(byte ch) {
+ byte r = 0;
+ switch (ch) {
+ case 'A':
+ case 'a':
+ r = A;
+ break;
+ case 'C':
+ case 'c':
+ r = C;
+ break;
+ case 'G':
+ case 'g':
+ r = G;
+ break;
+ case 'T':
+ case 't':
+ r = T;
+ break;
+ }
+ return r;
+ }
+
+ public static byte getPairedGeneCode(byte genecode){
+ if ( genecode < 0 || genecode > 3){
+ throw new IllegalArgumentException("Invalid genecode");
+ }
+ return (byte) (3- genecode);
+ }
+
+ public static byte getPairedCodeFromSymbol(byte ch){
+ return getPairedGeneCode(getCodeFromSymbol(ch));
+ }
+
+ public static byte getSymbolFromCode(byte code) {
+ if (code > 3 || code < 0 ) {
+ throw new IllegalArgumentException("Invalid genecode");
+ }
+ return GENE_SYMBOL[code];
+ }
+}
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/velvet/oldtype/KmerBytesWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/velvet/oldtype/KmerBytesWritable.java
new file mode 100644
index 0000000..630dbad
--- /dev/null
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/velvet/oldtype/KmerBytesWritable.java
@@ -0,0 +1,502 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.genomix.velvet.oldtype;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.Serializable;
+
+import org.apache.hadoop.io.BinaryComparable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+
+import edu.uci.ics.genomix.data.KmerUtil;
+
+/**
+ * Variable kmer length byteswritable
+ * It was used to generate the graph in which phase the kmer length doesn't change.
+ * Thus the size of bytes doesn't change either.
+ */
+public class KmerBytesWritable extends BinaryComparable implements Serializable, WritableComparable<BinaryComparable> {
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1L;
+ private static final byte[] EMPTY_BYTES = {};
+
+ protected int size;
+ protected byte[] bytes;
+ protected int offset;
+ protected int kmerlength;
+
+ public KmerBytesWritable() {
+ this(0, EMPTY_BYTES, 0);
+ }
+
+ public KmerBytesWritable(int k, byte[] storage, int offset) {
+ setNewReference(k, storage, offset);
+ }
+
+ public KmerBytesWritable(int k, String kmer) {
+ setNewReference(kmer.length(), kmer.getBytes(), 0);
+ }
+
+ /**
+ * Initial Kmer space by kmerlength
+ *
+ * @param k
+ * kmerlength
+ */
+ public KmerBytesWritable(int k) {
+ this.kmerlength = k;
+ this.size = KmerUtil.getByteNumFromK(kmerlength);
+ if (k > 0) {
+ this.bytes = new byte[this.size];
+ } else {
+ this.bytes = EMPTY_BYTES;
+ }
+ this.offset = 0;
+ }
+
+ public KmerBytesWritable(KmerBytesWritable right) {
+ this(right.kmerlength);
+ set(right);
+ }
+
+ /**
+ * Deep copy of the given kmer
+ *
+ * @param newData
+ */
+ public void set(KmerBytesWritable newData) {
+ if (newData == null) {
+ this.set(0, EMPTY_BYTES, 0);
+ } else {
+ this.set(newData.kmerlength, newData.bytes, newData.getOffset());
+ }
+ }
+
+ /**
+ * Deep copy of the given bytes data
+ * It will not change the kmerlength
+ *
+ * @param newData
+ * @param offset
+ */
+ public void set(byte[] newData, int offset) {
+ if (kmerlength > 0) {
+ System.arraycopy(newData, offset, bytes, this.offset, size);
+ }
+ }
+
+ /**
+ * Deep copy of the given data, and also set to new kmerlength
+ *
+ * @param k
+ * : new kmer length
+ * @param newData
+ * : data storage
+ * @param offset
+ * : start offset
+ */
+ public void set(int k, byte[] newData, int offset) {
+ reset(k);
+ if (k > 0) {
+ System.arraycopy(newData, offset, bytes, this.offset, size);
+ }
+ }
+
+ /**
+ * Reset array by kmerlength
+ *
+ * @param k
+ */
+ public void reset(int k) {
+ this.kmerlength = k;
+ setSize(KmerUtil.getByteNumFromK(k));
+ clearLeadBit();
+ }
+
+ /**
+ * Point this datablock to the given bytes array
+ * It works like the pointer to new datablock.
+ * kmerlength will not change
+ *
+ * @param newData
+ * @param offset
+ */
+ public void setNewReference(byte[] newData, int offset) {
+ this.bytes = newData;
+ this.offset = offset;
+ if (newData.length - offset < size) {
+ throw new IllegalArgumentException("Not given enough space");
+ }
+ }
+
+ /**
+ * Point this datablock to the given bytes array
+ * It works like the pointer to new datablock.
+ * It also set the new kmerlength
+ *
+ * @param k
+ * @param newData
+ * @param offset
+ */
+ public void setNewReference(int k, byte[] newData, int offset) {
+ this.kmerlength = k;
+ this.size = KmerUtil.getByteNumFromK(k);
+ setNewReference(newData, offset);
+ }
+
+ protected void setSize(int size) {
+ if (size > getCapacity()) {
+ setCapacity((size * 3 / 2));
+ }
+ this.size = size;
+ }
+
+ protected int getCapacity() {
+ return bytes.length;
+ }
+
+ protected void setCapacity(int new_cap) {
+ if (new_cap != getCapacity()) {
+ byte[] new_data = new byte[new_cap];
+ if (new_cap < size) {
+ size = new_cap;
+ }
+ if (size != 0) {
+ System.arraycopy(bytes, offset, new_data, 0, size);
+ }
+ bytes = new_data;
+ offset = 0;
+ }
+ }
+
+ /**
+ * Get one genecode (A|G|C|T) from the given kmer index
+ * e.g. Get the 4th gene of the kmer ACGTA will return T
+ *
+ * @param pos
+ * @return
+ */
+ public byte getGeneCodeAtPosition(int pos) {
+ if (pos >= kmerlength) {
+ throw new IllegalArgumentException("gene position out of bound");
+ }
+ int posByte = pos / 4;
+ int shift = (pos % 4) << 1;
+ return (byte) ((bytes[offset + size - 1 - posByte] >> shift) & 0x3);
+ }
+
+ public int getKmerLength() {
+ return this.kmerlength;
+ }
+
+ @Override
+ public byte[] getBytes() {
+ return bytes;
+ }
+
+ public int getOffset() {
+ return offset;
+ }
+
+ @Override
+ public int getLength() {
+ return size;
+ }
+
+ /**
+ * Read Kmer from read text into bytes array e.g. AATAG will compress as
+ * [0x000G, 0xATAA]
+ *
+ * @param k
+ * @param array
+ * @param start
+ */
+ public void setByRead(byte[] array, int start) {
+ byte l = 0;
+ int bytecount = 0;
+ int bcount = this.size - 1;
+ for (int i = start; i < start + kmerlength && i < array.length; i++) {
+ byte code = GeneCode.getCodeFromSymbol(array[i]);
+ l |= (byte) (code << bytecount);
+ bytecount += 2;
+ if (bytecount == 8) {
+ bytes[offset + bcount--] = l;
+ l = 0;
+ bytecount = 0;
+ }
+ }
+ if (bcount >= 0) {
+ bytes[offset] = l;
+ }
+ }
+
+ public void setByRead(int k, byte[] array, int start) {
+ reset(k);
+ setByRead(array, start);
+ }
+
+ /**
+ * Compress Reversed read into bytes array
+ * e.g. AATAG will paired to CTATT, and then compress as
+ * [0x000T,0xTATC]
+ *
+ * @param input
+ * array
+ * @param start
+ * position
+ */
+ public void setByReadReverse(byte[] array, int start) {
+ byte l = 0;
+ int bytecount = 0;
+ int bcount = size - 1;
+ for (int i = start + kmerlength - 1; i >= 0 && i < array.length; i--) {
+ byte code = GeneCode.getPairedCodeFromSymbol(array[i]);
+ l |= (byte) (code << bytecount);
+ bytecount += 2;
+ if (bytecount == 8) {
+ bytes[offset + bcount--] = l;
+ l = 0;
+ bytecount = 0;
+ }
+ }
+ if (bcount >= 0) {
+ bytes[offset] = l;
+ }
+ }
+
+ public void setByReadReverse(int k, byte[] array, int start) {
+ reset(k);
+ setByReadReverse(array, start);
+ }
+
+ /**
+ * Shift Kmer to accept new char input
+ *
+ * @param c
+ * Input new gene character
+ * @return the shift out gene, in gene code format
+ */
+ public byte shiftKmerWithNextChar(byte c) {
+ return shiftKmerWithNextCode(GeneCode.getCodeFromSymbol(c));
+ }
+
+ /**
+ * Shift Kmer to accept new gene code
+ *
+ * @param c
+ * Input new gene code
+ * @return the shift out gene, in gene code format
+ */
+ public byte shiftKmerWithNextCode(byte c) {
+ byte output = (byte) (bytes[offset + size - 1] & 0x03);
+ for (int i = size - 1; i > 0; i--) {
+ byte in = (byte) (bytes[offset + i - 1] & 0x03);
+ bytes[offset + i] = (byte) (((bytes[offset + i] >>> 2) & 0x3f) | (in << 6));
+ }
+ int pos = ((kmerlength - 1) % 4) << 1;
+ byte code = (byte) (c << pos);
+ bytes[offset] = (byte) (((bytes[offset] >>> 2) & 0x3f) | code);
+ clearLeadBit();
+ return output;
+ }
+
+ /**
+ * Shift Kmer to accept new input char
+ *
+ * @param c
+ * Input new gene character
+ * @return the shiftout gene, in gene code format
+ */
+ public byte shiftKmerWithPreChar(byte c) {
+ return shiftKmerWithPreCode(GeneCode.getCodeFromSymbol(c));
+ }
+
+ /**
+ * Shift Kmer to accept new gene code
+ *
+ * @param c
+ * Input new gene code
+ * @return the shiftout gene, in gene code format
+ */
+ public byte shiftKmerWithPreCode(byte c) {
+ int pos = ((kmerlength - 1) % 4) << 1;
+ byte output = (byte) ((bytes[offset] >> pos) & 0x03);
+ for (int i = 0; i < size - 1; i++) {
+ byte in = (byte) ((bytes[offset + i + 1] >> 6) & 0x03);
+ bytes[offset + i] = (byte) ((bytes[offset + i] << 2) | in);
+ }
+ bytes[offset + size - 1] = (byte) ((bytes[offset + size - 1] << 2) | c);
+ clearLeadBit();
+ return output;
+ }
+
+ /**
+ * Merge Kmer with the next connected Kmer
+ * e.g. AAGCTAA merge with AACAACC, if the initial kmerSize = 3
+ * then it will return AAGCTAACAACC
+ *
+ * @param initialKmerSize
+ * : the initial kmerSize
+ * @param kmer
+ * : the next kmer
+ */
+ public void mergeNextKmer(int initialKmerSize, KmerBytesWritable kmer) {
+ int preKmerLength = kmerlength;
+ int preSize = size;
+ this.kmerlength += kmer.kmerlength - initialKmerSize + 1;
+ setSize(KmerUtil.getByteNumFromK(kmerlength));
+ for (int i = 1; i <= preSize; i++) {
+ bytes[offset + size - i] = bytes[offset + preSize - i];
+ }
+ for (int k = initialKmerSize - 1; k < kmer.getKmerLength(); k += 4) {
+ byte onebyte = getOneByteFromKmerAtPosition(k, kmer.getBytes(), kmer.getOffset(), kmer.getLength());
+ appendOneByteAtPosition(preKmerLength + k - initialKmerSize + 1, onebyte, bytes, offset, size);
+ }
+ clearLeadBit();
+ }
+
+ /**
+ * Merge Kmer with the previous connected Kmer
+ * e.g. AACAACC merge with AAGCTAA, if the initial kmerSize = 3
+ * then it will return AAGCTAACAACC
+ *
+ * @param initialKmerSize
+ * : the initial kmerSize
+ * @param preKmer
+ * : the previous kmer
+ */
+ public void mergePreKmer(int initialKmerSize, KmerBytesWritable preKmer) {
+ int preKmerLength = kmerlength;
+ int preSize = size;
+ this.kmerlength += preKmer.kmerlength - initialKmerSize + 1;
+ setSize(KmerUtil.getByteNumFromK(kmerlength));
+ byte cacheByte = getOneByteFromKmerAtPosition(0, bytes, offset, preSize);
+
+ // copy prekmer
+ for (int k = 0; k < preKmer.kmerlength - initialKmerSize + 1; k += 4) {
+ byte onebyte = getOneByteFromKmerAtPosition(k, preKmer.bytes, preKmer.offset, preKmer.size);
+ appendOneByteAtPosition(k, onebyte, bytes, offset, size);
+ }
+
+ // copy current kmer
+ int k = 4;
+ for (; k < preKmerLength; k += 4) {
+ byte onebyte = getOneByteFromKmerAtPosition(k, bytes, offset, preSize);
+ appendOneByteAtPosition(preKmer.kmerlength - initialKmerSize + k - 4 + 1, cacheByte, bytes, offset, size);
+ cacheByte = onebyte;
+ }
+ appendOneByteAtPosition(preKmer.kmerlength - initialKmerSize + k - 4 + 1, cacheByte, bytes, offset, size);
+ clearLeadBit();
+ }
+
+ public static void appendOneByteAtPosition(int k, byte onebyte, byte[] buffer, int start, int length) {
+ int position = start + length - 1 - k / 4;
+ if (position < start) {
+ throw new IllegalArgumentException("Buffer for kmer storage is invalid");
+ }
+ int shift = ((k) % 4) << 1;
+ int mask = shift == 0 ? 0 : ((1 << shift) - 1);
+
+ buffer[position] = (byte) ((buffer[position] & mask) | ((0xff & onebyte) << shift));
+ if (position > start && shift != 0) {
+ buffer[position - 1] = (byte) ((buffer[position - 1] & (0xff - mask)) | ((byte) ((0xff & onebyte) >> (8 - shift))));
+ }
+ }
+
+ public static byte getOneByteFromKmerAtPosition(int k, byte[] buffer, int start, int length) {
+ int position = start + length - 1 - k / 4;
+ if (position < start) {
+ throw new IllegalArgumentException("Buffer of kmer storage is invalid");
+ }
+ int shift = (k % 4) << 1;
+ byte data = (byte) (((0xff) & buffer[position]) >> shift);
+ if (shift != 0 && position > start) {
+ data |= 0xff & (buffer[position - 1] << (8 - shift));
+ }
+ return data;
+ }
+
+ protected void clearLeadBit() {
+ if (kmerlength % 4 != 0) {
+ bytes[offset] &= (1 << ((kmerlength % 4) << 1)) - 1;
+ }
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ this.kmerlength = in.readInt();
+ this.size = KmerUtil.getByteNumFromK(kmerlength);
+ if (this.kmerlength > 0) {
+ if (this.bytes.length < this.size) {
+ this.bytes = new byte[this.size];
+ this.offset = 0;
+ }
+ in.readFully(bytes, offset, size);
+ }
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeInt(kmerlength);
+ if (kmerlength > 0) {
+ out.write(bytes, offset, size);
+ }
+ }
+
+ @Override
+ public int hashCode() {
+ return super.hashCode() * 31 + this.kmerlength;
+ }
+
+ @Override
+ public boolean equals(Object right_obj) {
+ if (right_obj instanceof KmerBytesWritable)
+ return this.kmerlength == ((KmerBytesWritable) right_obj).kmerlength && super.equals(right_obj);
+ return false;
+ }
+
+ @Override
+ public String toString() {
+ return KmerUtil.recoverKmerFrom(this.kmerlength, this.getBytes(), offset, this.getLength());
+ }
+
+ public static class Comparator extends WritableComparator {
+ public final int LEAD_BYTES = 4;
+
+ public Comparator() {
+ super(KmerBytesWritable.class);
+ }
+
+ public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+ int kmerlength1 = readInt(b1, s1);
+ int kmerlength2 = readInt(b2, s2);
+ if (kmerlength1 == kmerlength2) {
+ return compareBytes(b1, s1 + LEAD_BYTES, l1 - LEAD_BYTES, b2, s2 + LEAD_BYTES, l2 - LEAD_BYTES);
+ }
+ return kmerlength1 - kmerlength2;
+ }
+ }
+
+ static { // register this comparator
+ WritableComparator.define(KmerBytesWritable.class, new Comparator());
+ }
+
+}
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/velvet/oldtype/KmerBytesWritableFactory.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/velvet/oldtype/KmerBytesWritableFactory.java
new file mode 100644
index 0000000..b0aaebc
--- /dev/null
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/velvet/oldtype/KmerBytesWritableFactory.java
@@ -0,0 +1,313 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.genomix.velvet.oldtype;
+
+public class KmerBytesWritableFactory {
+ private KmerBytesWritable kmer;
+
+ public KmerBytesWritableFactory(int k) {
+ kmer = new KmerBytesWritable(k);
+ }
+
+ /**
+ * Read Kmer from read text into bytes array e.g. AATAG will compress as
+ * [0x000G, 0xATAA]
+ *
+ * @param k
+ * @param array
+ * @param start
+ */
+ public KmerBytesWritable getKmerByRead(int k, byte[] array, int start) {
+ kmer.reset(k);
+ kmer.setByRead(array, start);
+ return kmer;
+ }
+
+ /**
+ * Compress Reversed Kmer into bytes array AATAG will compress as
+ * [0x000A,0xATAG]
+ *
+ * @param array
+ * @param start
+ */
+ public KmerBytesWritable getKmerByReadReverse(int k, byte[] array, int start) {
+ kmer.reset(k);
+ kmer.setByReadReverse(array, start);
+ return kmer;
+ }
+
+ /**
+ * Get last kmer from kmer-chain.
+ * e.g. kmerChain is AAGCTA, if k =5, it will
+ * return AGCTA
+ *
+ * @param k
+ * @param kInChain
+ * @param kmerChain
+ * @return LastKmer bytes array
+ */
+ public KmerBytesWritable getLastKmerFromChain(int lastK, final KmerBytesWritable kmerChain) {
+ if (lastK > kmerChain.getKmerLength()) {
+ return null;
+ }
+ if (lastK == kmerChain.getKmerLength()) {
+ kmer.set(kmerChain);
+ return kmer;
+ }
+ kmer.reset(lastK);
+
+ /** from end to start */
+ int byteInChain = kmerChain.getLength() - 1 - (kmerChain.getKmerLength() - lastK) / 4;
+ int posInByteOfChain = ((kmerChain.getKmerLength() - lastK) % 4) << 1; // *2
+ int byteInKmer = kmer.getLength() - 1;
+ for (; byteInKmer >= 0 && byteInChain > 0; byteInKmer--, byteInChain--) {
+ kmer.getBytes()[byteInKmer] = (byte) ((0xff & kmerChain.getBytes()[byteInChain]) >> posInByteOfChain);
+ kmer.getBytes()[byteInKmer] |= ((kmerChain.getBytes()[byteInChain - 1] << (8 - posInByteOfChain)));
+ }
+
+ /** last kmer byte */
+ if (byteInKmer == 0) {
+ kmer.getBytes()[0] = (byte) ((kmerChain.getBytes()[0] & 0xff) >> posInByteOfChain);
+ }
+ kmer.clearLeadBit();
+ return kmer;
+ }
+
+ /**
+ * Get first kmer from kmer-chain e.g. kmerChain is AAGCTA, if k=5, it will
+ * return AAGCT
+ *
+ * @param k
+ * @param kInChain
+ * @param kmerChain
+ * @return FirstKmer bytes array
+ */
+ public KmerBytesWritable getFirstKmerFromChain(int firstK, final KmerBytesWritable kmerChain) {
+ if (firstK > kmerChain.getKmerLength()) {
+ return null;
+ }
+ if (firstK == kmerChain.getKmerLength()) {
+ kmer.set(kmerChain);
+ return kmer;
+ }
+ kmer.reset(firstK);
+
+ int i = 1;
+ for (; i < kmer.getLength(); i++) {
+ kmer.getBytes()[kmer.getLength() - i] = kmerChain.getBytes()[kmerChain.getLength() - i];
+ }
+ int posInByteOfChain = (firstK % 4) << 1; // *2
+ if (posInByteOfChain == 0) {
+ kmer.getBytes()[0] = kmerChain.getBytes()[kmerChain.getLength() - i];
+ } else {
+ kmer.getBytes()[0] = (byte) (kmerChain.getBytes()[kmerChain.getLength() - i] & ((1 << posInByteOfChain) - 1));
+ }
+ kmer.clearLeadBit();
+ return kmer;
+ }
+
+ public KmerBytesWritable getSubKmerFromChain(int startK, int kSize, final KmerBytesWritable kmerChain) {
+ if (startK + kSize > kmerChain.getKmerLength()) {
+ return null;
+ }
+ if (startK == 0 && kSize == kmerChain.getKmerLength()) {
+ kmer.set(kmerChain);
+ return kmer;
+ }
+ kmer.reset(kSize);
+
+ /** from end to start */
+ int byteInChain = kmerChain.getLength() - 1 - startK / 4;
+ int posInByteOfChain = startK % 4 << 1; // *2
+ int byteInKmer = kmer.getLength() - 1;
+ for (; byteInKmer >= 0 && byteInChain > 0; byteInKmer--, byteInChain--) {
+ kmer.getBytes()[byteInKmer] = (byte) ((0xff & kmerChain.getBytes()[byteInChain]) >> posInByteOfChain);
+ kmer.getBytes()[byteInKmer] |= ((kmerChain.getBytes()[byteInChain - 1] << (8 - posInByteOfChain)));
+ }
+
+ /** last kmer byte */
+ if (byteInKmer == 0) {
+ kmer.getBytes()[0] = (byte) ((kmerChain.getBytes()[0] & 0xff) >> posInByteOfChain);
+ }
+ kmer.clearLeadBit();
+ return kmer;
+ }
+
+ /**
+ * Merge kmer with next neighbor in gene-code format.
+ * The k of new kmer will increase by 1
+ * e.g. AAGCT merge with A => AAGCTA
+ *
+ * @param k
+ * :input k of kmer
+ * @param kmer
+ * : input bytes of kmer
+ * @param nextCode
+ * : next neighbor in gene-code format
+ * @return the merged Kmer, this K of this Kmer is k+1
+ */
+ public KmerBytesWritable mergeKmerWithNextCode(final KmerBytesWritable kmer, byte nextCode) {
+ this.kmer.reset(kmer.getKmerLength() + 1);
+ for (int i = 1; i <= kmer.getLength(); i++) {
+ this.kmer.getBytes()[this.kmer.getLength() - i] = kmer.getBytes()[kmer.getLength() - i];
+ }
+ if (this.kmer.getLength() > kmer.getLength()) {
+ this.kmer.getBytes()[0] = (byte) (nextCode & 0x3);
+ } else {
+ this.kmer.getBytes()[0] = (byte) (kmer.getBytes()[0] | ((nextCode & 0x3) << ((kmer.getKmerLength() % 4) << 1)));
+ }
+ this.kmer.clearLeadBit();
+ return this.kmer;
+ }
+
+ /**
+ * Merge kmer with previous neighbor in gene-code format.
+ * The k of new kmer will increase by 1
+ * e.g. AAGCT merge with A => AAAGCT
+ *
+ * @param k
+ * :input k of kmer
+ * @param kmer
+ * : input bytes of kmer
+ * @param preCode
+ * : next neighbor in gene-code format
+ * @return the merged Kmer,this K of this Kmer is k+1
+ */
+ public KmerBytesWritable mergeKmerWithPreCode(final KmerBytesWritable kmer, byte preCode) {
+ this.kmer.reset(kmer.getKmerLength() + 1);
+ int byteInMergedKmer = 0;
+ if (kmer.getKmerLength() % 4 == 0) {
+ this.kmer.getBytes()[0] = (byte) ((kmer.getBytes()[0] >> 6) & 0x3);
+ byteInMergedKmer++;
+ }
+ for (int i = 0; i < kmer.getLength() - 1; i++, byteInMergedKmer++) {
+ this.kmer.getBytes()[byteInMergedKmer] = (byte) ((kmer.getBytes()[i] << 2) | ((kmer.getBytes()[i + 1] >> 6) & 0x3));
+ }
+ this.kmer.getBytes()[byteInMergedKmer] = (byte) ((kmer.getBytes()[kmer.getLength() - 1] << 2) | (preCode & 0x3));
+ this.kmer.clearLeadBit();
+ return this.kmer;
+ }
+
+ /**
+ * Merge two kmer to one kmer
+ * e.g. ACTA + ACCGT => ACTAACCGT
+ *
+ * @param preK
+ * : previous k of kmer
+ * @param kmerPre
+ * : bytes array of previous kmer
+ * @param nextK
+ * : next k of kmer
+ * @param kmerNext
+ * : bytes array of next kmer
+ * @return merged kmer, the new k is @preK + @nextK
+ */
+ public KmerBytesWritable mergeTwoKmer(final KmerBytesWritable preKmer, final KmerBytesWritable nextKmer) {
+ kmer.reset(preKmer.getKmerLength() + nextKmer.getKmerLength());
+ int i = 1;
+ for (; i <= preKmer.getLength(); i++) {
+ kmer.getBytes()[kmer.getLength() - i] = preKmer.getBytes()[preKmer.getLength() - i];
+ }
+ if (i > 1) {
+ i--;
+ }
+ if (preKmer.getKmerLength() % 4 == 0) {
+ for (int j = 1; j <= nextKmer.getLength(); j++) {
+ kmer.getBytes()[kmer.getLength() - i - j] = nextKmer.getBytes()[nextKmer.getLength() - j];
+ }
+ } else {
+ int posNeedToMove = ((preKmer.getKmerLength() % 4) << 1);
+ kmer.getBytes()[kmer.getLength() - i] |= nextKmer.getBytes()[nextKmer.getLength() - 1] << posNeedToMove;
+ for (int j = 1; j < nextKmer.getLength(); j++) {
+ kmer.getBytes()[kmer.getLength() - i - j] = (byte) (((nextKmer.getBytes()[nextKmer.getLength() - j] & 0xff) >> (8 - posNeedToMove)) | (nextKmer
+ .getBytes()[nextKmer.getLength() - j - 1] << posNeedToMove));
+ }
+ if (nextKmer.getKmerLength() % 4 == 0 || (nextKmer.getKmerLength() % 4) * 2 + posNeedToMove > 8) {
+ kmer.getBytes()[0] = (byte) ((0xff & nextKmer.getBytes()[0]) >> (8 - posNeedToMove));
+ }
+ }
+ kmer.clearLeadBit();
+ return kmer;
+ }
+
+ /**
+ * Safely shifted the kmer forward without change the input kmer
+ * e.g. AGCGC shift with T => GCGCT
+ *
+ * @param k
+ * : kmer length
+ * @param kmer
+ * : input kmer
+ * @param afterCode
+ * : input genecode
+ * @return new created kmer that shifted by afterCode, the K will not change
+ */
+ public KmerBytesWritable shiftKmerWithNextCode(final KmerBytesWritable kmer, byte afterCode) {
+ this.kmer.set(kmer);
+ this.kmer.shiftKmerWithNextCode(afterCode);
+ return this.kmer;
+ }
+
+ /**
+ * Safely shifted the kmer backward without change the input kmer
+ * e.g. AGCGC shift with T => TAGCG
+ *
+ * @param k
+ * : kmer length
+ * @param kmer
+ * : input kmer
+ * @param preCode
+ * : input genecode
+ * @return new created kmer that shifted by preCode, the K will not change
+ */
+ public KmerBytesWritable shiftKmerWithPreCode(final KmerBytesWritable kmer, byte preCode) {
+ this.kmer.set(kmer);
+ this.kmer.shiftKmerWithPreCode(preCode);
+ return this.kmer;
+ }
+
+ /**
+ * get the reverse sequence of given kmer
+ *
+ * @param kmer
+ */
+ public KmerBytesWritable reverse(final KmerBytesWritable kmer) {
+ this.kmer.reset(kmer.getKmerLength());
+
+ int curPosAtKmer = ((kmer.getKmerLength() - 1) % 4) << 1;
+ int curByteAtKmer = 0;
+
+ int curPosAtReverse = 0;
+ int curByteAtReverse = this.kmer.getLength() - 1;
+ this.kmer.getBytes()[curByteAtReverse] = 0;
+ for (int i = 0; i < kmer.getKmerLength(); i++) {
+ byte gene = (byte) ((kmer.getBytes()[curByteAtKmer] >> curPosAtKmer) & 0x03);
+ this.kmer.getBytes()[curByteAtReverse] |= gene << curPosAtReverse;
+ curPosAtReverse += 2;
+ if (curPosAtReverse >= 8) {
+ curPosAtReverse = 0;
+ this.kmer.getBytes()[--curByteAtReverse] = 0;
+ }
+ curPosAtKmer -= 2;
+ if (curPosAtKmer < 0) {
+ curPosAtKmer = 6;
+ curByteAtKmer++;
+ }
+ }
+ this.kmer.clearLeadBit();
+ return this.kmer;
+ }
+}
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/velvet/oldtype/NodeWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/velvet/oldtype/NodeWritable.java
new file mode 100644
index 0000000..128bf9f
--- /dev/null
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/velvet/oldtype/NodeWritable.java
@@ -0,0 +1,201 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.genomix.velvet.oldtype;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.Serializable;
+
+import org.apache.hadoop.io.WritableComparable;
+
+public class NodeWritable implements WritableComparable<NodeWritable>, Serializable {
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1L;
+ private PositionWritable nodeID;
+ private PositionListWritable forwardForwardList;
+ private PositionListWritable forwardReverseList;
+ private PositionListWritable reverseForwardList;
+ private PositionListWritable reverseReverseList;
+ private KmerBytesWritable kmer;
+
+ public NodeWritable() {
+ this(21);
+ }
+
+ public NodeWritable(int kmerSize) {
+ nodeID = new PositionWritable(0, (byte) 0);
+ forwardForwardList = new PositionListWritable();
+ forwardReverseList = new PositionListWritable();
+ reverseForwardList = new PositionListWritable();
+ reverseReverseList = new PositionListWritable();
+ kmer = new KmerBytesWritable(kmerSize);
+ }
+
+ public NodeWritable(PositionWritable nodeID, PositionListWritable FFList, PositionListWritable FRList,
+ PositionListWritable RFList, PositionListWritable RRList, KmerBytesWritable kmer) {
+ this(kmer.getKmerLength());
+ this.nodeID.set(nodeID);
+ forwardForwardList.set(FFList);
+ forwardReverseList.set(FRList);
+ reverseForwardList.set(RFList);
+ reverseReverseList.set(RRList);
+ kmer.set(kmer);
+ }
+
+ public void setNodeID(PositionWritable ref) {
+ this.setNodeID(ref.getReadID(), ref.getPosInRead());
+ }
+
+ public void setNodeID(int readID, byte posInRead) {
+ nodeID.set(readID, posInRead);
+ }
+
+ public void setKmer(KmerBytesWritable right) {
+ this.kmer.set(right);
+ }
+
+ public void reset(int kmerSize) {
+ nodeID.set(0, (byte) 0);
+ forwardForwardList.reset();
+ forwardReverseList.reset();
+ reverseForwardList.reset();
+ reverseReverseList.reset();
+ kmer.reset(kmerSize);
+ }
+
+ public PositionListWritable getFFList() {
+ return forwardForwardList;
+ }
+
+ public PositionListWritable getFRList() {
+ return forwardReverseList;
+ }
+
+ public PositionListWritable getRFList() {
+ return reverseForwardList;
+ }
+
+ public PositionListWritable getRRList() {
+ return reverseReverseList;
+ }
+
+ public PositionWritable getNodeID() {
+ return nodeID;
+ }
+
+ public KmerBytesWritable getKmer() {
+ return kmer;
+ }
+
+ public int getCount() {
+ return kmer.getKmerLength();
+ }
+
+ public void mergeForwardNext(NodeWritable nextNode, int initialKmerSize) {
+ this.forwardForwardList.set(nextNode.forwardForwardList);
+ this.forwardReverseList.set(nextNode.forwardReverseList);
+ kmer.mergeNextKmer(initialKmerSize, nextNode.getKmer());
+ }
+
+ public void mergeForwardPre(NodeWritable preNode, int initialKmerSize) {
+ this.reverseForwardList.set(preNode.reverseForwardList);
+ this.reverseReverseList.set(preNode.reverseReverseList);
+ kmer.mergePreKmer(initialKmerSize, preNode.getKmer());
+ }
+
+ public void set(NodeWritable node) {
+ this.nodeID.set(node.getNodeID().getReadID(), node.getNodeID().getPosInRead());
+ this.forwardForwardList.set(node.forwardForwardList);
+ this.forwardReverseList.set(node.forwardReverseList);
+ this.reverseForwardList.set(node.reverseForwardList);
+ this.reverseReverseList.set(node.reverseReverseList);
+ this.kmer.set(node.kmer);
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ this.nodeID.readFields(in);
+ this.forwardForwardList.readFields(in);
+ this.forwardReverseList.readFields(in);
+ this.reverseForwardList.readFields(in);
+ this.reverseReverseList.readFields(in);
+ this.kmer.readFields(in);
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ this.nodeID.write(out);
+ this.forwardForwardList.write(out);
+ this.forwardReverseList.write(out);
+ this.reverseForwardList.write(out);
+ this.reverseReverseList.write(out);
+ this.kmer.write(out);
+ }
+
+ @Override
+ public int compareTo(NodeWritable other) {
+ return this.nodeID.compareTo(other.nodeID);
+ }
+
+ @Override
+ public int hashCode() {
+ return nodeID.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o instanceof NodeWritable) {
+ NodeWritable nw = (NodeWritable) o;
+ return (this.nodeID.equals(nw.nodeID) && this.forwardForwardList.equals(nw.forwardForwardList)
+ && this.forwardReverseList.equals(nw.forwardReverseList)
+ && this.reverseForwardList.equals(nw.reverseForwardList)
+ && this.reverseReverseList.equals(nw.reverseReverseList) && this.kmer.equals(nw.kmer));
+ }
+ return false;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sbuilder = new StringBuilder();
+ sbuilder.append('(');
+ sbuilder.append(nodeID.toString()).append('\t');
+ sbuilder.append(forwardForwardList.toString()).append('\t');
+ sbuilder.append(forwardReverseList.toString()).append('\t');
+ sbuilder.append(reverseForwardList.toString()).append('\t');
+ sbuilder.append(reverseReverseList.toString()).append('\t');
+ sbuilder.append(kmer.toString()).append(')');
+ return sbuilder.toString();
+ }
+
+ public int inDegree() {
+ return reverseReverseList.getCountOfPosition() + reverseForwardList.getCountOfPosition();
+ }
+
+ public int outDegree() {
+ return forwardForwardList.getCountOfPosition() + forwardReverseList.getCountOfPosition();
+ }
+
+ /*
+ * Return if this node is a "path" compressible node, that is, it has an in-degree and out-degree of 1
+ */
+ public boolean isPathNode() {
+ return inDegree() == 1 && outDegree() == 1;
+ }
+
+}
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/velvet/oldtype/PositionListWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/velvet/oldtype/PositionListWritable.java
new file mode 100644
index 0000000..b6c42c2
--- /dev/null
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/velvet/oldtype/PositionListWritable.java
@@ -0,0 +1,229 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.genomix.velvet.oldtype;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.io.Writable;
+
+import edu.uci.ics.genomix.data.Marshal;
+
+public class PositionListWritable implements Writable, Iterable<PositionWritable>, Serializable {
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1L;
+ protected byte[] storage;
+ protected int offset;
+ protected int valueCount;
+ protected static final byte[] EMPTY = {};
+ public static final int INTBYTES = 4;
+
+ protected PositionWritable posIter = new PositionWritable();
+
+ public PositionListWritable() {
+ this.storage = EMPTY;
+ this.valueCount = 0;
+ this.offset = 0;
+ }
+
+ public PositionListWritable(int count, byte[] data, int offset) {
+ setNewReference(count, data, offset);
+ }
+
+ public PositionListWritable(List<PositionWritable> posns) {
+ this();
+ for (PositionWritable p : posns) {
+ append(p);
+ }
+ }
+
+ public void setNewReference(int count, byte[] data, int offset) {
+ this.valueCount = count;
+ this.storage = data;
+ this.offset = offset;
+ }
+
+ protected void setSize(int size) {
+ if (size > getCapacity()) {
+ setCapacity((size * 3 / 2));
+ }
+ }
+
+ protected int getCapacity() {
+ return storage.length - offset;
+ }
+
+ protected void setCapacity(int new_cap) {
+ if (new_cap > getCapacity()) {
+ byte[] new_data = new byte[new_cap];
+ if (storage.length - offset > 0) {
+ System.arraycopy(storage, offset, new_data, 0, storage.length - offset);
+ }
+ storage = new_data;
+ offset = 0;
+ }
+ }
+
+ public PositionWritable getPosition(int i) {
+ if (i >= valueCount) {
+ throw new ArrayIndexOutOfBoundsException("No such positions");
+ }
+ posIter.setNewReference(storage, offset + i * PositionWritable.LENGTH);
+ return posIter;
+ }
+
+ public void resetPosition(int i, int readID, byte posInRead) {
+ if (i >= valueCount) {
+ throw new ArrayIndexOutOfBoundsException("No such positions");
+ }
+ Marshal.putInt(readID, storage, offset + i * PositionWritable.LENGTH);
+ storage[offset + INTBYTES] = posInRead;
+ }
+
+ @Override
+ public Iterator<PositionWritable> iterator() {
+ Iterator<PositionWritable> it = new Iterator<PositionWritable>() {
+
+ private int currentIndex = 0;
+
+ @Override
+ public boolean hasNext() {
+ return currentIndex < valueCount;
+ }
+
+ @Override
+ public PositionWritable next() {
+ return getPosition(currentIndex++);
+ }
+
+ @Override
+ public void remove() {
+ }
+ };
+ return it;
+ }
+
+ public void set(PositionListWritable list2) {
+ set(list2.valueCount, list2.storage, list2.offset);
+ }
+
+ public void set(int valueCount, byte[] newData, int offset) {
+ this.valueCount = valueCount;
+ setSize(valueCount * PositionWritable.LENGTH);
+ if (valueCount > 0) {
+ System.arraycopy(newData, offset, storage, this.offset, valueCount * PositionWritable.LENGTH);
+ }
+ }
+
+ public void reset() {
+ valueCount = 0;
+ }
+
+ public void append(PositionWritable pos) {
+ setSize((1 + valueCount) * PositionWritable.LENGTH);
+ System.arraycopy(pos.getByteArray(), pos.getStartOffset(), storage, offset + valueCount
+ * PositionWritable.LENGTH, pos.getLength());
+ valueCount += 1;
+ }
+
+ public void append(int readID, byte posInRead) {
+ setSize((1 + valueCount) * PositionWritable.LENGTH);
+ Marshal.putInt(readID, storage, offset + valueCount * PositionWritable.LENGTH);
+ storage[offset + valueCount * PositionWritable.LENGTH + PositionWritable.INTBYTES] = posInRead;
+ valueCount += 1;
+ }
+
+ public static int getCountByDataLength(int length) {
+ if (length % PositionWritable.LENGTH != 0) {
+ for (StackTraceElement ste : Thread.currentThread().getStackTrace()) {
+ System.out.println(ste);
+ }
+ throw new IllegalArgumentException("Length of positionlist is invalid");
+ }
+ return length / PositionWritable.LENGTH;
+ }
+
+ public int getCountOfPosition() {
+ return valueCount;
+ }
+
+ public byte[] getByteArray() {
+ return storage;
+ }
+
+ public int getStartOffset() {
+ return offset;
+ }
+
+ public int getLength() {
+ return valueCount * PositionWritable.LENGTH;
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ this.valueCount = in.readInt();
+ setSize(valueCount * PositionWritable.LENGTH);
+ in.readFully(storage, offset, valueCount * PositionWritable.LENGTH);
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeInt(valueCount);
+ out.write(storage, offset, valueCount * PositionWritable.LENGTH);
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sbuilder = new StringBuilder();
+ sbuilder.append('[');
+ for (PositionWritable pos : this) {
+ sbuilder.append(pos.toString());
+ sbuilder.append(',');
+ }
+ if (valueCount > 0) {
+ sbuilder.setCharAt(sbuilder.length() - 1, ']');
+ } else {
+ sbuilder.append(']');
+ }
+ return sbuilder.toString();
+ }
+
+ @Override
+ public int hashCode() {
+ return Marshal.hashBytes(getByteArray(), getStartOffset(), getLength());
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (!(o instanceof PositionListWritable))
+ return false;
+ PositionListWritable other = (PositionListWritable) o;
+ if (this.valueCount != other.valueCount)
+ return false;
+ for (int i=0; i < this.valueCount; i++) {
+ if (!this.getPosition(i).equals(other.getPosition(i)))
+ return false;
+ }
+ return true;
+ }
+}
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/velvet/oldtype/PositionWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/velvet/oldtype/PositionWritable.java
new file mode 100644
index 0000000..1d509bb
--- /dev/null
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/velvet/oldtype/PositionWritable.java
@@ -0,0 +1,170 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.genomix.velvet.oldtype;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.Serializable;
+
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+
+import edu.uci.ics.genomix.data.Marshal;
+
+public class PositionWritable implements WritableComparable<PositionWritable>, Serializable {
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1L;
+ protected byte[] storage;
+ protected int offset;
+ public static final int LENGTH = 5;
+ public static final int INTBYTES = 4;
+
+ public PositionWritable() {
+ storage = new byte[LENGTH];
+ offset = 0;
+ }
+
+ public PositionWritable(int readID, byte posInRead) {
+ this();
+ set(readID, posInRead);
+ }
+
+ public PositionWritable(byte[] storage, int offset) {
+ setNewReference(storage, offset);
+ }
+
+ public void setNewReference(byte[] storage, int offset) {
+ this.storage = storage;
+ this.offset = offset;
+ }
+
+ public void set(PositionWritable pos) {
+ set(pos.getReadID(), pos.getPosInRead());
+ }
+
+ public void set(int readID, byte posInRead) {
+ Marshal.putInt(readID, storage, offset);
+ storage[offset + INTBYTES] = posInRead;
+ }
+
+ public int getReadID() {
+ return Marshal.getInt(storage, offset);
+ }
+
+ public byte getPosInRead() {
+ return storage[offset + INTBYTES];
+ }
+
+ public byte[] getByteArray() {
+ return storage;
+ }
+
+ public int getStartOffset() {
+ return offset;
+ }
+
+ public int getLength() {
+ return LENGTH;
+ }
+
+ public boolean isSameReadID(PositionWritable other) {
+ return getReadID() == other.getReadID();
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ in.readFully(storage, offset, LENGTH);
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.write(storage, offset, LENGTH);
+ }
+
+ @Override
+ public int hashCode() {
+ return Marshal.hashBytes(getByteArray(), getStartOffset(), getLength());
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (!(o instanceof PositionWritable))
+ return false;
+ PositionWritable other = (PositionWritable) o;
+ return this.getReadID() == other.getReadID() && this.getPosInRead() == other.getPosInRead();
+ }
+
+ @Override
+ public int compareTo(PositionWritable other) {
+ int diff1 = this.getReadID() - other.getReadID();
+ if (diff1 == 0) {
+ int diff2 = Math.abs((int) this.getPosInRead()) - Math.abs((int) other.getPosInRead());
+ if (diff2 == 0) {
+ return this.getPosInRead() - other.getPosInRead();
+ }
+ return diff2;
+ }
+ return diff1;
+ }
+
+ @Override
+ public String toString() {
+ return "(" + Integer.toString(getReadID()) + "," + Integer.toString((int) getPosInRead()) + ")";
+ }
+
+ /** A Comparator optimized for IntWritable. */
+ public static class Comparator extends WritableComparator {
+ public Comparator() {
+ super(PositionWritable.class);
+ }
+
+ public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+ int thisValue = Marshal.getInt(b1, s1);
+ int thatValue = Marshal.getInt(b2, s2);
+ int diff1 = thisValue - thatValue;
+ if (diff1 == 0) {
+ int diff2 = Math.abs((int) b1[s1 + INTBYTES]) - Math.abs((int) b2[s2 + INTBYTES]);
+ if (diff2 == 0) {
+ return b1[s1 + INTBYTES] - b2[s2 + INTBYTES];
+ }
+ return diff2;
+ }
+ return diff1;
+ }
+ }
+
+ public static class FirstComparator implements RawComparator<PositionWritable> {
+ @Override
+ public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+ return WritableComparator.compareBytes(b1, s1, l1 - 1, b2, s2, l2 - 1);
+ }
+
+ @Override
+ public int compare(PositionWritable o1, PositionWritable o2) {
+ int l = o1.getReadID();
+ int r = o2.getReadID();
+ return l == r ? 0 : (l < r ? -1 : 1);
+ }
+ }
+
+ static { // register this comparator
+ WritableComparator.define(PositionWritable.class, new Comparator());
+ }
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/NodeReference.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/NodeReference.java
index 8f7a69e..60c0682 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/NodeReference.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/NodeReference.java
@@ -15,7 +15,7 @@
package edu.uci.ics.genomix.hyracks.data.primitive;
-import edu.uci.ics.genomix.oldtype.NodeWritable;
+import edu.uci.ics.genomix.velvet.oldtype.NodeWritable;
public class NodeReference extends NodeWritable {
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/MapReadToNodeOperator.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/MapReadToNodeOperator.java
index a3e7764..1827651 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/MapReadToNodeOperator.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/MapReadToNodeOperator.java
@@ -19,9 +19,11 @@
import java.nio.ByteBuffer;
import edu.uci.ics.genomix.hyracks.data.primitive.NodeReference;
-import edu.uci.ics.genomix.oldtype.PositionListWritable;
-import edu.uci.ics.genomix.oldtype.PositionWritable;
-import edu.uci.ics.genomix.oldtype.KmerBytesWritable;
+
+import edu.uci.ics.genomix.velvet.oldtype.PositionListWritable;
+import edu.uci.ics.genomix.velvet.oldtype.PositionWritable;
+import edu.uci.ics.genomix.velvet.oldtype.KmerBytesWritable;
+
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/ReadsKeyValueParserFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/ReadsKeyValueParserFactory.java
index 1b14625..2134177 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/ReadsKeyValueParserFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/ReadsKeyValueParserFactory.java
@@ -25,8 +25,8 @@
import org.apache.hadoop.io.Text;
import edu.uci.ics.genomix.hyracks.data.primitive.PositionReference;
-import edu.uci.ics.genomix.type.GeneCode;
-import edu.uci.ics.genomix.oldtype.KmerBytesWritable;
+import edu.uci.ics.genomix.velvet.oldtype.GeneCode;
+import edu.uci.ics.genomix.velvet.oldtype.KmerBytesWritable;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/KMerSequenceWriterFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/KMerSequenceWriterFactory.java
index 28d2959..def046b 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/KMerSequenceWriterFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/KMerSequenceWriterFactory.java
@@ -25,9 +25,9 @@
import org.apache.hadoop.mapred.JobConf;
import edu.uci.ics.genomix.hyracks.job.GenomixJobConf;
-import edu.uci.ics.genomix.oldtype.PositionListWritable;
-import edu.uci.ics.genomix.oldtype.PositionWritable;
-import edu.uci.ics.genomix.oldtype.KmerBytesWritable;
+import edu.uci.ics.genomix.velvet.oldtype.PositionListWritable;
+import edu.uci.ics.genomix.velvet.oldtype.PositionWritable;
+import edu.uci.ics.genomix.velvet.oldtype.KmerBytesWritable;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/KMerTextWriterFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/KMerTextWriterFactory.java
index 538a930..652a6f2 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/KMerTextWriterFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/KMerTextWriterFactory.java
@@ -17,9 +17,9 @@
import java.io.DataOutput;
import java.io.IOException;
-import edu.uci.ics.genomix.oldtype.PositionListWritable;
-import edu.uci.ics.genomix.oldtype.PositionWritable;
-import edu.uci.ics.genomix.oldtype.KmerBytesWritable;
+import edu.uci.ics.genomix.velvet.oldtype.PositionListWritable;
+import edu.uci.ics.genomix.velvet.oldtype.PositionWritable;
+import edu.uci.ics.genomix.velvet.oldtype.KmerBytesWritable;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/NodeSequenceWriterFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/NodeSequenceWriterFactory.java
index 77efcf8..e116ab9 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/NodeSequenceWriterFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/NodeSequenceWriterFactory.java
@@ -27,8 +27,8 @@
import edu.uci.ics.genomix.data.Marshal;
import edu.uci.ics.genomix.hyracks.dataflow.MapReadToNodeOperator;
import edu.uci.ics.genomix.hyracks.job.GenomixJobConf;
-import edu.uci.ics.genomix.oldtype.NodeWritable;
-import edu.uci.ics.genomix.oldtype.PositionWritable;
+import edu.uci.ics.genomix.velvet.oldtype.NodeWritable;
+import edu.uci.ics.genomix.velvet.oldtype.PositionWritable;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/NodeTextWriterFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/NodeTextWriterFactory.java
index dffd3a9..bc00aa5 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/NodeTextWriterFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/NodeTextWriterFactory.java
@@ -18,8 +18,8 @@
import java.io.IOException;
import edu.uci.ics.genomix.data.Marshal;
-import edu.uci.ics.genomix.oldtype.NodeWritable;
-import edu.uci.ics.genomix.oldtype.PositionWritable;
+import edu.uci.ics.genomix.velvet.oldtype.NodeWritable;
+import edu.uci.ics.genomix.velvet.oldtype.PositionWritable;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenCheckReader.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenCheckReader.java
index c3ec3c7..b4b1e73 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenCheckReader.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenCheckReader.java
@@ -19,8 +19,8 @@
import java.util.Map;
import edu.uci.ics.genomix.hyracks.dataflow.ReadsKeyValueParserFactory;
-import edu.uci.ics.genomix.oldtype.PositionWritable;
-import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.velvet.oldtype.PositionWritable;
+import edu.uci.ics.genomix.velvet.oldtype.KmerBytesWritable;
import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenGroupbyReadID.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenGroupbyReadID.java
index 559060a..1e78b79 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenGroupbyReadID.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenGroupbyReadID.java
@@ -19,8 +19,8 @@
import java.util.Map;
import edu.uci.ics.genomix.data.Marshal;
-import edu.uci.ics.genomix.oldtype.PositionListWritable;
-import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.velvet.oldtype.PositionListWritable;
+import edu.uci.ics.genomix.velvet.oldtype.KmerBytesWritable;
import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenMapKmerToRead.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenMapKmerToRead.java
index e9aede5..8e727959 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenMapKmerToRead.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenMapKmerToRead.java
@@ -20,8 +20,8 @@
import edu.uci.ics.genomix.data.Marshal;
import edu.uci.ics.genomix.hyracks.dataflow.MapKmerPositionToReadOperator;
-import edu.uci.ics.genomix.oldtype.PositionListWritable;
-import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.velvet.oldtype.PositionListWritable;
+import edu.uci.ics.genomix.velvet.oldtype.KmerBytesWritable;
import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/ConnectorPolicyAssignmentPolicy.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/ConnectorPolicyAssignmentPolicy.java
new file mode 100644
index 0000000..6919e76
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/ConnectorPolicyAssignmentPolicy.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.genomix.hyracks.newgraph.dataflow;
+
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
+import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicyAssignmentPolicy;
+import edu.uci.ics.hyracks.api.dataflow.connectors.PipeliningConnectorPolicy;
+import edu.uci.ics.hyracks.api.dataflow.connectors.SendSideMaterializedPipeliningConnectorPolicy;
+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningMergingConnectorDescriptor;
+
+/**
+ * used by precluster groupby
+ */
+public class ConnectorPolicyAssignmentPolicy implements IConnectorPolicyAssignmentPolicy {
+ private static final long serialVersionUID = 1L;
+ private IConnectorPolicy senderSideMaterializePolicy = new SendSideMaterializedPipeliningConnectorPolicy();
+ private IConnectorPolicy pipeliningPolicy = new PipeliningConnectorPolicy();
+
+ @Override
+ public IConnectorPolicy getConnectorPolicyAssignment(IConnectorDescriptor c, int nProducers, int nConsumers,
+ int[] fanouts) {
+ if (c instanceof MToNPartitioningMergingConnectorDescriptor) {
+ return senderSideMaterializePolicy;
+ } else {
+ return pipeliningPolicy;
+ }
+ }
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/ReadsKeyValueParserFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/ReadsKeyValueParserFactory.java
index f027546..329d3f6 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/ReadsKeyValueParserFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/ReadsKeyValueParserFactory.java
@@ -29,6 +29,7 @@
import edu.uci.ics.genomix.type.NodeWritable;
import edu.uci.ics.genomix.type.PositionListWritable;
import edu.uci.ics.genomix.type.PositionWritable;
+
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
@@ -45,14 +46,14 @@
private static final Log LOG = LogFactory.getLog(ReadsKeyValueParserFactory.class);
public static final int OutputKmerField = 0;
- public static final int outputNodeIdListField = 1;
+ public static final int OutputNodeField = 1;
private final int readLength;
private final int kmerSize;
public static final RecordDescriptor readKmerOutputRec = new RecordDescriptor(new ISerializerDeserializer[] { null,
- null, null, null, null, null, null, null, null});
+ null});
public ReadsKeyValueParserFactory(int readlength, int k) {
this.readLength = readlength;
@@ -70,21 +71,21 @@
final ByteBuffer outputBuffer = ctx.allocateFrame();
final FrameTupleAppender outputAppender = new FrameTupleAppender(ctx.getFrameSize());
outputAppender.reset(outputBuffer, true);
-
+ KmerBytesWritable.setGlobalKmerLength(kmerSize);
return new IKeyValueParser<LongWritable, Text>() {
-
+
private PositionWritable nodeId = new PositionWritable();
private PositionListWritable nodeIdList = new PositionListWritable();
- private KmerListWritable edgeListForPreKmer = new KmerListWritable(kmerSize);
- private KmerListWritable edgeListForNextKmer = new KmerListWritable(kmerSize);
- private NodeWritable outputNode = new NodeWritable(kmerSize);
+ private KmerListWritable edgeListForPreKmer = new KmerListWritable();
+ private KmerListWritable edgeListForNextKmer = new KmerListWritable();
+ private NodeWritable outputNode = new NodeWritable();
- private KmerBytesWritable preForwardKmer = new KmerBytesWritable(kmerSize);
- private KmerBytesWritable preReverseKmer = new KmerBytesWritable(kmerSize);
- private KmerBytesWritable curForwardKmer = new KmerBytesWritable(kmerSize);
- private KmerBytesWritable curReverseKmer = new KmerBytesWritable(kmerSize);
- private KmerBytesWritable nextForwardKmer = new KmerBytesWritable(kmerSize);
- private KmerBytesWritable nextReverseKmer = new KmerBytesWritable(kmerSize);
+ private KmerBytesWritable preForwardKmer = new KmerBytesWritable();
+ private KmerBytesWritable preReverseKmer = new KmerBytesWritable();
+ private KmerBytesWritable curForwardKmer = new KmerBytesWritable();
+ private KmerBytesWritable curReverseKmer = new KmerBytesWritable();
+ private KmerBytesWritable nextForwardKmer = new KmerBytesWritable();
+ private KmerBytesWritable nextReverseKmer = new KmerBytesWritable();
private KmerDir preKmerDir = KmerDir.FORWARD;
private KmerDir curKmerDir = KmerDir.FORWARD;
@@ -119,37 +120,37 @@
}
private void SplitReads(int readID, byte[] array, IFrameWriter writer) {
- /** first kmer */
+ /*first kmer*/
if (kmerSize >= array.length) {
return;
}
- outputNode.reset(kmerSize);
+ outputNode.reset();
curForwardKmer.setByRead(array, 0);
curReverseKmer.setByReadReverse(array, 0);
curKmerDir = curForwardKmer.compareTo(curReverseKmer) <= 0 ? KmerDir.FORWARD : KmerDir.REVERSE;
setNextKmer(array[kmerSize]);
- setnodeId(mateId, readID, 1);
+ setnodeId(mateId, readID, 0);
setEdgeListForNextKmer();
writeToFrame(writer);
- /** middle kmer */
+ /*middle kmer*/
int i = kmerSize;
for (; i < array.length - 1; i++) {
- outputNode.reset(kmerSize);
+ outputNode.reset();
setPreKmerByOldCurKmer();
setCurKmerByOldNextKmer();
setNextKmer(array[i]);
- setnodeId(mateId, readID, i - kmerSize + 1);
+ setnodeId(mateId, readID, 0);//i - kmerSize + 1
setEdgeListForPreKmer();
setEdgeListForNextKmer();
writeToFrame(writer);
}
- /** last kmer */
- outputNode.reset(kmerSize);
+ /*last kmer*/
+ outputNode.reset();
setPreKmerByOldCurKmer();
setCurKmerByOldNextKmer();
- setnodeId(mateId, readID, array.length - kmerSize + 1);
+ setnodeId(mateId, readID, 0);//array.length - kmerSize + 1
setEdgeListForPreKmer();
writeToFrame(writer);
}
@@ -162,7 +163,7 @@
}
public void setNextKmer(byte nextChar){
- nextForwardKmer.set(curForwardKmer);
+ nextForwardKmer.setAsCopy(curForwardKmer);
nextForwardKmer.shiftKmerWithNextChar(nextChar);
nextReverseKmer.setByReadReverse(nextForwardKmer.toString().getBytes(), nextForwardKmer.getOffset());
nextKmerDir = nextForwardKmer.compareTo(nextReverseKmer) <= 0 ? KmerDir.FORWARD : KmerDir.REVERSE;
@@ -170,14 +171,14 @@
public void setPreKmerByOldCurKmer(){
preKmerDir = curKmerDir;
- preForwardKmer.set(curForwardKmer);
- preReverseKmer.set(curReverseKmer);
+ preForwardKmer.setAsCopy(curForwardKmer);
+ preReverseKmer.setAsCopy(curReverseKmer);
}
public void setCurKmerByOldNextKmer(){
curKmerDir = nextKmerDir;
- curForwardKmer.set(nextForwardKmer);
- curReverseKmer.set(nextReverseKmer);
+ curForwardKmer.setAsCopy(nextForwardKmer);
+ curReverseKmer.setAsCopy(nextReverseKmer);
}
public void writeToFrame(IFrameWriter writer) {
@@ -195,12 +196,12 @@
case FORWARD:
switch(preKmerDir){
case FORWARD:
- edgeListForPreKmer.reset(kmerSize);
+ edgeListForPreKmer.reset();
edgeListForPreKmer.append(preForwardKmer);
outputNode.setRRList(edgeListForPreKmer);
break;
case REVERSE:
- edgeListForPreKmer.reset(kmerSize);
+ edgeListForPreKmer.reset();
edgeListForPreKmer.append(preReverseKmer);
outputNode.setRFList(edgeListForPreKmer);
break;
@@ -209,12 +210,12 @@
case REVERSE:
switch(preKmerDir){
case FORWARD:
- edgeListForPreKmer.reset(kmerSize);
+ edgeListForPreKmer.reset();
edgeListForPreKmer.append(preForwardKmer);
outputNode.setFRList(edgeListForPreKmer);
break;
case REVERSE:
- edgeListForPreKmer.reset(kmerSize);
+ edgeListForPreKmer.reset();
edgeListForPreKmer.append(preReverseKmer);
outputNode.setFFList(edgeListForPreKmer);
break;
@@ -228,12 +229,12 @@
case FORWARD:
switch(nextKmerDir){
case FORWARD:
- edgeListForNextKmer.reset(kmerSize);
+ edgeListForNextKmer.reset();
edgeListForNextKmer.append(nextForwardKmer);
outputNode.setFFList(edgeListForNextKmer);
break;
case REVERSE:
- edgeListForNextKmer.reset(kmerSize);
+ edgeListForNextKmer.reset();
edgeListForNextKmer.append(nextReverseKmer);
outputNode.setFRList(edgeListForNextKmer);
break;
@@ -242,12 +243,12 @@
case REVERSE:
switch(nextKmerDir){
case FORWARD:
- edgeListForNextKmer.reset(kmerSize);
+ edgeListForNextKmer.reset();
edgeListForNextKmer.append(nextForwardKmer);
outputNode.setRFList(edgeListForNextKmer);
break;
case REVERSE:
- edgeListForNextKmer.reset(kmerSize);
+ edgeListForNextKmer.reset();
edgeListForNextKmer.append(nextReverseKmer);
outputNode.setRRList(edgeListForNextKmer);
break;
@@ -260,30 +261,7 @@
try {
tupleBuilder.reset();
tupleBuilder.addField(kmer.getBytes(), kmer.getOffset(), kmer.getLength());
-
- //tupleBuilder.addField(node.getnodeId().getByteArray(), node.getnodeId().getStartOffset(), node.getnodeId().getLength());
-// tupleBuilder.addField(node.getFFList().getByteArray(), node.getFFList().getStartOffset(), node.getFFList().getLength());
-// tupleBuilder.addField(node.getFRList().getByteArray(), node.getFRList().getStartOffset(), node.getFRList().getLength());
-// tupleBuilder.addField(node.getRFList().getByteArray(), node.getRFList().getStartOffset(), node.getRFList().getLength());
-// tupleBuilder.addField(node.getRRList().getByteArray(), node.getRRList().getStartOffset(), node.getRRList().getLength());
-
- tupleBuilder.addField(node.getNodeIdList().getByteArray(), node.getNodeIdList().getStartOffset(), node.getNodeIdList().getLength());
-
- tupleBuilder.addField(node.getFFList().getByteArray(), node.getFFList().getStartOffset(), node.getFFList().getLength());
- tupleBuilder.getDataOutput().writeInt(node.getFFList().getCountOfPosition());
- tupleBuilder.addFieldEndOffset();
-
- tupleBuilder.addField(node.getFRList().getByteArray(), node.getFRList().getStartOffset(), node.getFRList().getLength());
- tupleBuilder.getDataOutput().writeInt(node.getFRList().getCountOfPosition());
- tupleBuilder.addFieldEndOffset();
-
- tupleBuilder.addField(node.getRFList().getByteArray(), node.getRFList().getStartOffset(), node.getRFList().getLength());
- tupleBuilder.getDataOutput().writeInt(node.getRFList().getCountOfPosition());
- tupleBuilder.addFieldEndOffset();
-
- tupleBuilder.addField(node.getRRList().getByteArray(), node.getRRList().getStartOffset(), node.getRRList().getLength());
- tupleBuilder.getDataOutput().writeInt(node.getRRList().getCountOfPosition());
- tupleBuilder.addFieldEndOffset();
+ tupleBuilder.addField(node.marshalToByteArray(), 0, node.getSerializedLength());
if (!outputAppender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
tupleBuilder.getSize())) {
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/aggregators/AggregateKmerAggregateFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/aggregators/AggregateKmerAggregateFactory.java
index cfd582b..46fdd0e 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/aggregators/AggregateKmerAggregateFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/aggregators/AggregateKmerAggregateFactory.java
@@ -17,14 +17,12 @@
import java.io.DataOutput;
import java.io.IOException;
-
-import edu.uci.ics.genomix.hyracks.data.primitive.PositionReference;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
import edu.uci.ics.genomix.type.NodeWritable;
import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
@@ -33,27 +31,23 @@
public class AggregateKmerAggregateFactory implements IAggregatorDescriptorFactory {
/**
- *
+ * local Aggregate
*/
private static final long serialVersionUID = 1L;
-
- private final int readLength;
private final int kmerSize;
- public AggregateKmerAggregateFactory(int readlength, int k) {
- this.readLength = readlength;
+ public AggregateKmerAggregateFactory(int k) {
this.kmerSize = k;
}
-
@Override
public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
RecordDescriptor outRecordDescriptor, int[] keyFields, int[] keyFieldsInPartialResults)
throws HyracksDataException {
+ KmerBytesWritable.setGlobalKmerLength(kmerSize);
return new IAggregatorDescriptor() {
-// private PositionReference position = new PositionReference();
- private NodeWritable readNode = new NodeWritable(kmerSize);
+ private NodeWritable readNode = new NodeWritable();
protected int getOffSet(IFrameTupleAccessor accessor, int tIndex, int fieldId) {
int tupleOffset = accessor.getTupleStartOffset(tIndex);
@@ -80,24 +74,23 @@
public void init(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
AggregateState state) throws HyracksDataException {
NodeWritable localUniNode = (NodeWritable) state.state;
- localUniNode.reset(kmerSize);
- readNode.setNewReference(accessor.getBuffer().array(), getOffSet(accessor, tIndex, 1));//????
+ localUniNode.reset();
+ readNode.setAsReference(accessor.getBuffer().array(), getOffSet(accessor, tIndex, 1));
localUniNode.getNodeIdList().appendList(readNode.getNodeIdList());
localUniNode.getFFList().appendList(readNode.getFFList());
localUniNode.getFRList().appendList(readNode.getFRList());
localUniNode.getRFList().appendList(readNode.getRFList());
localUniNode.getRRList().appendList(readNode.getRRList());
-// inputVal.append(position);
// make an empty field
- tupleBuilder.addFieldEndOffset();///???
+ tupleBuilder.addFieldEndOffset();// mark question?
}
@Override
public void aggregate(IFrameTupleAccessor accessor, int tIndex, IFrameTupleAccessor stateAccessor,
int stateTupleIndex, AggregateState state) throws HyracksDataException {
NodeWritable localUniNode = (NodeWritable) state.state;
- readNode.setNewReference(accessor.getBuffer().array(), getOffSet(accessor, tIndex, 1));//????
+ readNode.setAsReference(accessor.getBuffer().array(), getOffSet(accessor, tIndex, 1));
localUniNode.getNodeIdList().appendList(readNode.getNodeIdList());
localUniNode.getFFList().appendList(readNode.getFFList());
localUniNode.getFRList().appendList(readNode.getFRList());
@@ -117,7 +110,7 @@
DataOutput fieldOutput = tupleBuilder.getDataOutput();
NodeWritable localUniNode = (NodeWritable) state.state;
try {
- fieldOutput.write(localUniNode.getByteArray(), localUniNode.getStartOffset(), localUniNode.getLength());
+ fieldOutput.write(localUniNode.marshalToByteArray(), 0, localUniNode.getSerializedLength());
tupleBuilder.addFieldEndOffset();
} catch (IOException e) {
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/aggregators/MergeKmerAggregateFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/aggregators/MergeKmerAggregateFactory.java
index 2cad0ca..1ee6cae 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/aggregators/MergeKmerAggregateFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/aggregators/MergeKmerAggregateFactory.java
@@ -17,23 +17,15 @@
import java.io.DataOutput;
import java.io.IOException;
-import java.util.HashSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.hsqldb.lib.Iterator;
-
-import edu.uci.ics.genomix.hyracks.data.primitive.PositionReference;
import edu.uci.ics.genomix.type.KmerBytesWritable;
-import edu.uci.ics.genomix.type.KmerListWritable;
import edu.uci.ics.genomix.type.NodeWritable;
-import edu.uci.ics.genomix.type.PositionListWritable;
-import edu.uci.ics.genomix.type.PositionWritable;
import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
@@ -43,11 +35,9 @@
private static final long serialVersionUID = 1L;
private static final Log LOG = LogFactory.getLog(MergeKmerAggregateFactory.class);
- private final int readLength;
private final int kmerSize;
- public MergeKmerAggregateFactory(int readlength, int k) {
- this.readLength = readlength;
+ public MergeKmerAggregateFactory(int k) {
this.kmerSize = k;
}
@@ -56,16 +46,17 @@
RecordDescriptor outRecordDescriptor, int[] keyFields, int[] keyFieldsInPartialResults)
throws HyracksDataException {
final int frameSize = ctx.getFrameSize();
+ KmerBytesWritable.setGlobalKmerLength(kmerSize);
return new IAggregatorDescriptor() {
-// private PositionReference position = new PositionReference();
+ private NodeWritable readNode = new NodeWritable();
- private NodeWritable readNode = new NodeWritable(kmerSize);
- private HashSet set = new HashSet();
- private PositionListWritable uniNodeIdList = new PositionListWritable();
- private KmerListWritable uniEdgeList = new KmerListWritable(kmerSize);
- private KmerBytesWritable tempKmer = new KmerBytesWritable(kmerSize);
- private PositionWritable tempPos = new PositionWritable();
+ protected int getOffSet(IFrameTupleAccessor accessor, int tIndex, int fieldId) {
+ int tupleOffset = accessor.getTupleStartOffset(tIndex);
+ int fieldStart = accessor.getFieldStartOffset(tIndex, fieldId);
+ int offset = tupleOffset + fieldStart + accessor.getFieldSlotsLength();
+ return offset;
+ }
@Override
public AggregateState createAggregateStates() {
@@ -76,17 +67,14 @@
public void init(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
AggregateState state) throws HyracksDataException {
NodeWritable localUniNode = (NodeWritable) state.state;
- localUniNode.reset(kmerSize);
- int leadOffset = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength();
- for (int offset = accessor.getFieldStartOffset(tIndex, 1); offset < accessor.getFieldEndOffset(tIndex,
- 1); offset += PositionReference.LENGTH) {
- readNode.setNewReference(accessor.getBuffer().array(), leadOffset + offset);
- localUniNode.getNodeIdList().appendList(readNode.getNodeIdList());
- localUniNode.getFFList().appendList(readNode.getFFList());
- localUniNode.getFRList().appendList(readNode.getFRList());
- localUniNode.getRFList().appendList(readNode.getRFList());
- localUniNode.getRRList().appendList(readNode.getRRList());
- }
+ localUniNode.reset();
+ readNode.setAsReference(accessor.getBuffer().array(), getOffSet(accessor, tIndex, 1));
+ localUniNode.getNodeIdList().unionUpdate(readNode.getNodeIdList());
+ localUniNode.getFFList().unionUpdate(readNode.getFFList());
+ localUniNode.getFRList().unionUpdate(readNode.getFRList());
+ localUniNode.getRFList().unionUpdate(readNode.getRFList());
+ localUniNode.getRRList().unionUpdate(readNode.getRRList());
+
//make a fake feild to cheat caller
tupleBuilder.addFieldEndOffset();
}
@@ -100,16 +88,12 @@
public void aggregate(IFrameTupleAccessor accessor, int tIndex, IFrameTupleAccessor stateAccessor,
int stateTupleIndex, AggregateState state) throws HyracksDataException {
NodeWritable localUniNode = (NodeWritable) state.state;
- int leadOffset = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength();
- for (int offset = accessor.getFieldStartOffset(tIndex, 1); offset < accessor.getFieldEndOffset(tIndex,
- 1); offset += PositionReference.LENGTH) {
- position.setNewReference(accessor.getBuffer().array(), leadOffset + offset);
- localUniNode.getNodeIdList().appendList(readNode.getNodeIdList());
- localUniNode.getFFList().appendList(readNode.getFFList());
- localUniNode.getFRList().appendList(readNode.getFRList());
- localUniNode.getRFList().appendList(readNode.getRFList());
- localUniNode.getRRList().appendList(readNode.getRRList());
- }
+ readNode.setAsReference(accessor.getBuffer().array(), getOffSet(accessor, tIndex, 1));
+ localUniNode.getNodeIdList().unionUpdate(readNode.getNodeIdList());
+ localUniNode.getFFList().unionUpdate(readNode.getFFList());
+ localUniNode.getFRList().unionUpdate(readNode.getFRList());
+ localUniNode.getRFList().unionUpdate(readNode.getRFList());
+ localUniNode.getRRList().unionUpdate(readNode.getRRList());
}
@Override
@@ -118,61 +102,16 @@
throw new IllegalStateException("partial result method should not be called");
}
- @SuppressWarnings("unchecked")
@Override
public void outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
AggregateState state) throws HyracksDataException {
DataOutput fieldOutput = tupleBuilder.getDataOutput();
NodeWritable localUniNode = (NodeWritable) state.state;
- uniNodeIdList.reset();
- for(java.util.Iterator<PositionWritable> iter = localUniNode.getNodeIdList().iterator(); iter.hasNext();){
- tempPos.set(iter.next());
- if(set.add(tempPos))
- uniNodeIdList.append(tempPos);
- }
- localUniNode.getNodeIdList().reset();
- localUniNode.getNodeIdList().set(uniNodeIdList);
- uniEdgeList.reset();
- for(java.util.Iterator<KmerBytesWritable> iter = localUniNode.getFFList().iterator(); iter.hasNext();){
- tempKmer.set(iter.next());
- if(set.add(tempKmer))
- uniEdgeList.append(tempKmer);
- }
- localUniNode.getFFList().reset();
- localUniNode.getFFList().set(uniEdgeList);
-
- uniEdgeList.reset();
- for(java.util.Iterator<KmerBytesWritable> iter = localUniNode.getFRList().iterator(); iter.hasNext();){
- tempKmer.set(iter.next());
- if(set.add(tempKmer))
- uniEdgeList.append(tempKmer);
- }
- localUniNode.getFRList().reset();
- localUniNode.getFRList().set(uniEdgeList);
-
- uniEdgeList.reset();
- for(java.util.Iterator<KmerBytesWritable> iter = localUniNode.getRFList().iterator(); iter.hasNext();){
- tempKmer.set(iter.next());
- if(set.add(tempKmer))
- uniEdgeList.append(tempKmer);
- }
- localUniNode.getRFList().reset();
- localUniNode.getRFList().set(uniEdgeList);
-
- uniEdgeList.reset();
- for(java.util.Iterator<KmerBytesWritable> iter = localUniNode.getRRList().iterator(); iter.hasNext();){
- tempKmer.set(iter.next());
- if(set.add(tempKmer))
- uniEdgeList.append(tempKmer);
- }
- localUniNode.getRRList().reset();
- localUniNode.getRRList().set(uniEdgeList);
-
try {
- if (localUniNode.getLength() > frameSize / 2) {
- LOG.warn("MergeKmer: output data kmerByteSize is too big: " + inputVal.getLength());
+ if (localUniNode.getSerializedLength() > frameSize / 2) {
+ LOG.warn("MergeKmer: output data kmerByteSize is too big: " + localUniNode.getSerializedLength());
}
- fieldOutput.write(localUniNode.getByteArray(), localUniNode.getStartOffset(), localUniNode.getLength());
+ fieldOutput.write(localUniNode.marshalToByteArray(), 0, localUniNode.getSerializedLength());
tupleBuilder.addFieldEndOffset();
} catch (IOException e) {
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/io/KMerTextWriterFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/io/KMerTextWriterFactory.java
deleted file mode 100644
index f296fd4..0000000
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/io/KMerTextWriterFactory.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.genomix.hyracks.newgraph.io;
-
-import java.io.DataOutput;
-import java.io.IOException;
-
-import edu.uci.ics.genomix.oldtype.PositionListWritable;
-import edu.uci.ics.genomix.oldtype.PositionWritable;
-import edu.uci.ics.genomix.type.KmerBytesWritable;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
-import edu.uci.ics.hyracks.hdfs.api.ITupleWriter;
-import edu.uci.ics.hyracks.hdfs.api.ITupleWriterFactory;
-
-public class KMerTextWriterFactory implements ITupleWriterFactory {
-
- /**
- *
- */
- private static final long serialVersionUID = 1L;
-
- private final int kmerSize;
-
- public KMerTextWriterFactory(int k) {
- kmerSize = k;
- }
-
- public class TupleWriter implements ITupleWriter {
- private KmerBytesWritable kmer = new KmerBytesWritable(kmerSize);
- private PositionListWritable plist = new PositionListWritable();
-
- @Override
- public void write(DataOutput output, ITupleReference tuple) throws HyracksDataException {
- try {
- if (kmer.getLength() > tuple.getFieldLength(KMerSequenceWriterFactory.InputKmerField)) {
- throw new IllegalArgumentException("Not enough kmer bytes");
- }
- kmer.setNewReference(tuple.getFieldData(KMerSequenceWriterFactory.InputKmerField),
- tuple.getFieldStart(KMerSequenceWriterFactory.InputKmerField));
- int countOfPos = tuple.getFieldLength(KMerSequenceWriterFactory.InputPositionListField)
- / PositionWritable.LENGTH;
- if (tuple.getFieldLength(KMerSequenceWriterFactory.InputPositionListField) % PositionWritable.LENGTH != 0) {
- throw new IllegalArgumentException("Invalid count of position byte");
- }
- plist.setNewReference(countOfPos, tuple.getFieldData(KMerSequenceWriterFactory.InputPositionListField),
- tuple.getFieldStart(KMerSequenceWriterFactory.InputPositionListField));
-
- output.write(kmer.toString().getBytes());
- output.writeByte('\t');
- output.write(plist.toString().getBytes());
- output.writeByte('\n');
- } catch (IOException e) {
- throw new HyracksDataException(e);
- }
- }
-
- @Override
- public void open(DataOutput output) throws HyracksDataException {
-
- }
-
- @Override
- public void close(DataOutput output) throws HyracksDataException {
- }
- }
-
- @Override
- public ITupleWriter getTupleWriter(IHyracksTaskContext ctx) throws HyracksDataException {
- return new TupleWriter();
- }
-
-}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/io/NodeTextWriterFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/io/NodeTextWriterFactory.java
index c579261..f700f6d 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/io/NodeTextWriterFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/io/NodeTextWriterFactory.java
@@ -17,9 +17,9 @@
import java.io.DataOutput;
import java.io.IOException;
-import edu.uci.ics.genomix.data.Marshal;
-import edu.uci.ics.genomix.oldtype.NodeWritable;
-import edu.uci.ics.genomix.oldtype.PositionWritable;
+import edu.uci.ics.genomix.hyracks.newgraph.dataflow.ReadsKeyValueParserFactory;
+import edu.uci.ics.genomix.type.NodeWritable;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
@@ -32,17 +32,20 @@
*
*/
private static final long serialVersionUID = 1L;
- private final int initialKmerSize;
-
- public NodeTextWriterFactory(int initialKmerSize) {
- this.initialKmerSize = initialKmerSize;
+ private final int kmerSize;
+ public static final int OutputKmerField = ReadsKeyValueParserFactory.OutputKmerField;
+ public static final int outputNodeField = ReadsKeyValueParserFactory.OutputNodeField;
+
+ public NodeTextWriterFactory(int k) {
+ this.kmerSize = k;
}
@Override
public ITupleWriter getTupleWriter(IHyracksTaskContext ctx) throws HyracksDataException {
+ KmerBytesWritable.setGlobalKmerLength(kmerSize);
return new ITupleWriter() {
- NodeWritable node = new NodeWritable(initialKmerSize);
-
+ NodeWritable node = new NodeWritable();
+
@Override
public void open(DataOutput output) throws HyracksDataException {
@@ -50,30 +53,9 @@
@Override
public void write(DataOutput output, ITupleReference tuple) throws HyracksDataException {
- node.getNodeID().setNewReference(tuple.getFieldData(NodeSequenceWriterFactory.InputNodeIDField),
- tuple.getFieldStart(NodeSequenceWriterFactory.InputNodeIDField));
- node.getFFList().setNewReference(
- tuple.getFieldLength(NodeSequenceWriterFactory.InputFFField) / PositionWritable.LENGTH,
- tuple.getFieldData(NodeSequenceWriterFactory.InputFFField),
- tuple.getFieldStart(NodeSequenceWriterFactory.InputFFField));
- node.getFRList().setNewReference(
- tuple.getFieldLength(NodeSequenceWriterFactory.InputFRField) / PositionWritable.LENGTH,
- tuple.getFieldData(NodeSequenceWriterFactory.InputFRField),
- tuple.getFieldStart(NodeSequenceWriterFactory.InputFRField));
- node.getRFList().setNewReference(
- tuple.getFieldLength(NodeSequenceWriterFactory.InputRFField) / PositionWritable.LENGTH,
- tuple.getFieldData(NodeSequenceWriterFactory.InputRFField),
- tuple.getFieldStart(NodeSequenceWriterFactory.InputRFField));
- node.getRRList().setNewReference(
- tuple.getFieldLength(NodeSequenceWriterFactory.InputRRField) / PositionWritable.LENGTH,
- tuple.getFieldData(NodeSequenceWriterFactory.InputRRField),
- tuple.getFieldStart(NodeSequenceWriterFactory.InputRRField));
-
- node.getKmer().setNewReference(
- Marshal.getInt(tuple.getFieldData(NodeSequenceWriterFactory.InputCountOfKmerField),
- tuple.getFieldStart(NodeSequenceWriterFactory.InputCountOfKmerField)),
- tuple.getFieldData(NodeSequenceWriterFactory.InputKmerBytesField),
- tuple.getFieldStart(NodeSequenceWriterFactory.InputKmerBytesField));
+ node.setAsReference(tuple.getFieldData(outputNodeField), tuple.getFieldStart(outputNodeField));
+ node.getKmer().reset(kmerSize);
+ node.getKmer().setAsReference(tuple.getFieldData(OutputKmerField), tuple.getFieldStart(OutputKmerField));
try {
output.write(node.toString().getBytes());
output.writeByte('\n');
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/job/JobGenBrujinGraph.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/job/JobGenBrujinGraph.java
index 369f874..1a95ac2 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/job/JobGenBrujinGraph.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/job/JobGenBrujinGraph.java
@@ -26,25 +26,18 @@
import edu.uci.ics.genomix.hyracks.data.accessors.KmerHashPartitioncomputerFactory;
import edu.uci.ics.genomix.hyracks.data.accessors.KmerNormarlizedComputerFactory;
-import edu.uci.ics.genomix.hyracks.data.accessors.ReadIDPartitionComputerFactory;
import edu.uci.ics.genomix.hyracks.data.primitive.KmerPointable;
-//import edu.uci.ics.genomix.hyracks.dataflow.ConnectorPolicyAssignmentPolicy;
+import edu.uci.ics.genomix.hyracks.newgraph.dataflow.ConnectorPolicyAssignmentPolicy;
import edu.uci.ics.genomix.hyracks.newgraph.dataflow.ReadsKeyValueParserFactory;
import edu.uci.ics.genomix.hyracks.newgraph.dataflow.aggregators.AggregateKmerAggregateFactory;
import edu.uci.ics.genomix.hyracks.newgraph.dataflow.aggregators.MergeKmerAggregateFactory;
-
-
-//import edu.uci.ics.genomix.hyracks.dataflow.io.KMerSequenceWriterFactory;
-//import edu.uci.ics.genomix.hyracks.dataflow.io.KMerTextWriterFactory;
-//import edu.uci.ics.genomix.hyracks.dataflow.io.NodeSequenceWriterFactory;
-//import edu.uci.ics.genomix.hyracks.dataflow.io.NodeTextWriterFactory;
+import edu.uci.ics.genomix.hyracks.newgraph.io.NodeTextWriterFactory;
import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
@@ -53,17 +46,11 @@
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
-import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;
import edu.uci.ics.hyracks.data.std.api.IPointableFactory;
-import edu.uci.ics.hyracks.data.std.primitive.IntegerPointable;
-import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningMergingConnectorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.group.HashSpillableTableFactory;
import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
-import edu.uci.ics.hyracks.dataflow.std.group.external.ExternalGroupOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.group.preclustered.PreclusteredGroupOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
import edu.uci.ics.hyracks.hdfs.api.ITupleWriterFactory;
@@ -171,7 +158,7 @@
ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(jobSpec, frameLimits, keyFields,
new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(KmerPointable.FACTORY) },
- ReadsKeyValueParserFactory.readKmerOutputRec);//???????????
+ ReadsKeyValueParserFactory.readKmerOutputRec);
connectOperators(jobSpec, readOperator, ncNodeNames, sorter, ncNodeNames, new OneToOneConnectorDescriptor(
jobSpec));
@@ -179,8 +166,8 @@
RecordDescriptor combineKmerOutputRec = new RecordDescriptor(new ISerializerDeserializer[] { null, null });
jobSpec.setFrameSize(frameSize);
- Object[] objs = generateAggeragateDescriptorbyType(jobSpec, keyFields, new AggregateKmerAggregateFactory(readLength,kmerSize),
- new MergeKmerAggregateFactory(readLength,kmerSize), new KmerHashPartitioncomputerFactory(),
+ Object[] objs = generateAggeragateDescriptorbyType(jobSpec, keyFields, new AggregateKmerAggregateFactory(kmerSize),
+ new MergeKmerAggregateFactory(kmerSize), new KmerHashPartitioncomputerFactory(),
new KmerNormarlizedComputerFactory(), KmerPointable.FACTORY, combineKmerOutputRec, combineKmerOutputRec);
AbstractOperatorDescriptor kmerLocalAggregator = (AbstractOperatorDescriptor) objs[0];
logDebug("LocalKmerGroupby Operator");
@@ -201,10 +188,6 @@
case TEXT:
nodeWriter = new NodeTextWriterFactory(kmerSize);
break;
- case BINARY:
- default:
- nodeWriter = new NodeSequenceWriterFactory(hadoopJobConfFactory.getConf());
- break;
}
logDebug("WriteOperator");
// Output Node
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/job/JobGenCheckReader.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/job/JobGenCheckReader.java
deleted file mode 100644
index 6026ac1..0000000
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/job/JobGenCheckReader.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.genomix.hyracks.newgraph.job;
-
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.Map;
-
-import edu.uci.ics.genomix.hyracks.newgraph.dataflow.ReadsKeyValueParserFactory;
-import edu.uci.ics.genomix.hyracks.job.GenomixJobConf;
-import edu.uci.ics.genomix.oldtype.IntermediateNodeWritable;
-import edu.uci.ics.genomix.oldtype.PositionWritable;
-import edu.uci.ics.genomix.type.KmerBytesWritable;
-import edu.uci.ics.genomix.type.KmerListWritable;
-import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.exceptions.HyracksException;
-import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
-import edu.uci.ics.hyracks.hdfs.api.ITupleWriter;
-import edu.uci.ics.hyracks.hdfs.api.ITupleWriterFactory;
-import edu.uci.ics.hyracks.hdfs.dataflow.HDFSReadOperatorDescriptor;
-import edu.uci.ics.hyracks.hdfs.dataflow.HDFSWriteOperatorDescriptor;
-import edu.uci.ics.hyracks.hdfs.scheduler.Scheduler;
-
-public class JobGenCheckReader extends JobGenBrujinGraph {
-
- private static final long serialVersionUID = 1L;
-
- public JobGenCheckReader(GenomixJobConf job, Scheduler scheduler, Map<String, NodeControllerInfo> ncMap,
- int numPartitionPerMachine) throws HyracksDataException {
- super(job, scheduler, ncMap, numPartitionPerMachine);
- }
-
- @Override
- public JobSpecification generateJob() throws HyracksException {
-
- JobSpecification jobSpec = new JobSpecification();
- logDebug("ReadKmer Operator");
- HDFSReadOperatorDescriptor readOperator = createHDFSReader(jobSpec);
-
- logDebug("Write kmer to result");
- generateRootByWriteKmerReader(jobSpec, readOperator);
-
- return jobSpec;
- }
-
- public AbstractSingleActivityOperatorDescriptor generateRootByWriteKmerReader(JobSpecification jobSpec,
- HDFSReadOperatorDescriptor readOperator) throws HyracksException {
- // Output Kmer
- HDFSWriteOperatorDescriptor writeKmerOperator = new HDFSWriteOperatorDescriptor(jobSpec,
- hadoopJobConfFactory.getConf(), new ITupleWriterFactory() {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public ITupleWriter getTupleWriter(IHyracksTaskContext ctx) throws HyracksDataException {
- return new ITupleWriter() {
-
- private KmerBytesWritable kmer = new KmerBytesWritable(kmerSize);
- private KmerListWritable kmerList = new KmerListWritable();
- //private IntermediateNodeWritable intermediateNode = new IntermediateNodeWritable();
-
- @Override
- public void open(DataOutput output) throws HyracksDataException {
- }
-
- @Override
- public void write(DataOutput output, ITupleReference tuple) throws HyracksDataException {
- try {
- if (kmer.getLength() > tuple
- .getFieldLength(ReadsKeyValueParserFactory.OutputKmerField)) {
- throw new IllegalArgumentException("Not enough kmer bytes");
- }
- //kemr
- kmer.setNewReference(
- tuple.getFieldData(ReadsKeyValueParserFactory.OutputKmerField),
- tuple.getFieldStart(ReadsKeyValueParserFactory.OutputKmerField));
- kmerList.setNewReference(tuple.getFieldLength(ReadsKeyValueParserFactory.OutputNodeIdField),
- tuple.getFieldData(ReadsKeyValueParserFactory.OutputNodeIdField),
- tuple.getFieldStart(ReadsKeyValueParserFactory.OutputNodeIdField));
-// //nodeId
-// intermediateNode.getNodeId().setNewReference(tuple.getFieldData(ReadsKeyValueParserFactory.OutputNodeIdField),
-// tuple.getFieldStart(ReadsKeyValueParserFactory.OutputNodeIdField));
- //FF list
-// intermediateNode.getFFList().setNewReference(tuple.getFieldLength(ReadsKeyValueParserFactory.OutputForwardForwardField) / 2 ,
-// tuple.getFieldData(ReadsKeyValueParserFactory.OutputForwardForwardField), tuple.getFieldStart(ReadsKeyValueParserFactory.OutputForwardForwardField));
-// //FR list
-// intermediateNode.getFRList().setNewReference(tuple.getFieldLength(ReadsKeyValueParserFactory.OutputForwardReverseField / kmer.getLength()),
-// tuple.getFieldData(ReadsKeyValueParserFactory.OutputForwardReverseField), tuple.getFieldStart(ReadsKeyValueParserFactory.OutputForwardReverseField));
-// //RF list
-// intermediateNode.getRFList().setNewReference(tuple.getFieldLength(ReadsKeyValueParserFactory.OutputReverseForwardField / kmer.getLength()),
-// tuple.getFieldData(ReadsKeyValueParserFactory.OutputReverseForwardField), tuple.getFieldStart(ReadsKeyValueParserFactory.OutputReverseForwardField));
-// //RR list
-// intermediateNode.getRRList().setNewReference(tuple.getFieldLength(ReadsKeyValueParserFactory.OutputReverseReverseField / kmer.getLength()),
-// tuple.getFieldData(ReadsKeyValueParserFactory.OutputReverseReverseField), tuple.getFieldStart(ReadsKeyValueParserFactory.OutputReverseReverseField));
-//
- output.write(kmer.toString().getBytes());
- output.writeByte('\t');
- output.write(kmerList.toString().getBytes());
- output.writeByte('\n');
- } catch (IOException e) {
- throw new HyracksDataException(e);
- }
- }
-
- @Override
- public void close(DataOutput output) throws HyracksDataException {
-
- }
-
- };
- }
-
- });
- connectOperators(jobSpec, readOperator, ncNodeNames, writeKmerOperator, ncNodeNames,
- new OneToOneConnectorDescriptor(jobSpec));
- jobSpec.addRoot(writeKmerOperator);
- return writeKmerOperator;
- }
-
-}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/job/JobGenCreateKmerInfo.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/job/JobGenCreateKmerInfo.java
deleted file mode 100644
index 35d053a..0000000
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/job/JobGenCreateKmerInfo.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.genomix.hyracks.newgraph.job;
-
-import java.util.Map;
-
-import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.exceptions.HyracksException;
-import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
-import edu.uci.ics.hyracks.hdfs.dataflow.HDFSReadOperatorDescriptor;
-import edu.uci.ics.hyracks.hdfs.scheduler.Scheduler;
-
-public class JobGenCreateKmerInfo extends JobGenBrujinGraph {
-
- /**
- *
- */
- private static final long serialVersionUID = 1L;
-
- public JobGenCreateKmerInfo(GenomixJobConf job, Scheduler scheduler, Map<String, NodeControllerInfo> ncMap,
- int numPartitionPerMachine) throws HyracksDataException {
- super(job, scheduler, ncMap, numPartitionPerMachine);
- }
-
- @Override
- public JobSpecification generateJob() throws HyracksException {
-
- JobSpecification jobSpec = new JobSpecification();
- logDebug("ReadKmer Operator");
- HDFSReadOperatorDescriptor readOperator = createHDFSReader(jobSpec);
-
- logDebug("Group by Kmer");
- AbstractOperatorDescriptor lastOperator = generateGroupbyKmerJob(jobSpec, readOperator);
-
- logDebug("Write kmer to result");
- lastOperator = generateKmerWritorOperator(jobSpec, lastOperator);
- jobSpec.addRoot(lastOperator);
-
- return jobSpec;
- }
-}
diff --git a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunStepByStepTest.java b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunStepByStepTest.java
index 4ce59a0..d446f39 100644
--- a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunStepByStepTest.java
+++ b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunStepByStepTest.java
@@ -41,7 +41,7 @@
import edu.uci.ics.genomix.hyracks.driver.Driver;
import edu.uci.ics.genomix.hyracks.driver.Driver.Plan;
import edu.uci.ics.genomix.hyracks.job.GenomixJobConf;
-import edu.uci.ics.genomix.oldtype.NodeWritable;
+import edu.uci.ics.genomix.velvet.oldtype.NodeWritable;
@SuppressWarnings("deprecation")
public class JobRunStepByStepTest {