refactor KmerBytesWritable into Kmer and variable-length VKmer
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerBytesWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerBytesWritable.java
index b42b8f0..2738b43 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerBytesWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerBytesWritable.java
@@ -36,34 +36,32 @@
 public class KmerBytesWritable extends BinaryComparable implements Serializable, WritableComparable<BinaryComparable> {
 
     private static final long serialVersionUID = 1L;
-    private static final byte[] EMPTY_BYTES = { 0, 0, 0, 0 }; // int indicating 0 length
-    private static final int HEADER_SIZE = 4; // number of bytes for header info
+    protected static final byte[] EMPTY_BYTES = {};
 
-    private int lettersInKmer;
-    private int bytesUsed;
-    private byte[] bytes;
-    private int offset;
+    protected int lettersInKmer;
+    protected int bytesUsed;
+    protected byte[] bytes;
+    protected int offset;
 
     /**
      * Initialize as empty kmer
      */
     public KmerBytesWritable() {
-        this(EMPTY_BYTES, 0);
+        this(0, EMPTY_BYTES, 0);
     }
 
     /**
      * Copy contents of kmer string
      */
     public KmerBytesWritable(String kmer) {
-        bytes = new byte[HEADER_SIZE + KmerUtil.getByteNumFromK(kmer.length())];
         setAsCopy(kmer);
     }
 
     /**
      * Set as reference to given data
      */
-    public KmerBytesWritable(byte[] storage, int offset) {
-        setAsReference(storage, offset);
+    public KmerBytesWritable(int k, byte[] storage, int offset) {
+        setAsReference(k, storage, offset);
     }
 
     /**
@@ -71,7 +69,7 @@
      */
     public KmerBytesWritable(int k) {
         if (k > 0) {
-            this.bytes = new byte[HEADER_SIZE + KmerUtil.getByteNumFromK(k)];
+            this.bytes = new byte[KmerUtil.getByteNumFromK(k)];
         } else {
             this.bytes = EMPTY_BYTES;
         }
@@ -97,7 +95,7 @@
     public void setAsCopy(KmerBytesWritable other) {
         reset(other.lettersInKmer);
         if (lettersInKmer > 0) {
-            System.arraycopy(other.bytes, other.offset + HEADER_SIZE, bytes, this.offset + HEADER_SIZE, lettersInKmer);
+            System.arraycopy(other.bytes, other.offset, bytes, this.offset, lettersInKmer);
         }
     }
 
@@ -105,9 +103,9 @@
      * set from String kmer
      */
     public void setAsCopy(String kmer) {
-        int k = kmer.length();
-        reset(k);
-        System.arraycopy(kmer.getBytes(), 0, bytes, offset + HEADER_SIZE, k);
+        setKmerLength(kmer.length());
+        bytes = kmer.getBytes();
+        offset = 0;
     }
 
     /**
@@ -116,10 +114,9 @@
      * @param newData
      * @param offset
      */
-    public void setAsCopy(byte[] newData, int offset) {
-        int k = Marshal.getInt(newData, offset);
+    public void setAsCopy(int k, byte[] newData, int offset) {
         reset(k);
-        System.arraycopy(newData, offset + HEADER_SIZE, bytes, this.offset + HEADER_SIZE, k);
+        System.arraycopy(newData, offset, bytes, this.offset, k);
     }
 
     /**
@@ -136,21 +133,18 @@
     /**
      * Point this datablock to the given bytes array
      * It works like the pointer to new datablock.
-     * NOTE: kmerlength is NOT updated
      * 
      * @param newData
      * @param offset
      */
-    public void setAsReference(byte[] newData, int offset) {
+    public void setAsReference(int k, byte[] newData, int offset) {
         this.bytes = newData;
         this.offset = offset;
-        int kRequested = Marshal.getInt(newData, offset);
-        int bytesRequested = KmerUtil.getByteNumFromK(kRequested);
-        if (newData.length - offset < bytesRequested) {
-            throw new IllegalArgumentException("Requested " + bytesRequested + " bytes (k=" + kRequested
+        setKmerLength(k);
+        if (newData.length - offset < bytesUsed) {
+            throw new IllegalArgumentException("Requested " + bytesUsed + " bytes (k=" + k
                     + ") but buffer has only " + (newData.length - offset) + " bytes");
         }
-        setKmerLength(kRequested);
     }
 
     protected void setSize(int size) {
@@ -200,9 +194,8 @@
     }
 
     public void setKmerLength(int k) {
-        this.bytesUsed = HEADER_SIZE + KmerUtil.getByteNumFromK(k);
+        this.bytesUsed = KmerUtil.getByteNumFromK(k);
         this.lettersInKmer = k;
-        Marshal.putInt(k, bytes, offset);
     }
 
     public int getKmerLength() {
@@ -241,13 +234,13 @@
             l |= (byte) (code << bytecount);
             bytecount += 2;
             if (bytecount == 8) {
-                bytes[offset + HEADER_SIZE + bcount--] = l;
+                bytes[offset + bcount--] = l;
                 l = 0;
                 bytecount = 0;
             }
         }
         if (bcount >= 0) {
-            bytes[offset + HEADER_SIZE] = l;
+            bytes[offset] = l;
         }
     }
 
@@ -276,13 +269,13 @@
             l |= (byte) (code << bytecount);
             bytecount += 2;
             if (bytecount == 8) {
-                bytes[offset + HEADER_SIZE + bcount--] = l;
+                bytes[offset + bcount--] = l;
                 l = 0;
                 bytecount = 0;
             }
         }
         if (bcount >= 0) {
-            bytes[offset + HEADER_SIZE] = l;
+            bytes[offset] = l;
         }
     }
 
@@ -310,14 +303,14 @@
      * @return the shift out gene, in gene code format
      */
     public byte shiftKmerWithNextCode(byte c) {
-        byte output = (byte) (bytes[offset + HEADER_SIZE + bytesUsed - 1] & 0x03);
+        byte output = (byte) (bytes[offset + bytesUsed - 1] & 0x03);
         for (int i = bytesUsed - 1; i > 0; i--) {
-            byte in = (byte) (bytes[offset + HEADER_SIZE + i - 1] & 0x03);
-            bytes[offset + HEADER_SIZE + i] = (byte) (((bytes[offset + HEADER_SIZE + i] >>> 2) & 0x3f) | (in << 6));
+            byte in = (byte) (bytes[offset + i - 1] & 0x03);
+            bytes[offset + i] = (byte) (((bytes[offset + i] >>> 2) & 0x3f) | (in << 6));
         }
         int pos = ((lettersInKmer - 1) % 4) << 1;
         byte code = (byte) (c << pos);
-        bytes[offset + HEADER_SIZE] = (byte) (((bytes[offset + HEADER_SIZE] >>> 2) & 0x3f) | code);
+        bytes[offset] = (byte) (((bytes[offset] >>> 2) & 0x3f) | code);
         clearLeadBit();
         return output;
     }
@@ -342,12 +335,12 @@
      */
     public byte shiftKmerWithPreCode(byte c) {
         int pos = ((lettersInKmer - 1) % 4) << 1;
-        byte output = (byte) ((bytes[offset + HEADER_SIZE] >> pos) & 0x03);
+        byte output = (byte) ((bytes[offset] >> pos) & 0x03);
         for (int i = 0; i < bytesUsed - 1; i++) {
-            byte in = (byte) ((bytes[offset + HEADER_SIZE + i + 1] >> 6) & 0x03);
-            bytes[offset + HEADER_SIZE + i] = (byte) ((bytes[offset + HEADER_SIZE + i] << 2) | in);
+            byte in = (byte) ((bytes[offset + i + 1] >> 6) & 0x03);
+            bytes[offset + i] = (byte) ((bytes[offset + i] << 2) | in);
         }
-        bytes[offset + HEADER_SIZE + bytesUsed - 1] = (byte) ((bytes[offset + HEADER_SIZE + bytesUsed - 1] << 2) | c);
+        bytes[offset + bytesUsed - 1] = (byte) ((bytes[offset + bytesUsed - 1] << 2) | c);
         clearLeadBit();
         return output;
     }
@@ -368,11 +361,11 @@
         lettersInKmer += kmer.lettersInKmer - initialKmerSize + 1;
         setSize(KmerUtil.getByteNumFromK(lettersInKmer));
         for (int i = 1; i <= preSize; i++) {
-            bytes[offset + HEADER_SIZE + bytesUsed - i] = bytes[offset + preSize - i];
+            bytes[offset + bytesUsed - i] = bytes[offset + preSize - i];
         }
         for (int k = initialKmerSize - 1; k < kmer.getKmerLength(); k += 4) {
             byte onebyte = getOneByteFromKmerAtPosition(k, kmer.getBytes(), kmer.getOffset(), kmer.getLength());
-            appendOneByteAtPosition(preKmerLength + k - initialKmerSize + 1, onebyte, bytes, offset + HEADER_SIZE, bytesUsed);
+            appendOneByteAtPosition(preKmerLength + k - initialKmerSize + 1, onebyte, bytes, offset, bytesUsed);
         }
         clearLeadBit();
     }
@@ -395,7 +388,7 @@
         setSize(KmerUtil.getByteNumFromK(lettersInKmer));
         // copy prefix into right-side of buffer
         for (int i = 1; i <= preSize; i++) {
-            bytes[offset + HEADER_SIZE + bytesUsed - i] = bytes[offset + HEADER_SIZE + preSize - i];
+            bytes[offset + bytesUsed - i] = bytes[offset + preSize - i];
         }
 
         int bytecount = (preKmerLength % 4) * 2;
@@ -407,13 +400,13 @@
             l |= (byte) (code << bytecount);
             bytecount += 2;
             if (bytecount == 8) {
-                bytes[offset + HEADER_SIZE + bcount--] = l;
+                bytes[offset + bcount--] = l;
                 l = 0;
                 bytecount = 0;
             }
         }
         if (bcount >= 0) {
-            bytes[offset + HEADER_SIZE] = l;
+            bytes[offset] = l;
         }
     }
 
@@ -463,7 +456,7 @@
                 posnInByte = 0;
             }
         }
-        bytes[offset + HEADER_SIZE] = cacheByte;
+        bytes[offset] = cacheByte;
         clearLeadBit();
     }
 
@@ -487,18 +480,18 @@
         // copy prekmer
         for (int k = 0; k < preKmer.lettersInKmer - initialKmerSize + 1; k += 4) {
             byte onebyte = getOneByteFromKmerAtPosition(k, preKmer.bytes, preKmer.offset, preKmer.bytesUsed);
-            appendOneByteAtPosition(k, onebyte, bytes, offset + HEADER_SIZE, bytesUsed);
+            appendOneByteAtPosition(k, onebyte, bytes, offset, bytesUsed);
         }
 
         // copy current kmer
         int k = 4;
         for (; k < preKmerLength; k += 4) {
             byte onebyte = getOneByteFromKmerAtPosition(k, bytes, offset, preSize);
-            appendOneByteAtPosition(preKmer.lettersInKmer - initialKmerSize + k - 4 + 1, cacheByte, bytes, offset + HEADER_SIZE,
+            appendOneByteAtPosition(preKmer.lettersInKmer - initialKmerSize + k - 4 + 1, cacheByte, bytes, offset,
                     bytesUsed);
             cacheByte = onebyte;
         }
-        appendOneByteAtPosition(preKmer.lettersInKmer - initialKmerSize + k - 4 + 1, cacheByte, bytes, offset + HEADER_SIZE, bytesUsed);
+        appendOneByteAtPosition(preKmer.lettersInKmer - initialKmerSize + k - 4 + 1, cacheByte, bytes, offset, bytesUsed);
         clearLeadBit();
     }
 
@@ -550,7 +543,7 @@
 
     protected void clearLeadBit() {
         if (lettersInKmer % 4 != 0) {
-            bytes[offset + HEADER_SIZE] &= (1 << ((lettersInKmer % 4) << 1)) - 1;
+            bytes[offset] &= (1 << ((lettersInKmer % 4) << 1)) - 1;
         }
     }
 
@@ -564,16 +557,15 @@
                 this.offset = 0;
                 
             }
-            in.readFully(bytes, offset + HEADER_SIZE, bytesUsed - HEADER_SIZE);
+            in.readFully(bytes, offset, bytesUsed);
         }
-        Marshal.putInt(lettersInKmer, bytes, offset);
     }
 
     @Override
     public void write(DataOutput out) throws IOException {
         out.writeInt(lettersInKmer);
         if (lettersInKmer > 0) {
-            out.write(bytes, offset + HEADER_SIZE, bytesUsed - HEADER_SIZE);
+            out.write(bytes, offset, bytesUsed);
         }
     }
 
@@ -591,10 +583,11 @@
 
     @Override
     public String toString() {
-        return KmerUtil.recoverKmerFrom(this.lettersInKmer, this.getBytes(), offset + HEADER_SIZE, this.getLength());
+        return KmerUtil.recoverKmerFrom(this.lettersInKmer, this.getBytes(), offset, this.getLength());
     }
 
     public static class Comparator extends WritableComparator {
+    	private static final int LEADING_BYTES = 4;
 
         public Comparator() {
             super(KmerBytesWritable.class);
@@ -604,7 +597,7 @@
             int kmerlength1 = Marshal.getInt(b1, s1);
             int kmerlength2 = Marshal.getInt(b2, s2);
             if (kmerlength1 == kmerlength2) {
-                return compareBytes(b1, s1 + HEADER_SIZE, l1 - HEADER_SIZE, b2, s2 + HEADER_SIZE, l2 - HEADER_SIZE);
+                return compareBytes(b1, s1 + LEADING_BYTES, l1 - LEADING_BYTES, b2, s2 + LEADING_BYTES, l2 - LEADING_BYTES);
             }
             return kmerlength1 - kmerlength2;
         }
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/VKmerBytesWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/VKmerBytesWritable.java
new file mode 100644
index 0000000..cd982ef
--- /dev/null
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/VKmerBytesWritable.java
@@ -0,0 +1,373 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.genomix.type;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.Serializable;
+
+import org.apache.hadoop.io.BinaryComparable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+
+import edu.uci.ics.genomix.data.KmerUtil;
+import edu.uci.ics.genomix.data.Marshal;
+import edu.uci.ics.genomix.oldtype.NodeWritable.DirectionFlag;
+
+/**
+ * Variable kmer length byteswritable It was used to generate the graph in which
+ * phase the kmer length doesn't change. Thus the kmerByteSize of bytes doesn't
+ * change either.
+ */
+public class VKmerBytesWritable extends KmerBytesWritable {
+
+	private static final long serialVersionUID = 1L;
+	protected static final byte[] EMPTY_BYTES = { 0, 0, 0, 0 }; // int
+																// indicating 0
+																// length
+	protected static final int HEADER_SIZE = 4; // number of bytes for header
+												// info
+
+	/**
+	 * Initialize as empty kmer
+	 */
+	public VKmerBytesWritable() {
+		this(EMPTY_BYTES, 0);
+	}
+
+	/**
+	 * Copy contents of kmer string
+	 */
+	public VKmerBytesWritable(String kmer) {
+		bytes = new byte[HEADER_SIZE + KmerUtil.getByteNumFromK(kmer.length())];
+		offset = 0;
+		setAsCopy(kmer);
+	}
+
+	/**
+	 * Set as reference to given data
+	 */
+	public VKmerBytesWritable(byte[] storage, int offset) {
+		setAsReference(storage, offset);
+	}
+
+	/**
+	 * Reserve space for k letters
+	 */
+	public VKmerBytesWritable(int k) {
+		if (k > 0) {
+			bytes = new byte[HEADER_SIZE + KmerUtil.getByteNumFromK(k)];
+		} else {
+			bytes = EMPTY_BYTES;
+		}
+		offset = 0;
+		setKmerLength(k);
+	}
+
+	/**
+	 * deep copy of kmer in other
+	 * 
+	 * @param other
+	 */
+	public VKmerBytesWritable(VKmerBytesWritable other) {
+		this(other.lettersInKmer);
+		setAsCopy(other);
+	}
+
+	/**
+	 * Deep copy of the given kmer
+	 * 
+	 * @param other
+	 */
+	@Override
+	public void setAsCopy(KmerBytesWritable other) {
+		reset(other.lettersInKmer);
+		if (lettersInKmer > 0) {
+			System.arraycopy(other.bytes, other.offset, bytes, this.offset
+					+ HEADER_SIZE, lettersInKmer);
+		}
+	}
+
+	/**
+	 * Deep copy of the given kmer
+	 * 
+	 * @param other
+	 */
+	public void setAsCopy(VKmerBytesWritable other) {
+		reset(other.lettersInKmer);
+		if (lettersInKmer > 0) {
+			System.arraycopy(other.bytes, other.offset + HEADER_SIZE, bytes,
+					this.offset + HEADER_SIZE, lettersInKmer);
+		}
+	}
+
+	/**
+	 * set from String kmer
+	 */
+	@Override
+	public void setAsCopy(String kmer) {
+		int k = kmer.length();
+		reset(k);
+		System.arraycopy(kmer.getBytes(), 0, bytes, offset + HEADER_SIZE, k);
+	}
+
+	/**
+	 * Deep copy of the given bytes data
+	 * 
+	 * @param k
+	 * @param newData
+	 *            : presumed NOT to have a header
+	 * @param offset
+	 */
+	@Override
+	public void setAsCopy(int k, byte[] newData, int offset) {
+		reset(k);
+		System.arraycopy(newData, offset, bytes, this.offset + HEADER_SIZE, k);
+	}
+
+	/**
+	 * Deep copy of the given bytes data
+	 * 
+	 * @param newData
+	 *            : byte array to copy (should have a header)
+	 * @param offset
+	 */
+	public void setAsCopy(byte[] newData, int offset) {
+		int k = Marshal.getInt(newData, offset);
+		reset(k);
+		System.arraycopy(newData, offset + HEADER_SIZE, bytes, this.offset
+				+ HEADER_SIZE, k);
+	}
+
+	/**
+	 * Point this datablock to the given bytes array It works like the pointer
+	 * to new datablock.
+	 * 
+	 * @param newData
+	 *            : byte array to copy (should have a header)
+	 * @param offset
+	 */
+	public void setAsReference(byte[] newData, int offset) {
+		this.bytes = newData;
+		this.offset = offset;
+		int kRequested = Marshal.getInt(newData, offset);
+		int bytesRequested = KmerUtil.getByteNumFromK(kRequested);
+		if (newData.length - offset < bytesRequested) {
+			throw new IllegalArgumentException("Requested " + bytesRequested
+					+ " bytes (k=" + kRequested + ") but buffer has only "
+					+ (newData.length - offset) + " bytes");
+		}
+		setKmerLength(kRequested);
+	}
+
+	@Override
+	public void setKmerLength(int k) {
+		this.bytesUsed = HEADER_SIZE + KmerUtil.getByteNumFromK(k);
+		this.lettersInKmer = k;
+		Marshal.putInt(k, bytes, offset);
+	}
+
+	@Override
+	public void setByRead(byte[] stringBytes, int start) {
+		offset += HEADER_SIZE;
+		super.setByRead(stringBytes, start);
+		offset -= HEADER_SIZE;
+	}
+
+	@Override
+	public void setByReadReverse(byte[] array, int start) {
+		offset += HEADER_SIZE;
+		super.setByReadReverse(array, start);
+		offset -= HEADER_SIZE;
+	}
+
+	@Override
+	public byte shiftKmerWithNextCode(byte c) {
+		offset += HEADER_SIZE;
+		byte retval = super.shiftKmerWithNextCode(c);
+		offset -= HEADER_SIZE;
+		return retval;
+
+	}
+
+	@Override
+	public byte shiftKmerWithPreCode(byte c) {
+		offset += HEADER_SIZE;
+		byte retval = super.shiftKmerWithPreCode(c);
+		offset -= HEADER_SIZE;
+		return retval;
+	}
+
+	@Override
+	public void mergeWithFFKmer(int initialKmerSize, KmerBytesWritable kmer) {
+		offset += HEADER_SIZE;
+		super.mergeWithFFKmer(initialKmerSize, kmer);
+		offset -= HEADER_SIZE;
+		Marshal.putInt(lettersInKmer, bytes, offset);
+	}
+
+	/**
+	 * Merge with kmer that's FF to me. See KmerBytesWritable.mergeWithFFKmer.
+	 */
+	public void mergeWithFFKmer(int initialKmerSize, VKmerBytesWritable kmer) {
+		offset += HEADER_SIZE;
+		kmer.offset += HEADER_SIZE;
+		super.mergeWithFFKmer(initialKmerSize, kmer);
+		offset -= HEADER_SIZE;
+		kmer.offset -= HEADER_SIZE;
+		Marshal.putInt(lettersInKmer, bytes, offset);
+	}
+
+	@Override
+	public void mergeWithFRKmer(int initialKmerSize, KmerBytesWritable kmer) {
+		offset += HEADER_SIZE;
+		super.mergeWithFRKmer(initialKmerSize, kmer);
+		offset -= HEADER_SIZE;
+		Marshal.putInt(lettersInKmer, bytes, offset);
+	}
+
+	/**
+	 * Merge with kmer that's FR to me. See KmerBytesWritable.mergeWithFRKmer.
+	 */
+	public void mergeWithFRKmer(int initialKmerSize, VKmerBytesWritable kmer) {
+		offset += HEADER_SIZE;
+		kmer.offset += HEADER_SIZE;
+		super.mergeWithFRKmer(initialKmerSize, kmer);
+		offset -= HEADER_SIZE;
+		kmer.offset -= HEADER_SIZE;
+		Marshal.putInt(lettersInKmer, bytes, offset);
+	}
+
+	@Override
+	public void mergeWithRFKmer(int initialKmerSize, KmerBytesWritable preKmer) {
+		offset += HEADER_SIZE;
+		super.mergeWithRFKmer(initialKmerSize, preKmer);
+		offset -= HEADER_SIZE;
+		Marshal.putInt(lettersInKmer, bytes, offset);
+	}
+
+	/**
+	 * Merge with kmer that's RF to me. See KmerBytesWritable.mergeWithRFKmer.
+	 */
+	public void mergeWithRFKmer(int initialKmerSize, VKmerBytesWritable preKmer) {
+		offset += HEADER_SIZE;
+		preKmer.offset += HEADER_SIZE;
+		super.mergeWithRFKmer(initialKmerSize, preKmer);
+		offset -= HEADER_SIZE;
+		preKmer.offset -= HEADER_SIZE;
+		Marshal.putInt(lettersInKmer, bytes, offset);
+	}
+
+	@Override
+	public void mergeWithRRKmer(int initialKmerSize, KmerBytesWritable preKmer) {
+		offset += HEADER_SIZE;
+		super.mergeWithRRKmer(initialKmerSize, preKmer);
+		offset -= HEADER_SIZE;
+		Marshal.putInt(lettersInKmer, bytes, offset);
+	}
+
+	/**
+	 * Merge with kmer that's RR to me. See KmerBytesWritable.mergeWithRRKmer.
+	 */
+	public void mergeWithRRKmer(int initialKmerSize, VKmerBytesWritable preKmer) {
+		offset += HEADER_SIZE;
+		preKmer.offset += HEADER_SIZE;
+		super.mergeWithRRKmer(initialKmerSize, preKmer);
+		offset -= HEADER_SIZE;
+		preKmer.offset -= HEADER_SIZE;
+		Marshal.putInt(lettersInKmer, bytes, offset);
+	}
+
+	@Override
+	protected void clearLeadBit() {
+		offset += HEADER_SIZE;
+		super.clearLeadBit();
+		offset -= HEADER_SIZE;
+	}
+
+	@Override
+	public void readFields(DataInput in) throws IOException {
+		lettersInKmer = in.readInt();
+		bytesUsed = HEADER_SIZE + KmerUtil.getByteNumFromK(lettersInKmer);
+		if (lettersInKmer > 0) {
+			if (this.bytes.length < this.bytesUsed) {
+				this.bytes = new byte[this.bytesUsed];
+				this.offset = 0;
+			}
+			in.readFully(bytes, offset + HEADER_SIZE, bytesUsed - HEADER_SIZE);
+		}
+		Marshal.putInt(lettersInKmer, bytes, offset);
+	}
+
+	@Override
+	public void write(DataOutput out) throws IOException {
+		out.writeInt(lettersInKmer);
+		if (lettersInKmer > 0) {
+			out.write(bytes, offset + HEADER_SIZE, bytesUsed - HEADER_SIZE);
+		}
+	}
+
+	@Override
+	public boolean equals(Object right) {
+		if (right instanceof VKmerBytesWritable) {
+			return this.lettersInKmer == ((VKmerBytesWritable) right).lettersInKmer
+					&& super.equals(right);
+		} else if (right instanceof KmerBytesWritable) {
+			// for Kmers, we need to skip our header
+			KmerBytesWritable rightKmer = (KmerBytesWritable) right;
+			if (lettersInKmer != rightKmer.lettersInKmer) {
+				// break early
+				return false;
+			}
+			for (int i = 0; i < lettersInKmer; i++) {
+				if (bytes[i + HEADER_SIZE] != rightKmer.bytes[i]) {
+					return false;
+				}
+			}
+			return true;
+		}
+		return false;
+	}
+
+	@Override
+	public String toString() {
+		return KmerUtil.recoverKmerFrom(this.lettersInKmer, this.getBytes(),
+				offset + HEADER_SIZE, this.getLength());
+	}
+
+	public static class Comparator extends WritableComparator {
+
+		public Comparator() {
+			super(VKmerBytesWritable.class);
+		}
+
+		public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+			int kmerlength1 = Marshal.getInt(b1, s1);
+			int kmerlength2 = Marshal.getInt(b2, s2);
+			if (kmerlength1 == kmerlength2) {
+				return compareBytes(b1, s1 + HEADER_SIZE, l1 - HEADER_SIZE, b2,
+						s2 + HEADER_SIZE, l2 - HEADER_SIZE);
+			}
+			return kmerlength1 - kmerlength2;
+		}
+	}
+
+	static { // register this comparator
+		WritableComparator.define(VKmerBytesWritable.class, new Comparator());
+	}
+
+}