finished framework
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/old/Kmer.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/old/Kmer.java
deleted file mode 100644
index 7aa05f5..0000000
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/old/Kmer.java
+++ /dev/null
@@ -1,299 +0,0 @@
-/*
- * Copyright 2009-2012 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package edu.uci.ics.genomix.type.old;
-
-@Deprecated
-public class Kmer {
-
-    public final static byte[] GENE_SYMBOL = { 'A', 'C', 'G', 'T' };
-
-    public final static class GENE_CODE {
-
-        /**
-         * make sure this 4 ids equal to the sequence id of char in {@GENE_SYMBOL}
-         */
-        public static final byte A = 0;
-        public static final byte C = 1;
-        public static final byte G = 2;
-        public static final byte T = 3;
-
-        public static byte getCodeFromSymbol(byte ch) {
-            byte r = 0;
-            switch (ch) {
-                case 'A':
-                case 'a':
-                    r = A;
-                    break;
-                case 'C':
-                case 'c':
-                    r = C;
-                    break;
-                case 'G':
-                case 'g':
-                    r = G;
-                    break;
-                case 'T':
-                case 't':
-                    r = T;
-                    break;
-            }
-            return r;
-        }
-
-        public static byte getSymbolFromCode(byte code) {
-            if (code > 3) {
-                return '!';
-            }
-            return GENE_SYMBOL[code];
-        }
-
-        public static byte getAdjBit(byte t) {
-            byte r = 0;
-            switch (t) {
-                case 'A':
-                case 'a':
-                    r = 1 << A;
-                    break;
-                case 'C':
-                case 'c':
-                    r = 1 << C;
-                    break;
-                case 'G':
-                case 'g':
-                    r = 1 << G;
-                    break;
-                case 'T':
-                case 't':
-                    r = 1 << T;
-                    break;
-            }
-            return r;
-        }
-
-        /**
-         * It works for path merge.
-         * Merge the kmer by his next, we need to make sure the @{t} is a single neighbor.
-         * 
-         * @param t
-         *            the neighbor code in BitMap
-         * @return the genecode
-         */
-        public static byte getGeneCodeFromBitMap(byte t) {
-            switch (t) {
-                case 1 << A:
-                    return A;
-                case 1 << C:
-                    return C;
-                case 1 << G:
-                    return G;
-                case 1 << T:
-                    return T;
-            }
-            return -1;
-        }
-
-        public static byte mergePreNextAdj(byte pre, byte next) {
-            return (byte) (pre << 4 | (next & 0x0f));
-        }
-
-        public static String getSymbolFromBitMap(byte code) {
-            int left = (code >> 4) & 0x0F;
-            int right = code & 0x0F;
-            StringBuilder str = new StringBuilder();
-            for (int i = A; i <= T; i++) {
-                if ((left & (1 << i)) != 0) {
-                    str.append((char) GENE_SYMBOL[i]);
-                }
-            }
-            str.append('|');
-            for (int i = A; i <= T; i++) {
-                if ((right & (1 << i)) != 0) {
-                    str.append((char) GENE_SYMBOL[i]);
-                }
-            }
-            return str.toString();
-        }
-    }
-
-    public static String recoverKmerFrom(int k, byte[] keyData, int keyStart, int keyLength) {
-        StringBuilder strKmer = new StringBuilder();
-        int byteId = keyStart + keyLength - 1;
-        byte currentbyte = keyData[byteId];
-        for (int geneCount = 0; geneCount < k; geneCount++) {
-            if (geneCount % 4 == 0 && geneCount > 0) {
-                currentbyte = keyData[--byteId];
-            }
-            strKmer.append((char) GENE_SYMBOL[(currentbyte >> ((geneCount % 4) * 2)) & 0x03]);
-        }
-        return strKmer.toString();
-    }
-
-    public static int getByteNumFromK(int k) {
-        int x = k / 4;
-        if (k % 4 != 0) {
-            x += 1;
-        }
-        return x;
-    }
-
-    /**
-     * Compress Kmer into bytes array AATAG will compress as [0x000G, 0xATAA]
-     * 
-     * @param kmer
-     * @param input
-     *            array
-     * @param start
-     *            position
-     * @return initialed kmer array
-     */
-    public static byte[] compressKmer(int k, byte[] array, int start) {
-        final int byteNum = getByteNumFromK(k);
-        byte[] bytes = new byte[byteNum];
-
-        byte l = 0;
-        int bytecount = 0;
-        int bcount = byteNum - 1;
-        for (int i = start; i < start + k; i++) {
-            byte code = GENE_CODE.getCodeFromSymbol(array[i]);
-            l |= (byte) (code << bytecount);
-            bytecount += 2;
-            if (bytecount == 8) {
-                bytes[bcount--] = l;
-                l = 0;
-                bytecount = 0;
-            }
-        }
-        if (bcount >= 0) {
-            bytes[0] = l;
-        }
-        return bytes;
-    }
-
-    /**
-     * Shift Kmer to accept new input
-     * 
-     * @param kmer
-     * @param bytes
-     *            Kmer Array
-     * @param c
-     *            Input new gene character
-     * @return the shiftout gene, in gene code format
-     */
-    public static byte moveKmer(int k, byte[] kmer, byte c) {
-        int byteNum = kmer.length;
-        byte output = (byte) (kmer[byteNum - 1] & 0x03);
-        for (int i = byteNum - 1; i > 0; i--) {
-            byte in = (byte) (kmer[i - 1] & 0x03);
-            kmer[i] = (byte) (((kmer[i] >>> 2) & 0x3f) | (in << 6));
-        }
-        int pos = ((k - 1) % 4) << 1;
-        byte code = (byte) (GENE_CODE.getCodeFromSymbol(c) << pos);
-        kmer[0] = (byte) (((kmer[0] >>> 2) & 0x3f) | code);
-        return (byte) (1 << output);
-    }
-
-    public static byte reverseKmerByte(byte k) {
-        int x = (((k >> 2) & 0x33) | ((k << 2) & 0xcc));
-        return (byte) (((x >> 4) & 0x0f) | ((x << 4) & 0xf0));
-    }
-
-    public static byte[] reverseKmer(int k, byte[] kmer) {
-        byte[] reverseKmer = new byte[kmer.length];
-
-        int curPosAtKmer = ((k - 1) % 4) << 1;
-        int curByteAtKmer = 0;
-
-        int curPosAtReverse = 0;
-        int curByteAtReverse = reverseKmer.length - 1;
-        reverseKmer[curByteAtReverse] = 0;
-        for (int i = 0; i < k; i++) {
-            byte gene = (byte) ((kmer[curByteAtKmer] >> curPosAtKmer) & 0x03);
-            reverseKmer[curByteAtReverse] |= gene << curPosAtReverse;
-            curPosAtReverse += 2;
-            if (curPosAtReverse >= 8) {
-                curPosAtReverse = 0;
-                reverseKmer[--curByteAtReverse] = 0;
-            }
-            curPosAtKmer -= 2;
-            if (curPosAtKmer < 0) {
-                curPosAtKmer = 6;
-                curByteAtKmer++;
-            }
-        }
-
-        return reverseKmer;
-    }
-
-    /**
-     * Compress Reversed Kmer into bytes array AATAG will compress as
-     * [0x000A,0xATAG]
-     * 
-     * @param kmer
-     * @param input
-     *            array
-     * @param start
-     *            position
-     * @return initialed kmer array
-     */
-    public static byte[] compressKmerReverse(int k, byte[] array, int start) {
-        final int byteNum = getByteNumFromK(k);
-        byte[] bytes = new byte[byteNum];
-
-        byte l = 0;
-        int bytecount = 0;
-        int bcount = byteNum - 1;
-        for (int i = start + k - 1; i >= 0; i--) {
-            byte code = GENE_CODE.getCodeFromSymbol(array[i]);
-            l |= (byte) (code << bytecount);
-            bytecount += 2;
-            if (bytecount == 8) {
-                bytes[bcount--] = l;
-                l = 0;
-                bytecount = 0;
-            }
-        }
-        if (bcount >= 0) {
-            bytes[0] = l;
-        }
-        return bytes;
-    }
-
-    /**
-     * Shift Kmer to accept new input
-     * 
-     * @param kmer
-     * @param bytes
-     *            Kmer Array
-     * @param c
-     *            Input new gene character
-     * @return the shiftout gene, in gene code format
-     */
-    public static byte moveKmerReverse(int k, byte[] kmer, byte c) {
-        int pos = ((k - 1) % 4) << 1;
-        byte output = (byte) ((kmer[0] >> pos) & 0x03);
-        for (int i = 0; i < kmer.length - 1; i++) {
-            byte in = (byte) ((kmer[i + 1] >> 6) & 0x03);
-            kmer[i] = (byte) ((kmer[i] << 2) | in);
-        }
-        // (k%4) * 2
-        if (k % 4 != 0) {
-            kmer[0] &= (1 << ((k % 4) << 1)) - 1;
-        }
-        kmer[kmer.length - 1] = (byte) ((kmer[kmer.length - 1] << 2) | GENE_CODE.getCodeFromSymbol(c));
-        return (byte) (1 << output);
-    }
-
-}
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/old/KmerBytesWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/old/KmerBytesWritable.java
deleted file mode 100644
index 7a96d2f..0000000
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/old/KmerBytesWritable.java
+++ /dev/null
@@ -1,136 +0,0 @@
-package edu.uci.ics.genomix.type.old;
-
-import java.io.IOException;
-import java.io.DataInput;
-import java.io.DataOutput;
-import org.apache.hadoop.io.BinaryComparable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.io.WritableComparator;
-
-@Deprecated
-public class KmerBytesWritable extends BinaryComparable implements WritableComparable<BinaryComparable> {
-    private static final int LENGTH_BYTES = 4;
-    private static final byte[] EMPTY_BYTES = {};
-    private byte size;
-    private byte[] bytes;
-
-    public KmerBytesWritable() {
-        this(EMPTY_BYTES);
-    }
-
-    public KmerBytesWritable(byte[] bytes) {
-        this.bytes = bytes;
-        this.size = (byte) bytes.length;
-    }
-
-    @Override
-    public byte[] getBytes() {
-        return bytes;
-    }
-
-    @Deprecated
-    public byte[] get() {
-        return getBytes();
-    }
-
-    @Override
-    public int getLength() {
-        return (int) size;
-    }
-
-    @Deprecated
-    public int getSize() {
-        return getLength();
-    }
-
-    public void setSize(byte size) {
-        if ((int) size > getCapacity()) {
-            setCapacity((byte) (size * 3 / 2));
-        }
-        this.size = size;
-    }
-
-    public int getCapacity() {
-        return bytes.length;
-    }
-
-    public void setCapacity(byte new_cap) {
-        if (new_cap != getCapacity()) {
-            byte[] new_data = new byte[new_cap];
-            if (new_cap < size) {
-                size = new_cap;
-            }
-            if (size != 0) {
-                System.arraycopy(bytes, 0, new_data, 0, size);
-            }
-            bytes = new_data;
-        }
-    }
-
-    public void set(KmerBytesWritable newData) {
-        set(newData.bytes, (byte) 0, newData.size);
-    }
-
-    public void set(byte[] newData, byte offset, byte length) {
-        setSize((byte) 0);
-        setSize(length);
-        System.arraycopy(newData, offset, bytes, 0, size);
-    }
-
-    public void readFields(DataInput in) throws IOException {
-        setSize((byte) 0); // clear the old data
-        setSize(in.readByte());
-        in.readFully(bytes, 0, size);
-    }
-
-    @Override
-    public void write(DataOutput out) throws IOException {
-        out.writeByte(size);
-        out.write(bytes, 0, size);
-    }
-
-    @Override
-    public int hashCode() {
-        return super.hashCode();
-    }
-
-    @Override
-    public boolean equals(Object right_obj) {
-        if (right_obj instanceof KmerBytesWritable)
-            return super.equals(right_obj);
-        return false;
-    }
-
-    @Override
-    public String toString() {
-        StringBuffer sb = new StringBuffer(3 * size);
-        for (int idx = 0; idx < (int) size; idx++) {
-            // if not the first, put a blank separator in
-            if (idx != 0) {
-                sb.append(' ');
-            }
-            String num = Integer.toHexString(0xff & bytes[idx]);
-            // if it is only one digit, add a leading 0.
-            if (num.length() < 2) {
-                sb.append('0');
-            }
-            sb.append(num);
-        }
-        return sb.toString();
-    }
-
-    public static class Comparator extends WritableComparator {
-        public Comparator() {
-            super(KmerBytesWritable.class);
-        }
-
-        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
-            return compareBytes(b1, s1 + LENGTH_BYTES, l1 - LENGTH_BYTES, b2, s2 + LENGTH_BYTES, l2 - LENGTH_BYTES);
-        }
-    }
-
-    static { // register this comparator
-        WritableComparator.define(KmerBytesWritable.class, new Comparator());
-    }
-
-}
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/old/KmerUtil.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/old/KmerUtil.java
deleted file mode 100644
index a4b1bec..0000000
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/old/KmerUtil.java
+++ /dev/null
@@ -1,241 +0,0 @@
-package edu.uci.ics.genomix.type.old;
-
-import java.util.Arrays;
-
-@Deprecated
-public class KmerUtil {
-
-    public static int countNumberOfBitSet(int i) {
-        int c = 0;
-        for (; i != 0; c++) {
-            i &= i - 1;
-        }
-        return c;
-    }
-
-    public static int inDegree(byte bitmap) {
-        return countNumberOfBitSet((bitmap >> 4) & 0x0f);
-    }
-
-    public static int outDegree(byte bitmap) {
-        return countNumberOfBitSet(bitmap & 0x0f);
-    }
-
-    /**
-     * Get last kmer from kmer-chain.
-     * e.g. kmerChain is AAGCTA, if k =5, it will
-     * return AGCTA
-     * 
-     * @param k
-     * @param kInChain
-     * @param kmerChain
-     * @return LastKmer bytes array
-     */
-    public static byte[] getLastKmerFromChain(int k, int kInChain, byte[] kmerChain, int offset, int length) {
-        if (k > kInChain) {
-            return null;
-        }
-        if (k == kInChain) {
-            return kmerChain.clone();
-        }
-        int byteNum = Kmer.getByteNumFromK(k);
-        byte[] kmer = new byte[byteNum];
-
-        /** from end to start */
-        int byteInChain = length - 1 - (kInChain - k) / 4;
-        int posInByteOfChain = ((kInChain - k) % 4) << 1; // *2
-        int byteInKmer = byteNum - 1;
-        for (; byteInKmer >= 0 && byteInChain > 0; byteInKmer--, byteInChain--) {
-            kmer[byteInKmer] = (byte) ((0xff & kmerChain[offset + byteInChain]) >> posInByteOfChain);
-            kmer[byteInKmer] |= ((kmerChain[offset + byteInChain - 1] << (8 - posInByteOfChain)));
-        }
-
-        /** last kmer byte */
-        if (byteInKmer == 0) {
-            kmer[0] = (byte) ((kmerChain[offset] & 0xff) >> posInByteOfChain);
-        }
-        return kmer;
-    }
-
-    /**
-     * Get first kmer from kmer-chain e.g. kmerChain is AAGCTA, if k=5, it will
-     * return AAGCT
-     * 
-     * @param k
-     * @param kInChain
-     * @param kmerChain
-     * @return FirstKmer bytes array
-     */
-    public static byte[] getFirstKmerFromChain(int k, int kInChain, byte[] kmerChain, int offset, int length) {
-        if (k > kInChain) {
-            return null;
-        }
-        if (k == kInChain) {
-            return kmerChain.clone();
-        }
-        int byteNum = Kmer.getByteNumFromK(k);
-        byte[] kmer = new byte[byteNum];
-
-        int i = 1;
-        for (; i < kmer.length; i++) {
-            kmer[kmer.length - i] = kmerChain[offset + length - i];
-        }
-        int posInByteOfChain = (k % 4) << 1; // *2
-        if (posInByteOfChain == 0) {
-            kmer[0] = kmerChain[offset + length - i];
-        } else {
-            kmer[0] = (byte) (kmerChain[offset + length - i] & ((1 << posInByteOfChain) - 1));
-        }
-        return kmer;
-    }
-
-    /**
-     * Merge kmer with next neighbor in gene-code format.
-     * The k of new kmer will increase by 1
-     * e.g. AAGCT merge with A => AAGCTA
-     * 
-     * @param k
-     *            :input k of kmer
-     * @param kmer
-     *            : input bytes of kmer
-     * @param nextCode
-     *            : next neighbor in gene-code format
-     * @return the merged Kmer, this K of this Kmer is k+1
-     */
-    public static byte[] mergeKmerWithNextCode(int k, byte[] kmer, int offset, int length, byte nextCode) {
-        int byteNum = length;
-        if (k % 4 == 0) {
-            byteNum++;
-        }
-        byte[] mergedKmer = new byte[byteNum];
-        for (int i = 1; i <= length; i++) {
-            mergedKmer[mergedKmer.length - i] = kmer[offset + length - i];
-        }
-        if (mergedKmer.length > length) {
-            mergedKmer[0] = (byte) (nextCode & 0x3);
-        } else {
-            mergedKmer[0] = (byte) (kmer[offset] | ((nextCode & 0x3) << ((k % 4) << 1)));
-        }
-        return mergedKmer;
-    }
-
-    /**
-     * Merge kmer with previous neighbor in gene-code format.
-     * The k of new kmer will increase by 1
-     * e.g. AAGCT merge with A => AAAGCT
-     * 
-     * @param k
-     *            :input k of kmer
-     * @param kmer
-     *            : input bytes of kmer
-     * @param preCode
-     *            : next neighbor in gene-code format
-     * @return the merged Kmer,this K of this Kmer is k+1
-     */
-    public static byte[] mergeKmerWithPreCode(int k, byte[] kmer, int offset, int length, byte preCode) {
-        int byteNum = length;
-        byte[] mergedKmer = null;
-        int byteInMergedKmer = 0;
-        if (k % 4 == 0) {
-            byteNum++;
-            mergedKmer = new byte[byteNum];
-            mergedKmer[0] = (byte) ((kmer[offset] >> 6) & 0x3);
-            byteInMergedKmer++;
-        } else {
-            mergedKmer = new byte[byteNum];
-        }
-        for (int i = 0; i < length - 1; i++, byteInMergedKmer++) {
-            mergedKmer[byteInMergedKmer] = (byte) ((kmer[offset + i] << 2) | ((kmer[offset + i + 1] >> 6) & 0x3));
-        }
-        mergedKmer[byteInMergedKmer] = (byte) ((kmer[offset + length - 1] << 2) | (preCode & 0x3));
-        return mergedKmer;
-    }
-
-    /**
-     * Merge two kmer to one kmer
-     * e.g. ACTA + ACCGT => ACTAACCGT
-     * 
-     * @param preK
-     *            : previous k of kmer
-     * @param kmerPre
-     *            : bytes array of previous kmer
-     * @param nextK
-     *            : next k of kmer
-     * @param kmerNext
-     *            : bytes array of next kmer
-     * @return merged kmer, the new k is @preK + @nextK
-     */
-    public static byte[] mergeTwoKmer(int preK, byte[] kmerPre, int offsetPre, int lengthPre, int nextK,
-            byte[] kmerNext, int offsetNext, int lengthNext) {
-        int byteNum = Kmer.getByteNumFromK(preK + nextK);
-        byte[] mergedKmer = new byte[byteNum];
-        int i = 1;
-        for (; i <= lengthPre; i++) {
-            mergedKmer[byteNum - i] = kmerPre[offsetPre + lengthPre - i];
-        }
-        if (i > 1) {
-            i--;
-        }
-        if (preK % 4 == 0) {
-            for (int j = 1; j <= lengthNext; j++) {
-                mergedKmer[byteNum - i - j] = kmerNext[offsetNext + lengthNext - j];
-            }
-        } else {
-            int posNeedToMove = ((preK % 4) << 1);
-            mergedKmer[byteNum - i] |= kmerNext[offsetNext + lengthNext - 1] << posNeedToMove;
-            for (int j = 1; j < lengthNext; j++) {
-                mergedKmer[byteNum - i - j] = (byte) (((kmerNext[offsetNext + lengthNext - j] & 0xff) >> (8 - posNeedToMove)) | (kmerNext[offsetNext
-                        + lengthNext - j - 1] << posNeedToMove));
-            }
-            if ((nextK % 4) * 2 + posNeedToMove > 8) {
-                mergedKmer[0] = (byte) (kmerNext[offsetNext] >> (8 - posNeedToMove));
-            }
-        }
-        return mergedKmer;
-    }
-
-    /**
-     * Safely shifted the kmer forward without change the input kmer
-     * e.g. AGCGC shift with T => GCGCT
-     * 
-     * @param k
-     *            : kmer length
-     * @param kmer
-     *            : input kmer
-     * @param afterCode
-     *            : input genecode
-     * @return new created kmer that shifted by afterCode, the K will not change
-     */
-    public static byte[] shiftKmerWithNextCode(int k, final byte[] kmer, int offset, int length, byte afterCode) {
-        byte[] shifted = Arrays.copyOfRange(kmer, offset, offset + length);
-        Kmer.moveKmer(k, shifted, Kmer.GENE_CODE.getSymbolFromCode(afterCode));
-        return shifted;
-    }
-
-    /**
-     * Safely shifted the kmer backward without change the input kmer
-     * e.g. AGCGC shift with T => TAGCG
-     * 
-     * @param k
-     *            : kmer length
-     * @param kmer
-     *            : input kmer
-     * @param preCode
-     *            : input genecode
-     * @return new created kmer that shifted by preCode, the K will not change
-     */
-    public static byte[] shiftKmerWithPreCode(int k, final byte[] kmer, int offset, int length, byte preCode) {
-        byte[] shifted = Arrays.copyOfRange(kmer, offset, offset + length);
-        Kmer.moveKmerReverse(k, shifted, Kmer.GENE_CODE.getSymbolFromCode(preCode));
-        return shifted;
-    }
-
-    public static byte getGeneCodeAtPosition(int pos, int k, final byte[] kmer, int offset, int length) {
-        if (pos >= k) {
-            return -1;
-        }
-        int posByte = pos / 4;
-        int shift = (pos % 4) << 1;
-        return (byte) ((kmer[offset + length - 1 - posByte] >> shift) & 0x3);
-    }
-}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/ByteSerializerDeserializer.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/ByteSerializerDeserializer.java
deleted file mode 100644
index ebf1282..0000000
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/ByteSerializerDeserializer.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Copyright 2009-2012 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package edu.uci.ics.genomix.data.std.accessors;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-
-public class ByteSerializerDeserializer implements ISerializerDeserializer<Byte> {
-
-    private static final long serialVersionUID = 1L;
-
-    public static final ByteSerializerDeserializer INSTANCE = new ByteSerializerDeserializer();
-
-    private ByteSerializerDeserializer() {
-    }
-
-    @Override
-    public Byte deserialize(DataInput in) throws HyracksDataException {
-        try {
-            return in.readByte();
-        } catch (IOException e) {
-            throw new HyracksDataException(e);
-        }
-    }
-
-    @Override
-    public void serialize(Byte instance, DataOutput out) throws HyracksDataException {
-        try {
-            out.writeByte(instance.intValue());
-        } catch (IOException e) {
-            throw new HyracksDataException(e);
-        }
-    }
-
-    public static byte getByte(byte[] bytes, int offset) {
-        return bytes[offset];
-    }
-
-    public static void putByte(byte val, byte[] bytes, int offset) {
-        bytes[offset] = val;
-    }
-
-}
\ No newline at end of file
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/KmerBinaryHashFunctionFamily.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/KmerBinaryHashFunctionFamily.java
deleted file mode 100644
index 34c29c7..0000000
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/KmerBinaryHashFunctionFamily.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*

- * Copyright 2009-2012 by The Regents of the University of California

- * Licensed under the Apache License, Version 2.0 (the "License");

- * you may not use this file except in compliance with the License.

- * you may obtain a copy of the License from

- *

- *     http://www.apache.org/licenses/LICENSE-2.0

- *

- * Unless required by applicable law or agreed to in writing, software

- * distributed under the License is distributed on an "AS IS" BASIS,

- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

- * See the License for the specific language governing permissions and

- * limitations under the License.

- */

-

-package edu.uci.ics.genomix.data.std.accessors;

-

-import edu.uci.ics.genomix.data.std.primitive.KmerPointable;

-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunction;

-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFamily;

-

-public class KmerBinaryHashFunctionFamily implements IBinaryHashFunctionFamily {

-    private static final long serialVersionUID = 1L;

-

-    @Override

-    public IBinaryHashFunction createBinaryHashFunction(final int seed) {

-

-        return new IBinaryHashFunction() {

-            private KmerPointable p = new KmerPointable();

-

-            @Override

-            public int hash(byte[] bytes, int offset, int length) {

-                if (length + offset >= bytes.length)

-                    throw new IllegalStateException("out of bound");

-                p.set(bytes, offset, length);

-                int hash = p.hash() * (seed + 1);

-                if (hash < 0) {

-                    hash = -(hash + 1);

-                }

-                return hash;

-            }

-        };

-    }

-}
\ No newline at end of file
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/KmerHashPartitioncomputerFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/KmerHashPartitioncomputerFactory.java
deleted file mode 100644
index 8aaf380..0000000
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/KmerHashPartitioncomputerFactory.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*

- * Copyright 2009-2012 by The Regents of the University of California

- * Licensed under the Apache License, Version 2.0 (the "License");

- * you may not use this file except in compliance with the License.

- * you may obtain a copy of the License from

- *

- *     http://www.apache.org/licenses/LICENSE-2.0

- *

- * Unless required by applicable law or agreed to in writing, software

- * distributed under the License is distributed on an "AS IS" BASIS,

- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

- * See the License for the specific language governing permissions and

- * limitations under the License.

- */

-

-package edu.uci.ics.genomix.data.std.accessors;

-

-import java.nio.ByteBuffer;

-

-import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;

-import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;

-import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;

-

-public class KmerHashPartitioncomputerFactory implements ITuplePartitionComputerFactory {

-

-    private static final long serialVersionUID = 1L;

-

-    public static int hashBytes(byte[] bytes, int offset, int length) {

-        int hash = 1;

-        for (int i = offset; i < offset + length; i++)

-            hash = (31 * hash) + (int) bytes[i];

-        return hash;

-    }

-

-    @Override

-    public ITuplePartitionComputer createPartitioner() {

-        return new ITuplePartitionComputer() {

-            @Override

-            public int partition(IFrameTupleAccessor accessor, int tIndex, int nParts) {

-                int startOffset = accessor.getTupleStartOffset(tIndex);

-                int fieldOffset = accessor.getFieldStartOffset(tIndex, 0);

-                int slotLength = accessor.getFieldSlotsLength();

-                int fieldLength = accessor.getFieldLength(tIndex, 0);

-

-                ByteBuffer buf = accessor.getBuffer();

-

-                int hash = hashBytes(buf.array(), startOffset + fieldOffset + slotLength, fieldLength);

-                if (hash < 0) {

-                    hash = -(hash + 1);

-                }

-

-                return hash % nParts;

-            }

-        };

-    }

-}

diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/KmerNormarlizedComputerFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/KmerNormarlizedComputerFactory.java
deleted file mode 100644
index 83b9d12..0000000
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/KmerNormarlizedComputerFactory.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*

- * Copyright 2009-2012 by The Regents of the University of California

- * Licensed under the Apache License, Version 2.0 (the "License");

- * you may not use this file except in compliance with the License.

- * you may obtain a copy of the License from

- *

- *     http://www.apache.org/licenses/LICENSE-2.0

- *

- * Unless required by applicable law or agreed to in writing, software

- * distributed under the License is distributed on an "AS IS" BASIS,

- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

- * See the License for the specific language governing permissions and

- * limitations under the License.

- */

-

-package edu.uci.ics.genomix.data.std.accessors;

-

-import edu.uci.ics.genomix.data.std.primitive.KmerPointable;

-import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputer;

-import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;

-

-public class KmerNormarlizedComputerFactory implements INormalizedKeyComputerFactory {

-    private static final long serialVersionUID = 1L;

-

-    @Override

-    public INormalizedKeyComputer createNormalizedKeyComputer() {

-        return new INormalizedKeyComputer() {

-            /**

-             * read one int from Kmer, make sure this int is consistent whith Kmer compartor

-             */

-            @Override

-            public int normalize(byte[] bytes, int start, int length) {

-                return KmerPointable.getIntReverse(bytes, start, length);

-            }

-        };

-    }

-}
\ No newline at end of file
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/primitive/KmerPointable.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/primitive/KmerPointable.java
deleted file mode 100644
index 8febfa5..0000000
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/primitive/KmerPointable.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/*

- * Copyright 2009-2012 by The Regents of the University of California

- * Licensed under the Apache License, Version 2.0 (the "License");

- * you may not use this file except in compliance with the License.

- * you may obtain a copy of the License from

- *

- *     http://www.apache.org/licenses/LICENSE-2.0

- *

- * Unless required by applicable law or agreed to in writing, software

- * distributed under the License is distributed on an "AS IS" BASIS,

- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

- * See the License for the specific language governing permissions and

- * limitations under the License.

- */

-

-package edu.uci.ics.genomix.data.std.primitive;

-

-import edu.uci.ics.genomix.data.std.accessors.KmerHashPartitioncomputerFactory;

-import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;

-import edu.uci.ics.hyracks.data.std.api.AbstractPointable;

-import edu.uci.ics.hyracks.data.std.api.IComparable;

-import edu.uci.ics.hyracks.data.std.api.IHashable;

-import edu.uci.ics.hyracks.data.std.api.INumeric;

-import edu.uci.ics.hyracks.data.std.api.IPointable;

-import edu.uci.ics.hyracks.data.std.api.IPointableFactory;

-

-public final class KmerPointable extends AbstractPointable implements IHashable, IComparable, INumeric {

-    public static final ITypeTraits TYPE_TRAITS = new ITypeTraits() {

-        private static final long serialVersionUID = 1L;

-

-        @Override

-        public boolean isFixedLength() {

-            return false;

-        }

-

-        @Override

-        public int getFixedLength() {

-            return -1;

-        }

-    };

-

-    public static final IPointableFactory FACTORY = new IPointableFactory() {

-        private static final long serialVersionUID = 1L;

-

-        @Override

-        public IPointable createPointable() {

-            return new KmerPointable();

-        }

-

-        @Override

-        public ITypeTraits getTypeTraits() {

-            return TYPE_TRAITS;

-        }

-    };

-

-    public static short getShortReverse(byte[] bytes, int offset, int length) {

-        if (length < 2) {

-            return (short) (bytes[offset] & 0xff);

-        }

-        return (short) (((bytes[offset + length - 1] & 0xff) << 8) + (bytes[offset + length - 2] & 0xff));

-    }

-

-    public static int getIntReverse(byte[] bytes, int offset, int length) {

-        int shortValue = getShortReverse(bytes, offset, length) & 0xffff;

-

-        if (length < 3) {

-            return shortValue;

-        }

-        if (length == 3) {

-            return (((bytes[offset + 2] & 0xff) << 16) + ((bytes[offset + 1] & 0xff) << 8) + ((bytes[offset] & 0xff)));

-        }

-        return ((bytes[offset + length - 1] & 0xff) << 24) + ((bytes[offset + length - 2] & 0xff) << 16)

-                + ((bytes[offset + length - 3] & 0xff) << 8) + ((bytes[offset + length - 4] & 0xff) << 0);

-    }

-

-    public static long getLongReverse(byte[] bytes, int offset, int length) {

-        if (length < 8) {

-            return ((long) getIntReverse(bytes, offset, length)) & 0x0ffffffffL;

-        }

-        return (((long) (bytes[offset + length - 1] & 0xff)) << 56)

-                + (((long) (bytes[offset + length - 2] & 0xff)) << 48)

-                + (((long) (bytes[offset + length - 3] & 0xff)) << 40)

-                + (((long) (bytes[offset + length - 4] & 0xff)) << 32)

-                + (((long) (bytes[offset + length - 5] & 0xff)) << 24)

-                + (((long) (bytes[offset + length - 6] & 0xff)) << 16)

-                + (((long) (bytes[offset + length - 7] & 0xff)) << 8) + (((long) (bytes[offset + length - 8] & 0xff)));

-    }

-

-    @Override

-    public int compareTo(IPointable pointer) {

-        return compareTo(pointer.getByteArray(), pointer.getStartOffset(), pointer.getLength());

-    }

-

-    @Override

-    public int compareTo(byte[] bytes, int offset, int length) {

-

-        if (this.length != length) {

-            return this.length - length;

-        }

-        for (int i = length - 1; i >= 0; i--) {

-            int cmp = (this.bytes[this.start + i] & 0xff) - (bytes[offset + i] & 0xff);

-            if (cmp != 0) {

-                return cmp;

-            }

-        }

-

-        return 0;

-    }

-

-    @Override

-    public int hash() {

-        int hash = KmerHashPartitioncomputerFactory.hashBytes(bytes, start, length);

-        return hash;

-    }

-

-    @Override

-    public byte byteValue() {

-        return bytes[start + length - 1];

-    }

-

-    @Override

-    public short shortValue() {

-        return getShortReverse(bytes, start, length);

-    }

-

-    @Override

-    public int intValue() {

-        return getIntReverse(bytes, start, length);

-    }

-

-    @Override

-    public long longValue() {

-        return getLongReverse(bytes, start, length);

-    }

-

-    @Override

-    public float floatValue() {

-        return Float.intBitsToFloat(intValue());

-    }

-

-    @Override

-    public double doubleValue() {

-        return Double.longBitsToDouble(longValue());

-    }

-}

diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/ConnectorPolicyAssignmentPolicy.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/ConnectorPolicyAssignmentPolicy.java
deleted file mode 100644
index ed1b926..0000000
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/ConnectorPolicyAssignmentPolicy.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*

- * Copyright 2009-2012 by The Regents of the University of California

- * Licensed under the Apache License, Version 2.0 (the "License");

- * you may not use this file except in compliance with the License.

- * you may obtain a copy of the License from

- *

- *     http://www.apache.org/licenses/LICENSE-2.0

- *

- * Unless required by applicable law or agreed to in writing, software

- * distributed under the License is distributed on an "AS IS" BASIS,

- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

- * See the License for the specific language governing permissions and

- * limitations under the License.

- */

-

-package edu.uci.ics.genomix.dataflow;

-

-import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;

-import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;

-import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicyAssignmentPolicy;

-import edu.uci.ics.hyracks.api.dataflow.connectors.PipeliningConnectorPolicy;

-import edu.uci.ics.hyracks.api.dataflow.connectors.SendSideMaterializedPipeliningConnectorPolicy;

-import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningMergingConnectorDescriptor;

-

-/**

- * used by precluster groupby

- */

-public class ConnectorPolicyAssignmentPolicy implements IConnectorPolicyAssignmentPolicy {

-    private static final long serialVersionUID = 1L;

-    private IConnectorPolicy senderSideMaterializePolicy = new SendSideMaterializedPipeliningConnectorPolicy();

-    private IConnectorPolicy pipeliningPolicy = new PipeliningConnectorPolicy();

-

-    @Override

-    public IConnectorPolicy getConnectorPolicyAssignment(IConnectorDescriptor c, int nProducers, int nConsumers,

-            int[] fanouts) {

-        if (c instanceof MToNPartitioningMergingConnectorDescriptor) {

-            return senderSideMaterializePolicy;

-        } else {

-            return pipeliningPolicy;

-        }

-    }

-}

diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/KMerSequenceWriterFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/KMerSequenceWriterFactory.java
deleted file mode 100644
index 405e109..0000000
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/KMerSequenceWriterFactory.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Copyright 2009-2012 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package edu.uci.ics.genomix.dataflow;
-
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.SequenceFile.CompressionType;
-import org.apache.hadoop.io.SequenceFile.Writer;
-import org.apache.hadoop.mapred.JobConf;
-
-import edu.uci.ics.genomix.job.GenomixJob;
-import edu.uci.ics.genomix.type.KmerBytesWritable;
-import edu.uci.ics.genomix.type.KmerCountValue;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
-import edu.uci.ics.hyracks.hdfs.api.ITupleWriter;
-import edu.uci.ics.hyracks.hdfs.api.ITupleWriterFactory;
-import edu.uci.ics.hyracks.hdfs.dataflow.ConfFactory;
-
-@SuppressWarnings("deprecation")
-public class KMerSequenceWriterFactory implements ITupleWriterFactory {
-
-    private static final long serialVersionUID = 1L;
-    private ConfFactory confFactory;
-    private final int kmerlength;
-
-    public KMerSequenceWriterFactory(JobConf conf) throws HyracksDataException {
-        this.confFactory = new ConfFactory(conf);
-        this.kmerlength = conf.getInt(GenomixJob.KMER_LENGTH, GenomixJob.DEFAULT_KMER);
-    }
-
-    public class TupleWriter implements ITupleWriter {
-        public TupleWriter(ConfFactory cf) {
-            this.cf = cf;
-        }
-
-        ConfFactory cf;
-        Writer writer = null;
-
-        KmerCountValue reEnterCount = new KmerCountValue();
-        KmerBytesWritable reEnterKey = new KmerBytesWritable(kmerlength);
-
-        /**
-         * assumption is that output never change source!
-         */
-        @Override
-        public void write(DataOutput output, ITupleReference tuple) throws HyracksDataException {
-            try {
-                byte[] kmer = tuple.getFieldData(0);
-                int keyStart = tuple.getFieldStart(0);
-                int keyLength = tuple.getFieldLength(0);
-
-                byte bitmap = tuple.getFieldData(1)[tuple.getFieldStart(1)];
-                byte count = tuple.getFieldData(2)[tuple.getFieldStart(2)];
-                reEnterCount.set(bitmap, count);
-                reEnterKey.set(kmer, keyStart, keyLength);
-                writer.append(reEnterKey, reEnterCount);
-            } catch (IOException e) {
-                throw new HyracksDataException(e);
-            }
-        }
-
-        @Override
-        public void open(DataOutput output) throws HyracksDataException {
-            try {
-                writer = SequenceFile.createWriter(cf.getConf(), (FSDataOutputStream) output, KmerBytesWritable.class,
-                        KmerCountValue.class, CompressionType.NONE, null);
-            } catch (IOException e) {
-                throw new HyracksDataException(e);
-            }
-        }
-
-        @Override
-        public void close(DataOutput output) throws HyracksDataException {
-            // TODO Auto-generated method stub
-        }
-    }
-
-    @Override
-    public ITupleWriter getTupleWriter(IHyracksTaskContext ctx) throws HyracksDataException {
-        return new TupleWriter(confFactory);
-    }
-
-}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/KMerTextWriterFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/KMerTextWriterFactory.java
deleted file mode 100644
index cfa7262..0000000
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/KMerTextWriterFactory.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Copyright 2009-2012 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package edu.uci.ics.genomix.dataflow;
-
-import java.io.DataOutput;
-import java.io.IOException;
-
-import edu.uci.ics.genomix.type.GeneCode;
-import edu.uci.ics.genomix.type.KmerBytesWritable;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
-import edu.uci.ics.hyracks.hdfs.api.ITupleWriter;
-import edu.uci.ics.hyracks.hdfs.api.ITupleWriterFactory;
-
-public class KMerTextWriterFactory implements ITupleWriterFactory {
-
-    /**
-	 * 
-	 */
-    private static final long serialVersionUID = 1L;
-    private KmerBytesWritable kmer;
-
-    public KMerTextWriterFactory(int k) {
-        kmer = new KmerBytesWritable(k);
-    }
-
-    public class TupleWriter implements ITupleWriter {
-        @Override
-        public void write(DataOutput output, ITupleReference tuple) throws HyracksDataException {
-            try {
-                kmer.set(tuple.getFieldData(0), tuple.getFieldStart(0), tuple.getFieldLength(0));
-                output.write(kmer.toString().getBytes());
-                output.writeByte('\t');
-                output.write(GeneCode.getSymbolFromBitMap(tuple.getFieldData(1)[tuple.getFieldStart(1)]).getBytes());
-                output.writeByte('\t');
-                output.write(String.valueOf((int) tuple.getFieldData(2)[tuple.getFieldStart(2)]).getBytes());
-                output.writeByte('\n');
-            } catch (IOException e) {
-                throw new HyracksDataException(e);
-            }
-        }
-
-        @Override
-        public void open(DataOutput output) throws HyracksDataException {
-            // TODO Auto-generated method stub
-
-        }
-
-        @Override
-        public void close(DataOutput output) throws HyracksDataException {
-            // TODO Auto-generated method stub
-        }
-    }
-
-    @Override
-    public ITupleWriter getTupleWriter(IHyracksTaskContext ctx) throws HyracksDataException {
-        // TODO Auto-generated method stub
-        return new TupleWriter();
-    }
-
-}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/ReadsKeyValueParserFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/ReadsKeyValueParserFactory.java
deleted file mode 100644
index 409b434..0000000
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/ReadsKeyValueParserFactory.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- * Copyright 2009-2012 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package edu.uci.ics.genomix.dataflow;
-
-import java.nio.ByteBuffer;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-
-import edu.uci.ics.genomix.data.std.accessors.ByteSerializerDeserializer;
-import edu.uci.ics.genomix.type.GeneCode;
-import edu.uci.ics.genomix.type.KmerBytesWritable;
-import edu.uci.ics.hyracks.api.comm.IFrameWriter;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
-import edu.uci.ics.hyracks.hdfs.api.IKeyValueParser;
-import edu.uci.ics.hyracks.hdfs.api.IKeyValueParserFactory;
-
-;
-
-public class ReadsKeyValueParserFactory implements IKeyValueParserFactory<LongWritable, Text> {
-    private static final long serialVersionUID = 1L;
-
-    private KmerBytesWritable kmer;
-    private boolean bReversed;
-
-    public ReadsKeyValueParserFactory(int k, boolean bGenerateReversed) {
-        bReversed = bGenerateReversed;
-        kmer = new KmerBytesWritable(k);
-    }
-
-    @Override
-    public IKeyValueParser<LongWritable, Text> createKeyValueParser(final IHyracksTaskContext ctx) {
-        final ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(2);
-        final ByteBuffer outputBuffer = ctx.allocateFrame();
-        final FrameTupleAppender outputAppender = new FrameTupleAppender(ctx.getFrameSize());
-        outputAppender.reset(outputBuffer, true);
-
-        return new IKeyValueParser<LongWritable, Text>() {
-
-            @Override
-            public void parse(LongWritable key, Text value, IFrameWriter writer) throws HyracksDataException {
-                String geneLine = value.toString(); // Read the Real Gene Line
-                Pattern genePattern = Pattern.compile("[AGCT]+");
-                Matcher geneMatcher = genePattern.matcher(geneLine);
-                boolean isValid = geneMatcher.matches();
-                if (isValid) {
-                    SplitReads(geneLine.getBytes(), writer);
-                }
-            }
-
-            private void SplitReads(byte[] array, IFrameWriter writer) {
-                /** first kmer */
-                int k = kmer.getKmerLength();
-                if (k >= array.length){
-                	return;
-                }
-                kmer.setByRead(array, 0);
-                byte pre = 0;
-                byte next = GeneCode.getAdjBit(array[k]);
-                InsertToFrame(kmer, pre, next, writer);
-
-                /** middle kmer */
-                for (int i = k; i < array.length - 1; i++) {
-                    pre = GeneCode.getBitMapFromGeneCode(kmer.shiftKmerWithNextChar(array[i]));
-                    next = GeneCode.getAdjBit(array[i + 1]);
-                    InsertToFrame(kmer, pre, next, writer);
-                }
-
-                /** last kmer */
-                pre = GeneCode.getBitMapFromGeneCode(kmer.shiftKmerWithNextChar(array[array.length - 1]));
-                next = 0;
-                InsertToFrame(kmer, pre, next, writer);
-
-                if (bReversed) {
-                    /** first kmer */
-                    kmer.setByReadReverse(array, 0);
-                    next = 0;
-                    pre = GeneCode.getAdjBit(array[k]);
-                    InsertToFrame(kmer, pre, next, writer);
-                    /** middle kmer */
-                    for (int i = k; i < array.length - 1; i++) {
-                        next = GeneCode.getBitMapFromGeneCode(kmer.shiftKmerWithPreChar(array[i]));
-                        pre = GeneCode.getAdjBit(array[i + 1]);
-                        InsertToFrame(kmer, pre, next, writer);
-                    }
-                    /** last kmer */
-                    next = GeneCode.getBitMapFromGeneCode(kmer.shiftKmerWithPreChar(array[array.length - 1]));
-                    pre = 0;
-                    InsertToFrame(kmer, pre, next, writer);
-                }
-            }
-
-            /**
-             * At this graph building phase, we assume the kmer length are all
-             * the same Thus we didn't output those Kmer length
-             * 
-             * @param kmer
-             *            :input kmer
-             * @param pre
-             *            : pre neighbor code
-             * @param next
-             *            : next neighbor code
-             * @param writer
-             *            : output writer
-             */
-            private void InsertToFrame(KmerBytesWritable kmer, byte pre, byte next, IFrameWriter writer) {
-                try {
-                    byte adj = GeneCode.mergePreNextAdj(pre, next);
-                    tupleBuilder.reset();
-                    tupleBuilder.addField(kmer.getBytes(), 0, kmer.getLength());
-                    tupleBuilder.addField(ByteSerializerDeserializer.INSTANCE, adj);
-
-                    if (!outputAppender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
-                            tupleBuilder.getSize())) {
-                        FrameUtils.flushFrame(outputBuffer, writer);
-                        outputAppender.reset(outputBuffer, true);
-                        if (!outputAppender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
-                                tupleBuilder.getSize())) {
-                            throw new IllegalStateException(
-                                    "Failed to copy an record into a frame: the record size is too large.");
-                        }
-                    }
-                } catch (Exception e) {
-                    throw new IllegalStateException(e);
-                }
-            }
-
-            @Override
-            public void open(IFrameWriter writer) throws HyracksDataException {
-                // TODO Auto-generated method stub
-
-            }
-
-            @Override
-            public void close(IFrameWriter writer) throws HyracksDataException {
-                FrameUtils.flushFrame(outputBuffer, writer);
-            }
-        };
-    }
-
-}
\ No newline at end of file
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/DistributedMergeLmerAggregateFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/DistributedMergeLmerAggregateFactory.java
deleted file mode 100644
index ea70fb0..0000000
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/DistributedMergeLmerAggregateFactory.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/*

- * Copyright 2009-2012 by The Regents of the University of California

- * Licensed under the Apache License, Version 2.0 (the "License");

- * you may not use this file except in compliance with the License.

- * you may obtain a copy of the License from

- *

- *     http://www.apache.org/licenses/LICENSE-2.0

- *

- * Unless required by applicable law or agreed to in writing, software

- * distributed under the License is distributed on an "AS IS" BASIS,

- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

- * See the License for the specific language governing permissions and

- * limitations under the License.

- */

-

-package edu.uci.ics.genomix.dataflow.aggregators;

-

-import java.io.DataOutput;

-import java.io.IOException;

-

-import edu.uci.ics.genomix.data.std.accessors.ByteSerializerDeserializer;

-import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;

-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;

-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;

-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;

-import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;

-import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;

-import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;

-import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;

-

-/**

- * sum

- */

-public class DistributedMergeLmerAggregateFactory implements IAggregatorDescriptorFactory {

-    private static final long serialVersionUID = 1L;

-

-    public DistributedMergeLmerAggregateFactory() {

-    }

-

-    public class DistributeAggregatorDescriptor implements IAggregatorDescriptor {

-        private static final int MAX = 127;

-

-        @Override

-        public void reset() {

-        }

-

-        @Override

-        public void close() {

-            // TODO Auto-generated method stub

-

-        }

-

-        @Override

-        public AggregateState createAggregateStates() {

-            return new AggregateState(new Object() {

-            });

-        }

-

-        protected byte getField(IFrameTupleAccessor accessor, int tIndex, int fieldId) {

-            int tupleOffset = accessor.getTupleStartOffset(tIndex);

-            int fieldStart = accessor.getFieldStartOffset(tIndex, fieldId);

-            int offset = tupleOffset + fieldStart + accessor.getFieldSlotsLength();

-            byte data = ByteSerializerDeserializer.getByte(accessor.getBuffer().array(), offset);

-            return data;

-        }

-

-        @Override

-        public void init(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex, AggregateState state)

-                throws HyracksDataException {

-            byte bitmap = getField(accessor, tIndex, 1);

-            byte count = getField(accessor, tIndex, 2);

-

-            DataOutput fieldOutput = tupleBuilder.getDataOutput();

-            try {

-                fieldOutput.writeByte(bitmap);

-                tupleBuilder.addFieldEndOffset();

-                fieldOutput.writeByte(count);

-                tupleBuilder.addFieldEndOffset();

-            } catch (IOException e) {

-                throw new HyracksDataException("I/O exception when initializing the aggregator.");

-            }

-        }

-

-        @Override

-        public void aggregate(IFrameTupleAccessor accessor, int tIndex, IFrameTupleAccessor stateAccessor,

-                int stateTupleIndex, AggregateState state) throws HyracksDataException {

-            byte bitmap = getField(accessor, tIndex, 1);

-            short count = getField(accessor, tIndex, 2);

-

-            int statetupleOffset = stateAccessor.getTupleStartOffset(stateTupleIndex);

-            int bitfieldStart = stateAccessor.getFieldStartOffset(stateTupleIndex, 1);

-            int countfieldStart = stateAccessor.getFieldStartOffset(stateTupleIndex, 2);

-            int bitoffset = statetupleOffset + stateAccessor.getFieldSlotsLength() + bitfieldStart;

-            int countoffset = statetupleOffset + stateAccessor.getFieldSlotsLength() + countfieldStart;

-

-            byte[] data = stateAccessor.getBuffer().array();

-

-            bitmap |= data[bitoffset];

-            count += data[countoffset];

-            if (count >= MAX) {

-                count = (byte) MAX;

-            }

-            data[bitoffset] = bitmap;

-            data[countoffset] = (byte) count;

-        }

-

-        @Override

-        public void outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,

-                AggregateState state) throws HyracksDataException {

-            byte bitmap = getField(accessor, tIndex, 1);

-            byte count = getField(accessor, tIndex, 2);

-            DataOutput fieldOutput = tupleBuilder.getDataOutput();

-            try {

-                fieldOutput.writeByte(bitmap);

-                tupleBuilder.addFieldEndOffset();

-                fieldOutput.writeByte(count);

-                tupleBuilder.addFieldEndOffset();

-            } catch (IOException e) {

-                throw new HyracksDataException("I/O exception when writing aggregation to the output buffer.");

-            }

-

-        }

-

-        @Override

-        public void outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,

-                AggregateState state) throws HyracksDataException {

-            outputPartialResult(tupleBuilder, accessor, tIndex, state);

-        }

-

-    }

-

-    @Override

-    public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,

-            RecordDescriptor outRecordDescriptor, int[] keyFields, int[] keyFieldsInPartialResults)

-            throws HyracksDataException {

-        return new DistributeAggregatorDescriptor();

-    }

-

-}

diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/LocalAggregatorDescriptor.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/LocalAggregatorDescriptor.java
deleted file mode 100644
index 87c0207..0000000
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/LocalAggregatorDescriptor.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Copyright 2009-2012 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package edu.uci.ics.genomix.dataflow.aggregators;
-
-import java.io.DataOutput;
-import java.io.IOException;
-
-import edu.uci.ics.genomix.data.std.accessors.ByteSerializerDeserializer;
-import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
-import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
-
-public class LocalAggregatorDescriptor implements IAggregatorDescriptor {
-    private static final int MAX = 127;
-
-    @Override
-    public void reset() {
-    }
-
-    @Override
-    public void close() {
-        // TODO Auto-generated method stub
-
-    }
-
-    @Override
-    public AggregateState createAggregateStates() {
-        return new AggregateState(new Object() {
-        });
-    }
-
-    protected byte getField(IFrameTupleAccessor accessor, int tIndex, int fieldId) {
-        int tupleOffset = accessor.getTupleStartOffset(tIndex);
-        int fieldStart = accessor.getFieldStartOffset(tIndex, fieldId);
-        int offset = tupleOffset + fieldStart + accessor.getFieldSlotsLength();
-        byte data = ByteSerializerDeserializer.getByte(accessor.getBuffer().array(), offset);
-        return data;
-    }
-
-    @Override
-    public void init(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex, AggregateState state)
-            throws HyracksDataException {
-        byte bitmap = getField(accessor, tIndex, 1);
-        byte count = 1;
-
-        DataOutput fieldOutput = tupleBuilder.getDataOutput();
-        try {
-            fieldOutput.writeByte(bitmap);
-            tupleBuilder.addFieldEndOffset();
-            fieldOutput.writeByte(count);
-            tupleBuilder.addFieldEndOffset();
-        } catch (IOException e) {
-            throw new HyracksDataException("I/O exception when initializing the aggregator.");
-        }
-    }
-
-    @Override
-    public void aggregate(IFrameTupleAccessor accessor, int tIndex, IFrameTupleAccessor stateAccessor,
-            int stateTupleIndex, AggregateState state) throws HyracksDataException {
-        byte bitmap = getField(accessor, tIndex, 1);
-        short count = 1;
-
-        int statetupleOffset = stateAccessor.getTupleStartOffset(stateTupleIndex);
-        int bitfieldStart = stateAccessor.getFieldStartOffset(stateTupleIndex, 1);
-        int countfieldStart = stateAccessor.getFieldStartOffset(stateTupleIndex, 2);
-        int bitoffset = statetupleOffset + stateAccessor.getFieldSlotsLength() + bitfieldStart;
-        int countoffset = statetupleOffset + stateAccessor.getFieldSlotsLength() + countfieldStart;
-
-        byte[] data = stateAccessor.getBuffer().array();
-
-        bitmap |= data[bitoffset];
-        count += data[countoffset];
-        if (count >= MAX) {
-            count = (byte) MAX;
-        }
-        data[bitoffset] = bitmap;
-        data[countoffset] = (byte) count;
-    }
-
-    @Override
-    public void outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
-            AggregateState state) throws HyracksDataException {
-        byte bitmap = getField(accessor, tIndex, 1);
-        byte count = getField(accessor, tIndex, 2);
-        DataOutput fieldOutput = tupleBuilder.getDataOutput();
-        try {
-            fieldOutput.writeByte(bitmap);
-            tupleBuilder.addFieldEndOffset();
-            fieldOutput.writeByte(count);
-            tupleBuilder.addFieldEndOffset();
-        } catch (IOException e) {
-            throw new HyracksDataException("I/O exception when writing aggregation to the output buffer.");
-        }
-
-    }
-
-    @Override
-    public void outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
-            AggregateState state) throws HyracksDataException {
-        outputPartialResult(tupleBuilder, accessor, tIndex, state);
-    }
-
-};
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/MergeKmerAggregateFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/MergeKmerAggregateFactory.java
deleted file mode 100644
index b5eb70f..0000000
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/MergeKmerAggregateFactory.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Copyright 2009-2012 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package edu.uci.ics.genomix.dataflow.aggregators;
-
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
-
-/**
- * count
- */
-public class MergeKmerAggregateFactory implements IAggregatorDescriptorFactory {
-    private static final long serialVersionUID = 1L;
-
-    public MergeKmerAggregateFactory() {
-    }
-
-    @Override
-    public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
-            RecordDescriptor outRecordDescriptor, int[] keyFields, int[] keyFieldsInPartialResults)
-            throws HyracksDataException {
-        return new LocalAggregatorDescriptor();
-    }
-
-}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/driver/Driver.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/driver/Driver.java
deleted file mode 100644
index 27d5e15..0000000
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/driver/Driver.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Copyright 2009-2012 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package edu.uci.ics.genomix.driver;
-
-import java.net.URL;
-import java.util.EnumSet;
-import java.util.Map;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.util.GenericOptionsParser;
-
-import edu.uci.ics.genomix.job.GenomixJob;
-import edu.uci.ics.genomix.job.JobGen;
-import edu.uci.ics.genomix.job.JobGenBrujinGraph;
-import edu.uci.ics.hyracks.api.client.HyracksConnection;
-import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
-import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
-import edu.uci.ics.hyracks.api.exceptions.HyracksException;
-import edu.uci.ics.hyracks.api.job.JobFlag;
-import edu.uci.ics.hyracks.api.job.JobId;
-import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.hdfs.scheduler.Scheduler;
-
-public class Driver {
-    public static enum Plan {
-        BUILD_DEBRUJIN_GRAPH,
-        GRAPH_CLEANNING,
-        CONTIGS_GENERATION,
-    }
-
-    private static final String IS_PROFILING = "genomix.driver.profiling";
-    private static final String CPARTITION_PER_MACHINE = "genomix.driver.duplicate.num";
-    private static final Log LOG = LogFactory.getLog(Driver.class);
-    private JobGen jobGen;
-    private boolean profiling;
-
-    private int numPartitionPerMachine;
-
-    private IHyracksClientConnection hcc;
-    private Scheduler scheduler;
-
-    public Driver(String ipAddress, int port, int numPartitionPerMachine) throws HyracksException {
-        try {
-            hcc = new HyracksConnection(ipAddress, port);
-            scheduler = new Scheduler(hcc.getNodeControllerInfos());
-        } catch (Exception e) {
-            throw new HyracksException(e);
-        }
-        this.numPartitionPerMachine = numPartitionPerMachine;
-    }
-
-    public void runJob(GenomixJob job) throws HyracksException {
-        runJob(job, Plan.BUILD_DEBRUJIN_GRAPH, false);
-    }
-
-    public void runJob(GenomixJob job, Plan planChoice, boolean profiling) throws HyracksException {
-        /** add hadoop configurations */
-        URL hadoopCore = job.getClass().getClassLoader().getResource("core-site.xml");
-        job.addResource(hadoopCore);
-        URL hadoopMapRed = job.getClass().getClassLoader().getResource("mapred-site.xml");
-        job.addResource(hadoopMapRed);
-        URL hadoopHdfs = job.getClass().getClassLoader().getResource("hdfs-site.xml");
-        job.addResource(hadoopHdfs);
-
-        LOG.info("job started");
-        long start = System.currentTimeMillis();
-        long end = start;
-        long time = 0;
-
-        this.profiling = profiling;
-        try {
-            Map<String, NodeControllerInfo> ncMap = hcc.getNodeControllerInfos();
-            LOG.info("ncmap:" + ncMap.size() + " " + ncMap.keySet().toString());
-            switch (planChoice) {
-                case BUILD_DEBRUJIN_GRAPH:
-                default:
-                    jobGen = new JobGenBrujinGraph(job, scheduler, ncMap, numPartitionPerMachine);
-                    break;
-            }
-
-            start = System.currentTimeMillis();
-            runCreate(jobGen);
-            end = System.currentTimeMillis();
-            time = end - start;
-            LOG.info("result writing finished " + time + "ms");
-            LOG.info("job finished");
-        } catch (Exception e) {
-            throw new HyracksException(e);
-        }
-    }
-
-    private void runCreate(JobGen jobGen) throws Exception {
-        try {
-            JobSpecification createJob = jobGen.generateJob();
-            execute(createJob);
-        } catch (Exception e) {
-            e.printStackTrace();
-            throw e;
-        }
-    }
-
-    private void execute(JobSpecification job) throws Exception {
-        job.setUseConnectorPolicyForScheduling(false);
-        JobId jobId = hcc
-                .startJob(job, profiling ? EnumSet.of(JobFlag.PROFILE_RUNTIME) : EnumSet.noneOf(JobFlag.class));
-        hcc.waitForCompletion(jobId);
-    }
-
-    public static void main(String[] args) throws Exception {
-        GenomixJob jobConf = new GenomixJob();
-        String[] otherArgs = new GenericOptionsParser(jobConf, args).getRemainingArgs();
-        if (otherArgs.length < 4) {
-            System.err.println("Need <serverIP> <port> <input> <output>");
-            System.exit(-1);
-        }
-        String ipAddress = otherArgs[0];
-        int port = Integer.parseInt(otherArgs[1]);
-        int numOfDuplicate = jobConf.getInt(CPARTITION_PER_MACHINE, 2);
-        boolean bProfiling = jobConf.getBoolean(IS_PROFILING, true);
-        // FileInputFormat.setInputPaths(job, otherArgs[2]);
-        {
-            Path path = new Path(jobConf.getWorkingDirectory(), otherArgs[2]);
-            jobConf.set("mapred.input.dir", path.toString());
-
-            Path outputDir = new Path(jobConf.getWorkingDirectory(), otherArgs[3]);
-            jobConf.set("mapred.output.dir", outputDir.toString());
-        }
-        // FileInputFormat.addInputPath(jobConf, new Path(otherArgs[2]));
-        // FileOutputFormat.setOutputPath(job, new Path(otherArgs[3]));
-        Driver driver = new Driver(ipAddress, port, numOfDuplicate);
-        driver.runJob(jobConf, Plan.BUILD_DEBRUJIN_GRAPH, bProfiling);
-    }
-}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/accessors/ReadIDNormarlizedComputeFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/accessors/ReadIDNormarlizedComputeFactory.java
new file mode 100644
index 0000000..4d00731
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/accessors/ReadIDNormarlizedComputeFactory.java
@@ -0,0 +1,26 @@
+package edu.uci.ics.genomix.hyracks.data.accessors;
+
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputer;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+
+public class ReadIDNormarlizedComputeFactory implements INormalizedKeyComputerFactory{
+
+    /**
+     * 
+     */
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public INormalizedKeyComputer createNormalizedKeyComputer() {
+        return new INormalizedKeyComputer(){
+
+            @Override
+            public int normalize(byte[] bytes, int start, int length) {
+                return IntegerSerializerDeserializer.getInt(bytes, start);
+            }
+            
+        };
+    }
+
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/NodeSequenceWriterFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/NodeSequenceWriterFactory.java
new file mode 100644
index 0000000..44b8f55
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/NodeSequenceWriterFactory.java
@@ -0,0 +1,27 @@
+package edu.uci.ics.genomix.hyracks.dataflow;
+
+import org.apache.hadoop.mapred.JobConf;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.hdfs.api.ITupleWriter;
+import edu.uci.ics.hyracks.hdfs.api.ITupleWriterFactory;
+
+public class NodeSequenceWriterFactory implements ITupleWriterFactory {
+
+    /**
+     * 
+     */
+    private static final long serialVersionUID = 1L;
+
+    public NodeSequenceWriterFactory(JobConf job) {
+        // TODO Auto-generated constructor stub
+    }
+
+    @Override
+    public ITupleWriter getTupleWriter(IHyracksTaskContext ctx) throws HyracksDataException {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/NodeTextWriterFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/NodeTextWriterFactory.java
new file mode 100644
index 0000000..21098a9
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/NodeTextWriterFactory.java
@@ -0,0 +1,21 @@
+package edu.uci.ics.genomix.hyracks.dataflow;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.hdfs.api.ITupleWriter;
+import edu.uci.ics.hyracks.hdfs.api.ITupleWriterFactory;
+
+public class NodeTextWriterFactory implements ITupleWriterFactory {
+
+    /**
+     * 
+     */
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public ITupleWriter getTupleWriter(IHyracksTaskContext ctx) throws HyracksDataException {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/AggerateReadIDAggregateFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/AggregateReadIDAggregateFactory.java
similarity index 96%
rename from genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/AggerateReadIDAggregateFactory.java
rename to genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/AggregateReadIDAggregateFactory.java
index 2ed4c9b..cb9db16 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/AggerateReadIDAggregateFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/AggregateReadIDAggregateFactory.java
@@ -9,7 +9,7 @@
 import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
 
-public class AggerateReadIDAggregateFactory implements IAggregatorDescriptorFactory{
+public class AggregateReadIDAggregateFactory implements IAggregatorDescriptorFactory{
 
     /**
      * 
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/MergeReadIDAggregateFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/MergeReadIDAggregateFactory.java
index b83aaf7..a20b60d 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/MergeReadIDAggregateFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/MergeReadIDAggregateFactory.java
@@ -1,5 +1,75 @@
 package edu.uci.ics.genomix.hyracks.dataflow.aggregators;
 
-public class MergeReadIDAggregateFactory {
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+
+public class MergeReadIDAggregateFactory implements IAggregatorDescriptorFactory{
+
+    /**
+     * 
+     */
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
+            RecordDescriptor outRecordDescriptor, int[] keyFields, int[] keyFieldsInPartialResults)
+            throws HyracksDataException {
+        // TODO Auto-generated method stub
+        return new IAggregatorDescriptor(){
+
+            @Override
+            public AggregateState createAggregateStates() {
+                // TODO Auto-generated method stub
+                return null;
+            }
+
+            @Override
+            public void init(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+                    AggregateState state) throws HyracksDataException {
+                // TODO Auto-generated method stub
+                
+            }
+
+            @Override
+            public void reset() {
+                // TODO Auto-generated method stub
+                
+            }
+
+            @Override
+            public void aggregate(IFrameTupleAccessor accessor, int tIndex, IFrameTupleAccessor stateAccessor,
+                    int stateTupleIndex, AggregateState state) throws HyracksDataException {
+                // TODO Auto-generated method stub
+                
+            }
+
+            @Override
+            public void outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+                    AggregateState state) throws HyracksDataException {
+                // TODO Auto-generated method stub
+                
+            }
+
+            @Override
+            public void outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+                    AggregateState state) throws HyracksDataException {
+                // TODO Auto-generated method stub
+                
+            }
+
+            @Override
+            public void close() {
+                // TODO Auto-generated method stub
+                
+            }
+            
+        };
+    }
 
 }
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenBrujinGraph.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenBrujinGraph.java
index 6eb5bac..4357d7a 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenBrujinGraph.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenBrujinGraph.java
@@ -22,31 +22,39 @@
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 
-import edu.uci.ics.genomix.hyracks.data.accessors.KmerBinaryHashFunctionFamily;
 import edu.uci.ics.genomix.hyracks.data.accessors.KmerHashPartitioncomputerFactory;
 import edu.uci.ics.genomix.hyracks.data.accessors.KmerNormarlizedComputerFactory;
+import edu.uci.ics.genomix.hyracks.data.accessors.ReadIDNormarlizedComputeFactory;
 import edu.uci.ics.genomix.hyracks.data.accessors.ReadIDPartitionComputerFactory;
 import edu.uci.ics.genomix.hyracks.data.primitive.KmerPointable;
 import edu.uci.ics.genomix.hyracks.dataflow.ConnectorPolicyAssignmentPolicy;
 import edu.uci.ics.genomix.hyracks.dataflow.KMerSequenceWriterFactory;
 import edu.uci.ics.genomix.hyracks.dataflow.KMerTextWriterFactory;
 import edu.uci.ics.genomix.hyracks.dataflow.MapKmerPositionToReadOperator;
+import edu.uci.ics.genomix.hyracks.dataflow.NodeSequenceWriterFactory;
+import edu.uci.ics.genomix.hyracks.dataflow.NodeTextWriterFactory;
 import edu.uci.ics.genomix.hyracks.dataflow.ReadsKeyValueParserFactory;
 import edu.uci.ics.genomix.hyracks.dataflow.aggregators.AggregateKmerAggregateFactory;
+import edu.uci.ics.genomix.hyracks.dataflow.aggregators.AggregateReadIDAggregateFactory;
 import edu.uci.ics.genomix.hyracks.dataflow.aggregators.MergeKmerAggregateFactory;
+import edu.uci.ics.genomix.hyracks.dataflow.aggregators.MergeReadIDAggregateFactory;
 import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
 import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
 import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFamily;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
 import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.data.std.api.IPointableFactory;
+import edu.uci.ics.hyracks.data.std.primitive.IntegerPointable;
 import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
@@ -55,7 +63,6 @@
 import edu.uci.ics.hyracks.dataflow.std.group.HashSpillableTableFactory;
 import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
 import edu.uci.ics.hyracks.dataflow.std.group.external.ExternalGroupOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.group.hybridhash.HybridHashGroupOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.group.preclustered.PreclusteredGroupOperatorDescriptor;
 import edu.uci.ics.hyracks.hdfs.api.ITupleWriterFactory;
 import edu.uci.ics.hyracks.hdfs.dataflow.HDFSReadOperatorDescriptor;
@@ -79,7 +86,7 @@
     private Scheduler scheduler;
     private String[] ncNodeNames;
 
-    private int kmers;
+    private int kmerSize;
     private int frameLimits;
     private int frameSize;
     private int tableSize;
@@ -87,17 +94,14 @@
     private OutputFormat outputFormat;
     private boolean bGenerateReversedKmer;
 
-    private AbstractOperatorDescriptor singleGrouper;
-    private IConnectorDescriptor connPartition;
-    private AbstractOperatorDescriptor crossGrouper;
-    private RecordDescriptor readOutputRec;
-    private RecordDescriptor combineOutputRec;
-
     /** works for hybrid hashing */
     private long inputSizeInRawRecords;
     private long inputSizeInUniqueKeys;
     private int recordSizeInBytes;
     private int hashfuncStartLevel;
+    private ExternalGroupOperatorDescriptor readLocalAggregator;
+    private MToNPartitioningConnectorDescriptor readConnPartition;
+    private ExternalGroupOperatorDescriptor readCrossAggregator;
 
     private void logDebug(String status) {
         LOG.debug(status + " nc nodes:" + ncNodeNames.length);
@@ -117,154 +121,154 @@
     }
 
     private ExternalGroupOperatorDescriptor newExternalGroupby(JobSpecification jobSpec, int[] keyFields,
-            IAggregatorDescriptorFactory aggeragater) {
+            IAggregatorDescriptorFactory aggeragater, IAggregatorDescriptorFactory merger,
+            ITuplePartitionComputerFactory partition, INormalizedKeyComputerFactory normalizer,
+            IPointableFactory pointable, RecordDescriptor outRed) {
         return new ExternalGroupOperatorDescriptor(jobSpec, keyFields, frameLimits,
-                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(KmerPointable.FACTORY) },
-                new KmerNormarlizedComputerFactory(), aggeragater, new MergeKmerAggregateFactory(), combineOutputRec,
-                new HashSpillableTableFactory(
-                        new FieldHashPartitionComputerFactory(keyFields,
-                                new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
-                                        .of(KmerPointable.FACTORY) }), tableSize), true);
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(pointable) }, normalizer,
+                aggeragater, merger, outRed, new HashSpillableTableFactory(new FieldHashPartitionComputerFactory(
+                        keyFields,
+                        new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(pointable) }),
+                        tableSize), true);
     }
 
-    private HybridHashGroupOperatorDescriptor newHybridGroupby(JobSpecification jobSpec, int[] keyFields,
-            long inputSizeInRawRecords, long inputSizeInUniqueKeys, int recordSizeInBytes, int hashfuncStartLevel,
-            IAggregatorDescriptorFactory aggeragater) throws HyracksDataException {
-        return new HybridHashGroupOperatorDescriptor(jobSpec, keyFields, frameLimits, inputSizeInRawRecords,
-                inputSizeInUniqueKeys, recordSizeInBytes, tableSize,
-                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(KmerPointable.FACTORY) },
-                new IBinaryHashFunctionFamily[] { new KmerBinaryHashFunctionFamily() }, hashfuncStartLevel,
-                new KmerNormarlizedComputerFactory(), aggeragater, new MergeKmerAggregateFactory(), combineOutputRec, true);
-    }
-
-    private void generateKmerAggeragateDescriptorbyType(JobSpecification jobSpec) throws HyracksDataException {
+    private Object[] generateAggeragateDescriptorbyType(JobSpecification jobSpec,
+            IAggregatorDescriptorFactory aggregator, IAggregatorDescriptorFactory merger,
+            ITuplePartitionComputerFactory partition, INormalizedKeyComputerFactory normalizer,
+            IPointableFactory pointable, RecordDescriptor outRed) throws HyracksDataException {
         int[] keyFields = new int[] { 0 }; // the id of grouped key
+        Object[] obj = new Object[3];
 
         switch (groupbyType) {
             case EXTERNAL:
-                singleGrouper = newExternalGroupby(jobSpec, keyFields, new AggregateKmerAggregateFactory());
-                connPartition = new MToNPartitioningConnectorDescriptor(jobSpec, new KmerHashPartitioncomputerFactory());
-                crossGrouper = newExternalGroupby(jobSpec, keyFields, new MergeKmerAggregateFactory());
+                obj[0] = newExternalGroupby(jobSpec, keyFields, aggregator, merger, partition, normalizer, pointable,
+                        outRed);
+                obj[1] = new MToNPartitioningConnectorDescriptor(jobSpec, partition);
+                obj[2] = newExternalGroupby(jobSpec, keyFields, merger, merger, partition, normalizer, pointable,
+                        outRed);
                 break;
             case PRECLUSTER:
             default:
-                singleGrouper = newExternalGroupby(jobSpec, keyFields, new AggregateKmerAggregateFactory());
-                connPartition = new MToNPartitioningMergingConnectorDescriptor(jobSpec,
-                        new KmerHashPartitioncomputerFactory(), keyFields,
-                        new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(KmerPointable.FACTORY) });
-                crossGrouper = new PreclusteredGroupOperatorDescriptor(jobSpec, keyFields,
-                        new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(KmerPointable.FACTORY) },
-                        new MergeKmerAggregateFactory(), combineOutputRec);
-                break;
-            case HYBRIDHASH:
-                singleGrouper = newHybridGroupby(jobSpec, keyFields, inputSizeInRawRecords, inputSizeInUniqueKeys,
-                        recordSizeInBytes, hashfuncStartLevel, new AggregateKmerAggregateFactory());
-                connPartition = new MToNPartitioningConnectorDescriptor(jobSpec, new KmerHashPartitioncomputerFactory());
-
-                crossGrouper = newHybridGroupby(jobSpec, keyFields, inputSizeInRawRecords, inputSizeInUniqueKeys,
-                        recordSizeInBytes, hashfuncStartLevel, new MergeKmerAggregateFactory());
+                obj[0] = newExternalGroupby(jobSpec, keyFields, aggregator, merger, partition, normalizer, pointable,
+                        outRed);
+                obj[1] = new MToNPartitioningMergingConnectorDescriptor(jobSpec, partition, keyFields,
+                        new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(pointable) });
+                obj[2] = new PreclusteredGroupOperatorDescriptor(jobSpec, keyFields,
+                        new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(pointable) }, merger,
+                        outRed);
                 break;
         }
+        return obj;
     }
 
-    public HDFSReadOperatorDescriptor createHDFSReader(JobSpecification jobSpec) throws HyracksDataException {
+    public HDFSReadOperatorDescriptor createHDFSReader(JobSpecification jobSpec, RecordDescriptor outRec)
+            throws HyracksDataException {
         try {
 
             InputSplit[] splits = job.getInputFormat().getSplits(job, ncNodeNames.length);
 
             LOG.info("HDFS read into " + splits.length + " splits");
             String[] readSchedule = scheduler.getLocationConstraints(splits);
-            return new HDFSReadOperatorDescriptor(jobSpec, readOutputRec, job, splits, readSchedule,
-                    new ReadsKeyValueParserFactory(kmers, bGenerateReversedKmer));
+            return new HDFSReadOperatorDescriptor(jobSpec, outRec, job, splits, readSchedule,
+                    new ReadsKeyValueParserFactory(kmerSize, bGenerateReversedKmer));
         } catch (Exception e) {
             throw new HyracksDataException(e);
         }
     }
 
-    private AbstractOperatorDescriptor newGroupByReadOperator(JobSpecification jobSpec, RecordDescriptor nodeOutputRec) {
-        // TODO Auto-generated method stub
-        return null;
+    private void connectOperators(JobSpecification jobSpec, IOperatorDescriptor preOp, String[] preNodes,
+            IOperatorDescriptor nextOp, String[] nextNodes, IConnectorDescriptor conn) {
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, preOp, preNodes);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, nextOp, nextNodes);
+        jobSpec.connect(conn, preOp, 0, nextOp, 0);
     }
-    
+
     @Override
     public JobSpecification generateJob() throws HyracksException {
 
         JobSpecification jobSpec = new JobSpecification();
-        readOutputRec = new RecordDescriptor(new ISerializerDeserializer[] { null,
-                null, null});
-        combineOutputRec = new RecordDescriptor(new ISerializerDeserializer[] { null, null });
+        RecordDescriptor readKmerOutputRec = new RecordDescriptor(new ISerializerDeserializer[] { null, null, null });
+        RecordDescriptor combineKmerOutputRec = new RecordDescriptor(new ISerializerDeserializer[] { null, null });
         jobSpec.setFrameSize(frameSize);
 
         // File input
-        HDFSReadOperatorDescriptor readOperator = createHDFSReader(jobSpec);
-
         logDebug("ReadKmer Operator");
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, readOperator, ncNodeNames);
+        HDFSReadOperatorDescriptor readOperator = createHDFSReader(jobSpec, readKmerOutputRec);
 
-        generateKmerAggeragateDescriptorbyType(jobSpec);
+        Object[] objs = generateAggeragateDescriptorbyType(jobSpec, new AggregateKmerAggregateFactory(),
+                new MergeKmerAggregateFactory(), new KmerHashPartitioncomputerFactory(),
+                new KmerNormarlizedComputerFactory(), KmerPointable.FACTORY, combineKmerOutputRec);
+        AbstractOperatorDescriptor kmerLocalAggregator = (AbstractOperatorDescriptor) objs[0];
         logDebug("LocalKmerGroupby Operator");
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, singleGrouper, ncNodeNames);
-
-        IConnectorDescriptor readfileConn = new OneToOneConnectorDescriptor(jobSpec);
-        jobSpec.connect(readfileConn, readOperator, 0, singleGrouper, 0);
+        connectOperators(jobSpec, readOperator, ncNodeNames, kmerLocalAggregator, ncNodeNames,
+                new OneToOneConnectorDescriptor(jobSpec));
 
         logDebug("CrossKmerGroupby Operator");
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, crossGrouper, ncNodeNames);
-        jobSpec.connect(connPartition, singleGrouper, 0, crossGrouper, 0);
+        IConnectorDescriptor kmerConnPartition = (IConnectorDescriptor) objs[1];
+        AbstractOperatorDescriptor kmerCrossAggregator = (AbstractOperatorDescriptor) objs[2];
+        connectOperators(jobSpec, kmerLocalAggregator, ncNodeNames, kmerCrossAggregator, ncNodeNames, kmerConnPartition);
 
         logDebug("Map Kmer to Read Operator");
-        //Map (Kmer, {(ReadID,PosInRead),...}) into (ReadID,PosInRead,Kmer,{OtherReadID,...}) 
-        RecordDescriptor mapToReadOutputRec = new RecordDescriptor(new ISerializerDeserializer[] {
-                null, null,null, null });
-        AbstractOperatorDescriptor mapKmerToRead = new MapKmerPositionToReadOperator(jobSpec, mapToReadOutputRec);
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, mapKmerToRead, ncNodeNames);
-        IConnectorDescriptor mapReadConn = new OneToOneConnectorDescriptor(jobSpec);
-        jobSpec.connect(mapReadConn, crossGrouper, 0, mapKmerToRead, 0);
+        //Map (Kmer, {(ReadID,PosInRead),...}) into (ReadID,PosInRead,{OtherPosition,...},Kmer) 
+        RecordDescriptor readIDOutputRec = new RecordDescriptor(
+                new ISerializerDeserializer[] { null, null, null, null });
+        AbstractOperatorDescriptor mapKmerToRead = new MapKmerPositionToReadOperator(jobSpec, readIDOutputRec);
+        connectOperators(jobSpec, kmerCrossAggregator, ncNodeNames, mapKmerToRead, ncNodeNames,
+                new OneToOneConnectorDescriptor(jobSpec));
 
         logDebug("Group by Read Operator");
-        // (ReadID,PosInRead,Kmer,{OtherReadID,...})
-        RecordDescriptor nodeOutputRec = new RecordDescriptor(new ISerializerDeserializer[] {
-                null, null, null, null });
-        AbstractOperatorDescriptor groupbyReadOperator = newGroupByReadOperator(jobSpec,nodeOutputRec);
-        IConnectorDescriptor readPartition = new MToNPartitioningConnectorDescriptor(jobSpec, new ReadIDPartitionComputerFactory());
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, groupbyReadOperator, ncNodeNames);
-        jobSpec.connect(readPartition, mapKmerToRead, 0, groupbyReadOperator, 0);
+        // (ReadID, {(PosInRead,{OtherPositoin..},Kmer) ...} 
+        RecordDescriptor nodeCombineRec = new RecordDescriptor(new ISerializerDeserializer[] { null, null });
+        objs = generateAggeragateDescriptorbyType(jobSpec, new AggregateReadIDAggregateFactory(),
+                new MergeReadIDAggregateFactory(), new ReadIDPartitionComputerFactory(),
+                new ReadIDNormarlizedComputeFactory(), IntegerPointable.FACTORY, nodeCombineRec);
+        AbstractOperatorDescriptor readLocalAggregator = (AbstractOperatorDescriptor) objs[0];
+        connectOperators(jobSpec, mapKmerToRead, ncNodeNames, readLocalAggregator, ncNodeNames,
+                new OneToOneConnectorDescriptor(jobSpec));
 
-        // Output
-        ITupleWriterFactory writer = null;
+        IConnectorDescriptor readconn = (IConnectorDescriptor) objs[1];
+        AbstractOperatorDescriptor readCrossAggregator = (AbstractOperatorDescriptor) objs[2];
+        connectOperators(jobSpec, readLocalAggregator, ncNodeNames, readCrossAggregator, ncNodeNames, readconn);
+
+        // Output Kmer
+        ITupleWriterFactory kmerWriter = null;
+        ITupleWriterFactory nodeWriter = null;
         switch (outputFormat) {
             case TEXT:
-                writer = new KMerTextWriterFactory(kmers);
+                kmerWriter = new KMerTextWriterFactory(kmerSize);
+                nodeWriter = new NodeTextWriterFactory();
                 break;
             case BINARY:
             default:
-                writer = new KMerSequenceWriterFactory(job);
+                kmerWriter = new KMerSequenceWriterFactory(job);
+                nodeWriter = new NodeSequenceWriterFactory(job);
                 break;
         }
-        HDFSWriteOperatorDescriptor writeOperator = new HDFSWriteOperatorDescriptor(jobSpec, job, writer);
-
         logDebug("WriteOperator");
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, writeOperator, ncNodeNames);
+        HDFSWriteOperatorDescriptor writeKmerOperator = new HDFSWriteOperatorDescriptor(jobSpec, job, kmerWriter);
+        connectOperators(jobSpec, kmerCrossAggregator, ncNodeNames, writeKmerOperator, ncNodeNames,
+                new OneToOneConnectorDescriptor(jobSpec));
+        jobSpec.addRoot(writeKmerOperator);
 
-        IConnectorDescriptor printConn = new OneToOneConnectorDescriptor(jobSpec);
-        jobSpec.connect(printConn, groupbyReadOperator, 0, writeOperator, 0);
-        jobSpec.addRoot(writeOperator);
-
+        // Output Node
+        HDFSWriteOperatorDescriptor writeNodeOperator = new HDFSWriteOperatorDescriptor(jobSpec, job, nodeWriter);
+        connectOperators(jobSpec, readCrossAggregator, ncNodeNames, writeNodeOperator, ncNodeNames,
+                new OneToOneConnectorDescriptor(jobSpec));
+        jobSpec.addRoot(writeNodeOperator);
+        
         if (groupbyType == GroupbyType.PRECLUSTER) {
             jobSpec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
         }
         return jobSpec;
     }
 
- 
-
     @Override
     protected void initJobConfiguration() {
 
-        kmers = conf.getInt(GenomixJob.KMER_LENGTH, GenomixJob.DEFAULT_KMER);
-        if (kmers % 2 == 0) {
-            kmers--;
-            conf.setInt(GenomixJob.KMER_LENGTH, kmers);
+        kmerSize = conf.getInt(GenomixJob.KMER_LENGTH, GenomixJob.DEFAULT_KMER);
+        if (kmerSize % 2 == 0) {
+            kmerSize--;
+            conf.setInt(GenomixJob.KMER_LENGTH, kmerSize);
         }
         frameLimits = conf.getInt(GenomixJob.FRAME_LIMIT, GenomixJob.DEFAULT_FRAME_LIMIT);
         tableSize = conf.getInt(GenomixJob.TABLE_SIZE, GenomixJob.DEFAULT_TABLE_SIZE);
@@ -300,7 +304,7 @@
         }
         job = new JobConf(conf);
         LOG.info("Genomix Graph Build Configuration");
-        LOG.info("Kmer:" + kmers);
+        LOG.info("Kmer:" + kmerSize);
         LOG.info("Groupby type:" + type);
         LOG.info("Output format:" + output);
         LOG.info("Frame limit" + frameLimits);
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/GenomixJob.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/GenomixJob.java
deleted file mode 100644
index be82477..0000000
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/GenomixJob.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Copyright 2009-2012 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package edu.uci.ics.genomix.job;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapred.JobConf;
-
-public class GenomixJob extends JobConf {
-
-    public static final String JOB_NAME = "genomix";
-
-    /** Kmers length */
-    public static final String KMER_LENGTH = "genomix.kmer";
-    /** Frame Size */
-    public static final String FRAME_SIZE = "genomix.framesize";
-    /** Frame Limit, hyracks need */
-    public static final String FRAME_LIMIT = "genomix.framelimit";
-    /** Table Size, hyracks need */
-    public static final String TABLE_SIZE = "genomix.tablesize";
-    /** Groupby types */
-    public static final String GROUPBY_TYPE = "genomix.graph.groupby.type";
-    /** Graph outputformat */
-    public static final String OUTPUT_FORMAT = "genomix.graph.output";
-    /** Get reversed Kmer Sequence */
-    public static final String REVERSED_KMER = "genomix.kmer.reversed";
-
-    /** Configurations used by hybrid groupby function in graph build phrase */
-    public static final String GROUPBY_HYBRID_INPUTSIZE = "genomix.graph.groupby.hybrid.inputsize";
-    public static final String GROUPBY_HYBRID_INPUTKEYS = "genomix.graph.groupby.hybrid.inputkeys";
-    public static final String GROUPBY_HYBRID_RECORDSIZE_SINGLE = "genomix.graph.groupby.hybrid.recordsize.single";
-    public static final String GROUPBY_HYBRID_RECORDSIZE_CROSS = "genomix.graph.groupby.hybrid.recordsize.cross";
-    public static final String GROUPBY_HYBRID_HASHLEVEL = "genomix.graph.groupby.hybrid.hashlevel";
-
-    public static final int DEFAULT_KMER = 21;
-    public static final int DEFAULT_FRAME_SIZE = 32768;
-    public static final int DEFAULT_FRAME_LIMIT = 4096;
-    public static final int DEFAULT_TABLE_SIZE = 10485767;
-    public static final long DEFAULT_GROUPBY_HYBRID_INPUTSIZE = 154000000L;
-    public static final long DEFAULT_GROUPBY_HYBRID_INPUTKEYS = 38500000L;
-    public static final int DEFAULT_GROUPBY_HYBRID_RECORDSIZE_SINGLE = 9;
-    public static final int DEFAULT_GROUPBY_HYBRID_HASHLEVEL = 1;
-    public static final int DEFAULT_GROUPBY_HYBRID_RECORDSIZE_CROSS = 13;
-
-    public static final boolean DEFAULT_REVERSED = false;
-
-    public static final String DEFAULT_GROUPBY_TYPE = "hybrid";
-    public static final String DEFAULT_OUTPUT_FORMAT = "binary";
-
-    public GenomixJob() throws IOException {
-        super(new Configuration());
-    }
-
-    public GenomixJob(Configuration conf) throws IOException {
-        super(conf);
-    }
-
-    /**
-     * Set the kmer length
-     * 
-     * @param the
-     *            desired frame size
-     */
-    final public void setKmerLength(int kmerlength) {
-        setInt(KMER_LENGTH, kmerlength);
-    }
-
-    final public void setFrameSize(int frameSize) {
-        setInt(FRAME_SIZE, frameSize);
-    }
-
-    final public void setFrameLimit(int frameLimit) {
-        setInt(FRAME_LIMIT, frameLimit);
-    }
-
-    final public void setTableSize(int tableSize) {
-        setInt(TABLE_SIZE, tableSize);
-    }
-
-}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/JobGen.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/JobGen.java
deleted file mode 100644
index b1f9a29..0000000
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/JobGen.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Copyright 2009-2012 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package edu.uci.ics.genomix.job;
-
-import java.util.UUID;
-
-import org.apache.hadoop.conf.Configuration;
-
-import edu.uci.ics.hyracks.api.exceptions.HyracksException;
-import edu.uci.ics.hyracks.api.job.JobSpecification;
-
-public abstract class JobGen {
-
-    protected final Configuration conf;
-    protected final GenomixJob genomixJob;
-    protected String jobId = new UUID(System.currentTimeMillis(), System.nanoTime()).toString();
-
-    public JobGen(GenomixJob job) {
-        this.conf = job;
-        this.genomixJob = job;
-        this.initJobConfiguration();
-    }
-
-    protected abstract void initJobConfiguration();
-
-    public abstract JobSpecification generateJob() throws HyracksException;
-
-}
\ No newline at end of file
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/JobGenBrujinGraph.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/JobGenBrujinGraph.java
deleted file mode 100644
index 79b38e8..0000000
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/JobGenBrujinGraph.java
+++ /dev/null
@@ -1,295 +0,0 @@
-/*
- * Copyright 2009-2012 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package edu.uci.ics.genomix.job;
-
-import java.util.Map;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-
-import edu.uci.ics.genomix.data.std.accessors.ByteSerializerDeserializer;
-import edu.uci.ics.genomix.data.std.accessors.KmerBinaryHashFunctionFamily;
-import edu.uci.ics.genomix.data.std.accessors.KmerHashPartitioncomputerFactory;
-import edu.uci.ics.genomix.data.std.accessors.KmerNormarlizedComputerFactory;
-import edu.uci.ics.genomix.data.std.primitive.KmerPointable;
-import edu.uci.ics.genomix.dataflow.ConnectorPolicyAssignmentPolicy;
-import edu.uci.ics.genomix.dataflow.KMerSequenceWriterFactory;
-import edu.uci.ics.genomix.dataflow.KMerTextWriterFactory;
-import edu.uci.ics.genomix.dataflow.ReadsKeyValueParserFactory;
-import edu.uci.ics.genomix.dataflow.aggregators.DistributedMergeLmerAggregateFactory;
-import edu.uci.ics.genomix.dataflow.aggregators.MergeKmerAggregateFactory;
-import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
-import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
-import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFamily;
-import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.exceptions.HyracksException;
-import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
-import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;
-import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningMergingConnectorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.group.HashSpillableTableFactory;
-import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
-import edu.uci.ics.hyracks.dataflow.std.group.external.ExternalGroupOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.group.hybridhash.HybridHashGroupOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.group.preclustered.PreclusteredGroupOperatorDescriptor;
-import edu.uci.ics.hyracks.hdfs.api.ITupleWriterFactory;
-import edu.uci.ics.hyracks.hdfs.dataflow.HDFSReadOperatorDescriptor;
-import edu.uci.ics.hyracks.hdfs.dataflow.HDFSWriteOperatorDescriptor;
-import edu.uci.ics.hyracks.hdfs.scheduler.Scheduler;
-
-public class JobGenBrujinGraph extends JobGen {
-    public enum GroupbyType {
-        EXTERNAL,
-        PRECLUSTER,
-        HYBRIDHASH,
-    }
-
-    public enum OutputFormat {
-        TEXT,
-        BINARY,
-    }
-
-    JobConf job;
-    private static final Log LOG = LogFactory.getLog(JobGenBrujinGraph.class);
-    private Scheduler scheduler;
-    private String[] ncNodeNames;
-
-    private int kmers;
-    private int frameLimits;
-    private int frameSize;
-    private int tableSize;
-    private GroupbyType groupbyType;
-    private OutputFormat outputFormat;
-    private boolean bGenerateReversedKmer;
-
-    private AbstractOperatorDescriptor singleGrouper;
-    private IConnectorDescriptor connPartition;
-    private AbstractOperatorDescriptor crossGrouper;
-    private RecordDescriptor readOutputRec;
-    private RecordDescriptor combineOutputRec;
-
-    /** works for hybrid hashing */
-    private long inputSizeInRawRecords;
-    private long inputSizeInUniqueKeys;
-    private int recordSizeInBytes;
-    private int hashfuncStartLevel;
-
-    private void logDebug(String status) {
-        String names = "";
-        for (String str : ncNodeNames) {
-            names += str + " ";
-        }
-        LOG.info(status + " nc nodes:" + ncNodeNames.length + " " + names);
-    }
-
-    public JobGenBrujinGraph(GenomixJob job, Scheduler scheduler, final Map<String, NodeControllerInfo> ncMap,
-            int numPartitionPerMachine) {
-        super(job);
-        this.scheduler = scheduler;
-        String[] nodes = new String[ncMap.size()];
-        ncMap.keySet().toArray(nodes);
-        ncNodeNames = new String[nodes.length * numPartitionPerMachine];
-        for (int i = 0; i < numPartitionPerMachine; i++) {
-            System.arraycopy(nodes, 0, ncNodeNames, i * nodes.length, nodes.length);
-        }
-        logDebug("initialize");
-    }
-
-    private ExternalGroupOperatorDescriptor newExternalGroupby(JobSpecification jobSpec, int[] keyFields,
-            IAggregatorDescriptorFactory aggeragater) {
-        return new ExternalGroupOperatorDescriptor(jobSpec, keyFields, frameLimits,
-                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(KmerPointable.FACTORY) },
-                new KmerNormarlizedComputerFactory(), aggeragater, new DistributedMergeLmerAggregateFactory(),
-                combineOutputRec, new HashSpillableTableFactory(
-                        new FieldHashPartitionComputerFactory(keyFields,
-                                new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
-                                        .of(KmerPointable.FACTORY) }), tableSize), true);
-    }
-
-    private HybridHashGroupOperatorDescriptor newHybridGroupby(JobSpecification jobSpec, int[] keyFields,
-            long inputSizeInRawRecords, long inputSizeInUniqueKeys, int recordSizeInBytes, int hashfuncStartLevel,
-            IAggregatorDescriptorFactory aggeragater) throws HyracksDataException {
-        return new HybridHashGroupOperatorDescriptor(jobSpec, keyFields, frameLimits, inputSizeInRawRecords,
-                inputSizeInUniqueKeys, recordSizeInBytes, tableSize,
-                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(KmerPointable.FACTORY) },
-                new IBinaryHashFunctionFamily[] { new KmerBinaryHashFunctionFamily() }, hashfuncStartLevel,
-                new KmerNormarlizedComputerFactory(), aggeragater, new DistributedMergeLmerAggregateFactory(),
-                combineOutputRec, true);
-    }
-
-    private void generateDescriptorbyType(JobSpecification jobSpec) throws HyracksDataException {
-        int[] keyFields = new int[] { 0 }; // the id of grouped key
-
-        switch (groupbyType) {
-            case EXTERNAL:
-                singleGrouper = newExternalGroupby(jobSpec, keyFields, new MergeKmerAggregateFactory());
-                connPartition = new MToNPartitioningConnectorDescriptor(jobSpec, new KmerHashPartitioncomputerFactory());
-                crossGrouper = newExternalGroupby(jobSpec, keyFields, new DistributedMergeLmerAggregateFactory());
-                break;
-            case PRECLUSTER:
-                singleGrouper = newExternalGroupby(jobSpec, keyFields, new MergeKmerAggregateFactory());
-                connPartition = new MToNPartitioningMergingConnectorDescriptor(jobSpec,
-                        new KmerHashPartitioncomputerFactory(), keyFields,
-                        new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(KmerPointable.FACTORY) });
-                crossGrouper = new PreclusteredGroupOperatorDescriptor(jobSpec, keyFields,
-                        new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(KmerPointable.FACTORY) },
-                        new DistributedMergeLmerAggregateFactory(), combineOutputRec);
-                break;
-            case HYBRIDHASH:
-            default:
-                singleGrouper = newHybridGroupby(jobSpec, keyFields, inputSizeInRawRecords, inputSizeInUniqueKeys,
-                        recordSizeInBytes, hashfuncStartLevel, new MergeKmerAggregateFactory());
-                connPartition = new MToNPartitioningConnectorDescriptor(jobSpec, new KmerHashPartitioncomputerFactory());
-
-                crossGrouper = newHybridGroupby(jobSpec, keyFields, inputSizeInRawRecords, inputSizeInUniqueKeys,
-                        recordSizeInBytes, hashfuncStartLevel, new DistributedMergeLmerAggregateFactory());
-                break;
-        }
-    }
-
-    public HDFSReadOperatorDescriptor createHDFSReader(JobSpecification jobSpec) throws HyracksDataException {
-        try {
-
-            InputSplit[] splits = job.getInputFormat().getSplits(job, ncNodeNames.length);
-
-            LOG.info("HDFS read into " + splits.length + " splits");
-            String[] readSchedule = scheduler.getLocationConstraints(splits);
-            String log = "";
-            for (String schedule : readSchedule) {
-                log += schedule + " ";
-            }
-            LOG.info("HDFS read schedule " + log);
-            return new HDFSReadOperatorDescriptor(jobSpec, readOutputRec, job, splits, readSchedule,
-                    new ReadsKeyValueParserFactory(kmers, bGenerateReversedKmer));
-        } catch (Exception e) {
-            throw new HyracksDataException(e);
-        }
-    }
-
-    @Override
-    public JobSpecification generateJob() throws HyracksException {
-
-        JobSpecification jobSpec = new JobSpecification();
-        readOutputRec = new RecordDescriptor(
-                new ISerializerDeserializer[] { null, ByteSerializerDeserializer.INSTANCE });
-        combineOutputRec = new RecordDescriptor(new ISerializerDeserializer[] { null,
-                ByteSerializerDeserializer.INSTANCE, ByteSerializerDeserializer.INSTANCE });
-        jobSpec.setFrameSize(frameSize);
-
-        // File input
-        HDFSReadOperatorDescriptor readOperator = createHDFSReader(jobSpec);
-
-        logDebug("Read Operator");
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, readOperator, ncNodeNames);
-
-        generateDescriptorbyType(jobSpec);
-        logDebug("SingleGroupby Operator");
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, singleGrouper, ncNodeNames);
-
-        IConnectorDescriptor readfileConn = new OneToOneConnectorDescriptor(jobSpec);
-        jobSpec.connect(readfileConn, readOperator, 0, singleGrouper, 0);
-
-        logDebug("CrossGrouper Operator");
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, crossGrouper, ncNodeNames);
-        jobSpec.connect(connPartition, singleGrouper, 0, crossGrouper, 0);
-
-        // Output
-        ITupleWriterFactory writer = null;
-        switch (outputFormat) {
-            case TEXT:
-                writer = new KMerTextWriterFactory(kmers);
-                break;
-            case BINARY:
-            default:
-                writer = new KMerSequenceWriterFactory(job);
-                break;
-        }
-        HDFSWriteOperatorDescriptor writeOperator = new HDFSWriteOperatorDescriptor(jobSpec, job, writer);
-
-        logDebug("WriteOperator");
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, writeOperator, ncNodeNames);
-
-        IConnectorDescriptor printConn = new OneToOneConnectorDescriptor(jobSpec);
-        jobSpec.connect(printConn, crossGrouper, 0, writeOperator, 0);
-        jobSpec.addRoot(writeOperator);
-
-        if (groupbyType == GroupbyType.PRECLUSTER) {
-            jobSpec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
-        }
-        return jobSpec;
-    }
-
-    @Override
-    protected void initJobConfiguration() {
-
-        kmers = conf.getInt(GenomixJob.KMER_LENGTH, GenomixJob.DEFAULT_KMER);
-        if (kmers % 2 == 0) {
-            kmers--;
-            conf.setInt(GenomixJob.KMER_LENGTH, kmers);
-        }
-        frameLimits = conf.getInt(GenomixJob.FRAME_LIMIT, GenomixJob.DEFAULT_FRAME_LIMIT);
-        tableSize = conf.getInt(GenomixJob.TABLE_SIZE, GenomixJob.DEFAULT_TABLE_SIZE);
-        frameSize = conf.getInt(GenomixJob.FRAME_SIZE, GenomixJob.DEFAULT_FRAME_SIZE);
-        inputSizeInRawRecords = conf.getLong(GenomixJob.GROUPBY_HYBRID_INPUTSIZE,
-                GenomixJob.DEFAULT_GROUPBY_HYBRID_INPUTSIZE);
-        inputSizeInUniqueKeys = conf.getLong(GenomixJob.GROUPBY_HYBRID_INPUTKEYS,
-                GenomixJob.DEFAULT_GROUPBY_HYBRID_INPUTKEYS);
-        recordSizeInBytes = conf.getInt(GenomixJob.GROUPBY_HYBRID_RECORDSIZE_SINGLE,
-                GenomixJob.DEFAULT_GROUPBY_HYBRID_RECORDSIZE_SINGLE);
-        hashfuncStartLevel = conf.getInt(GenomixJob.GROUPBY_HYBRID_HASHLEVEL,
-                GenomixJob.DEFAULT_GROUPBY_HYBRID_HASHLEVEL);
-        /** here read the different recordSize why ? */
-        recordSizeInBytes = conf.getInt(GenomixJob.GROUPBY_HYBRID_RECORDSIZE_CROSS,
-                GenomixJob.DEFAULT_GROUPBY_HYBRID_RECORDSIZE_CROSS);
-
-        bGenerateReversedKmer = conf.getBoolean(GenomixJob.REVERSED_KMER, GenomixJob.DEFAULT_REVERSED);
-
-        String type = conf.get(GenomixJob.GROUPBY_TYPE, GenomixJob.DEFAULT_GROUPBY_TYPE);
-        if (type.equalsIgnoreCase("external")) {
-            groupbyType = GroupbyType.EXTERNAL;
-        } else if (type.equalsIgnoreCase("precluster")) {
-            groupbyType = GroupbyType.PRECLUSTER;
-        } else {
-            groupbyType = GroupbyType.HYBRIDHASH;
-        }
-
-        String output = conf.get(GenomixJob.OUTPUT_FORMAT, GenomixJob.DEFAULT_OUTPUT_FORMAT);
-        if (output.equalsIgnoreCase("text")) {
-            outputFormat = OutputFormat.TEXT;
-        } else {
-            outputFormat = OutputFormat.BINARY;
-        }
-        job = new JobConf(conf);
-        LOG.info("Genomix Graph Build Configuration");
-        LOG.info("Kmer:" + kmers);
-        LOG.info("Groupby type:" + type);
-        LOG.info("Output format:" + output);
-        LOG.info("Frame limit" + frameLimits);
-        LOG.info("Frame size" + frameSize);
-    }
-
-}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/JobGenStatistic.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/JobGenStatistic.java
deleted file mode 100644
index a88fa79..0000000
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/JobGenStatistic.java
+++ /dev/null
@@ -1,171 +0,0 @@
-/*
- * Copyright 2009-2012 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package edu.uci.ics.genomix.job;
-
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.SequenceFileInputFormat;
-
-import edu.uci.ics.genomix.data.std.accessors.ByteSerializerDeserializer;
-import edu.uci.ics.genomix.util.ByteComparatorFactory;
-import edu.uci.ics.genomix.util.StatCountAggregateFactory;
-import edu.uci.ics.genomix.util.StatReadsKeyValueParserFactory;
-import edu.uci.ics.genomix.util.StatSumAggregateFactory;
-import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
-import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
-import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
-import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
-import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.exceptions.HyracksException;
-import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
-import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningMergingConnectorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.file.AbstractFileWriteOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.file.LineFileWriteOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.group.HashSpillableTableFactory;
-import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
-import edu.uci.ics.hyracks.dataflow.std.group.external.ExternalGroupOperatorDescriptor;
-import edu.uci.ics.hyracks.hdfs.dataflow.HDFSReadOperatorDescriptor;
-import edu.uci.ics.hyracks.hdfs.scheduler.Scheduler;
-
-public class JobGenStatistic extends JobGen {
-    private int kmers;
-    private JobConf hadoopjob;
-    private RecordDescriptor readOutputRec;
-    private String[] ncNodeNames;
-    private Scheduler scheduler;
-    private RecordDescriptor combineOutputRec;
-
-    public JobGenStatistic(GenomixJob job) {
-        super(job);
-        // TODO Auto-generated constructor stub
-    }
-
-    @Override
-    protected void initJobConfiguration() {
-        // TODO Auto-generated method stub
-        kmers = conf.getInt(GenomixJob.KMER_LENGTH, GenomixJob.DEFAULT_KMER);
-        hadoopjob = new JobConf(conf);
-        hadoopjob.setInputFormat(SequenceFileInputFormat.class);
-    }
-
-    @Override
-    public JobSpecification generateJob() throws HyracksException {
-        int[] degreeFields = { 0, 1 }; // indegree, outdegree
-        int[] countFields = { 2 };
-        JobSpecification jobSpec = new JobSpecification();
-        /** specify the record fields after read */
-        readOutputRec = new RecordDescriptor(new ISerializerDeserializer[] { ByteSerializerDeserializer.INSTANCE,
-                ByteSerializerDeserializer.INSTANCE, ByteSerializerDeserializer.INSTANCE });
-        combineOutputRec = new RecordDescriptor(new ISerializerDeserializer[] { ByteSerializerDeserializer.INSTANCE,
-                ByteSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
-        /** the reader */
-        HDFSReadOperatorDescriptor readOperator = createHDFSReader(jobSpec);
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, readOperator, ncNodeNames);
-
-        /** the combiner aggregator */
-        AbstractOperatorDescriptor degreeLocal = connectLocalAggregateByField(jobSpec, degreeFields, readOperator);
-        AbstractOperatorDescriptor countLocal = connectLocalAggregateByField(jobSpec, countFields, readOperator);
-
-        /** the final aggregator */
-        AbstractOperatorDescriptor degreeMerger = connectFinalAggregateByField(jobSpec, degreeFields, degreeLocal);
-        AbstractOperatorDescriptor countMerger = connectFinalAggregateByField(jobSpec, countFields, countLocal);
-
-        /** writer */
-        AbstractFileWriteOperatorDescriptor writeDegree = connectWriter(jobSpec, degreeFields, degreeMerger);
-        AbstractFileWriteOperatorDescriptor writeCount = connectWriter(jobSpec, countFields, countMerger);
-        jobSpec.addRoot(writeDegree);
-        jobSpec.addRoot(writeCount);
-        return null;
-    }
-
-    private HDFSReadOperatorDescriptor createHDFSReader(JobSpecification jobSpec) throws HyracksDataException {
-        try {
-
-            InputSplit[] splits = hadoopjob.getInputFormat().getSplits(hadoopjob, ncNodeNames.length);
-
-            String[] readSchedule = scheduler.getLocationConstraints(splits);
-            return new HDFSReadOperatorDescriptor(jobSpec, readOutputRec, hadoopjob, splits, readSchedule,
-                    new StatReadsKeyValueParserFactory());
-        } catch (Exception e) {
-            throw new HyracksDataException(e);
-        }
-    }
-
-    private ExternalGroupOperatorDescriptor newExternalGroupby(JobSpecification jobSpec, int[] keyFields,
-            IAggregatorDescriptorFactory aggeragater) {
-        return new ExternalGroupOperatorDescriptor(jobSpec, keyFields, GenomixJob.DEFAULT_FRAME_LIMIT,
-                new IBinaryComparatorFactory[] { new ByteComparatorFactory(), new ByteComparatorFactory() }, null,
-                aggeragater, new StatSumAggregateFactory(), combineOutputRec, new HashSpillableTableFactory(
-                        new FieldHashPartitionComputerFactory(keyFields, new IBinaryHashFunctionFactory[] {
-                                new ByteComparatorFactory(), new ByteComparatorFactory() }),
-                        GenomixJob.DEFAULT_TABLE_SIZE), true);
-    }
-
-    private AbstractOperatorDescriptor connectLocalAggregateByField(JobSpecification jobSpec, int[] fields,
-            HDFSReadOperatorDescriptor readOperator) {
-        AbstractOperatorDescriptor localAggregator = newExternalGroupby(jobSpec, fields,
-                new StatCountAggregateFactory());
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, localAggregator, ncNodeNames);
-        IConnectorDescriptor readfileConn = new OneToOneConnectorDescriptor(jobSpec);
-        jobSpec.connect(readfileConn, readOperator, 0, localAggregator, 0);
-        return localAggregator;
-    }
-
-    private AbstractOperatorDescriptor connectFinalAggregateByField(JobSpecification jobSpec, int[] fields,
-            AbstractOperatorDescriptor localAggregator) {
-        AbstractOperatorDescriptor finalAggregator = newExternalGroupby(jobSpec, fields, new StatSumAggregateFactory());
-        // only need one reducer
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, finalAggregator, ncNodeNames[fields[0]
-                % ncNodeNames.length]);
-        IConnectorDescriptor mergeConn = new MToNPartitioningMergingConnectorDescriptor(jobSpec,
-                new ITuplePartitionComputerFactory() {
-                    private static final long serialVersionUID = 1L;
-
-                    @Override
-                    public ITuplePartitionComputer createPartitioner() {
-                        return new ITuplePartitionComputer() {
-                            @Override
-                            public int partition(IFrameTupleAccessor accessor, int tIndex, int nParts)
-                                    throws HyracksDataException {
-                                return 0;
-                            }
-                        };
-                    }
-                }, fields, new IBinaryComparatorFactory[] { new ByteComparatorFactory() });
-        jobSpec.connect(mergeConn, localAggregator, 0, finalAggregator, 0);
-        return finalAggregator;
-    }
-
-    private AbstractFileWriteOperatorDescriptor connectWriter(JobSpecification jobSpec, int[] fields,
-            AbstractOperatorDescriptor finalAggregator) {
-        LineFileWriteOperatorDescriptor writeOperator = new LineFileWriteOperatorDescriptor(jobSpec, null);
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, writeOperator, ncNodeNames[fields[0]
-                % ncNodeNames.length]);
-
-        IConnectorDescriptor printConn = new OneToOneConnectorDescriptor(jobSpec);
-        jobSpec.connect(printConn, finalAggregator, 0, writeOperator, 0);
-        return writeOperator;
-    }
-}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/util/ByteComparatorFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/util/ByteComparatorFactory.java
deleted file mode 100644
index 832966b..0000000
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/util/ByteComparatorFactory.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Copyright 2009-2012 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package edu.uci.ics.genomix.util;
-
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunction;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
-
-public class ByteComparatorFactory implements IBinaryComparatorFactory, IBinaryHashFunctionFactory {
-
-    /**
-	 * 
-	 */
-    private static final long serialVersionUID = 1L;
-
-    @Override
-    public IBinaryComparator createBinaryComparator() {
-        return new IBinaryComparator() {
-
-            @Override
-            public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
-                return b1[s1] - b2[s2];
-            }
-
-        };
-    }
-
-    @Override
-    public IBinaryHashFunction createBinaryHashFunction() {
-        return new IBinaryHashFunction() {
-
-            @Override
-            public int hash(byte[] bytes, int offset, int length) {
-                return bytes[offset];
-            }
-
-        };
-    }
-
-}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/util/StatCountAggregateFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/util/StatCountAggregateFactory.java
deleted file mode 100644
index 65303a8..0000000
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/util/StatCountAggregateFactory.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Copyright 2009-2012 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package edu.uci.ics.genomix.util;
-
-import java.io.DataOutput;
-import java.io.IOException;
-
-import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
-import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
-import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
-
-public class StatCountAggregateFactory implements IAggregatorDescriptorFactory {
-
-    /**
-	 * 
-	 */
-    private static final long serialVersionUID = 1L;
-
-    public class CountAggregator implements IAggregatorDescriptor {
-        private final int[] keyFields;
-
-        public CountAggregator(int[] keyFields) {
-            this.keyFields = keyFields;
-        }
-
-        @Override
-        public AggregateState createAggregateStates() {
-            // TODO Auto-generated method stub
-            return null;
-        }
-
-        @Override
-        public void init(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex, AggregateState state)
-                throws HyracksDataException {
-            int count = 1;
-            DataOutput fieldOutput = tupleBuilder.getDataOutput();
-            try {
-                fieldOutput.writeInt(count);
-                tupleBuilder.addFieldEndOffset();
-            } catch (IOException e) {
-                throw new HyracksDataException("I/O exception when initializing the aggregator.");
-            }
-        }
-
-        @Override
-        public void reset() {
-            // TODO Auto-generated method stub
-
-        }
-
-        @Override
-        public void aggregate(IFrameTupleAccessor accessor, int tIndex, IFrameTupleAccessor stateAccessor,
-                int stateTupleIndex, AggregateState state) throws HyracksDataException {
-            int count = 1;
-
-            int statetupleOffset = stateAccessor.getTupleStartOffset(stateTupleIndex);
-            int countfieldStart = stateAccessor.getFieldStartOffset(stateTupleIndex, keyFields.length);
-            int countoffset = statetupleOffset + stateAccessor.getFieldSlotsLength() + countfieldStart;
-
-            byte[] data = stateAccessor.getBuffer().array();
-            count += IntegerSerializerDeserializer.getInt(data, countoffset);
-            IntegerSerializerDeserializer.putInt(count, data, countoffset);
-        }
-
-        @Override
-        public void outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
-                AggregateState state) throws HyracksDataException {
-            int count = getCount(accessor, tIndex);
-            DataOutput fieldOutput = tupleBuilder.getDataOutput();
-            try {
-                fieldOutput.writeInt(count);
-                tupleBuilder.addFieldEndOffset();
-            } catch (IOException e) {
-                throw new HyracksDataException("I/O exception when writing aggregation to the output buffer.");
-            }
-
-        }
-
-        protected int getCount(IFrameTupleAccessor accessor, int tIndex) {
-            int tupleOffset = accessor.getTupleStartOffset(tIndex);
-            int fieldStart = accessor.getFieldStartOffset(tIndex, keyFields.length);
-            int countoffset = tupleOffset + accessor.getFieldSlotsLength() + fieldStart;
-            byte[] data = accessor.getBuffer().array();
-
-            return IntegerSerializerDeserializer.getInt(data, countoffset);
-        }
-
-        @Override
-        public void outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
-                AggregateState state) throws HyracksDataException {
-            outputPartialResult(tupleBuilder, accessor, tIndex, state);
-        }
-
-        @Override
-        public void close() {
-            // TODO Auto-generated method stub
-
-        }
-
-    }
-
-    @Override
-    public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
-            RecordDescriptor outRecordDescriptor, int[] keyFields, int[] keyFieldsInPartialResults)
-            throws HyracksDataException {
-        // TODO Auto-generated method stub
-        return new CountAggregator(keyFields);
-    }
-
-}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/util/StatReadsKeyValueParserFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/util/StatReadsKeyValueParserFactory.java
deleted file mode 100644
index 2fcca67..0000000
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/util/StatReadsKeyValueParserFactory.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Copyright 2009-2012 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package edu.uci.ics.genomix.util;
-
-import java.nio.ByteBuffer;
-
-import edu.uci.ics.genomix.data.std.accessors.ByteSerializerDeserializer;
-import edu.uci.ics.genomix.type.GeneCode;
-import edu.uci.ics.genomix.type.KmerBytesWritable;
-import edu.uci.ics.genomix.type.KmerCountValue;
-import edu.uci.ics.hyracks.api.comm.IFrameWriter;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
-import edu.uci.ics.hyracks.hdfs.api.IKeyValueParser;
-import edu.uci.ics.hyracks.hdfs.api.IKeyValueParserFactory;
-
-public class StatReadsKeyValueParserFactory implements IKeyValueParserFactory<KmerBytesWritable, KmerCountValue> {
-
-    /**
-	 * 
-	 */
-    private static final long serialVersionUID = 1L;
-
-    @Override
-    public IKeyValueParser<KmerBytesWritable, KmerCountValue> createKeyValueParser(IHyracksTaskContext ctx)
-            throws HyracksDataException {
-
-        final ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(2);
-        final ByteBuffer outputBuffer = ctx.allocateFrame();
-        final FrameTupleAppender outputAppender = new FrameTupleAppender(ctx.getFrameSize());
-        outputAppender.reset(outputBuffer, true);
-
-        return new IKeyValueParser<KmerBytesWritable, KmerCountValue>() {
-
-            @Override
-            public void open(IFrameWriter writer) throws HyracksDataException {
-                // TODO Auto-generated method stub
-
-            }
-
-            @Override
-            public void parse(KmerBytesWritable key, KmerCountValue value, IFrameWriter writer)
-                    throws HyracksDataException {
-                byte adjMap = value.getAdjBitMap();
-                byte count = value.getCount();
-                InsertToFrame((byte) (GeneCode.inDegree(adjMap)), (byte) (GeneCode.outDegree(adjMap)), count, writer);
-            }
-
-            @Override
-            public void close(IFrameWriter writer) throws HyracksDataException {
-                FrameUtils.flushFrame(outputBuffer, writer);
-            }
-
-            private void InsertToFrame(byte indegree, byte outdegree, byte count, IFrameWriter writer) {
-                try {
-                    tupleBuilder.reset();
-                    tupleBuilder.addField(ByteSerializerDeserializer.INSTANCE, indegree);
-                    tupleBuilder.addField(ByteSerializerDeserializer.INSTANCE, outdegree);
-                    tupleBuilder.addField(ByteSerializerDeserializer.INSTANCE, count);
-
-                    if (!outputAppender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
-                            tupleBuilder.getSize())) {
-                        FrameUtils.flushFrame(outputBuffer, writer);
-                        outputAppender.reset(outputBuffer, true);
-                        if (!outputAppender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
-                                tupleBuilder.getSize())) {
-                            throw new IllegalStateException(
-                                    "Failed to copy an record into a frame: the record size is too large.");
-                        }
-                    }
-                } catch (Exception e) {
-                    throw new IllegalStateException(e);
-                }
-            }
-        };
-    }
-
-}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/util/StatSumAggregateFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/util/StatSumAggregateFactory.java
deleted file mode 100644
index 39ac60a..0000000
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/util/StatSumAggregateFactory.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Copyright 2009-2012 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package edu.uci.ics.genomix.util;
-
-import java.io.DataOutput;
-import java.io.IOException;
-
-import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
-import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
-import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
-
-public class StatSumAggregateFactory implements IAggregatorDescriptorFactory {
-
-    /**
-	 * 
-	 */
-    private static final long serialVersionUID = 1L;
-
-    public class DistributeAggregatorDescriptor implements IAggregatorDescriptor {
-
-        private final int[] keyFields;
-
-        public DistributeAggregatorDescriptor(int[] keyFields) {
-            this.keyFields = keyFields;
-        }
-
-        @Override
-        public AggregateState createAggregateStates() {
-            // TODO Auto-generated method stub
-            return null;
-        }
-
-        protected int getCount(IFrameTupleAccessor accessor, int tIndex) {
-            int tupleOffset = accessor.getTupleStartOffset(tIndex);
-            int fieldStart = accessor.getFieldStartOffset(tIndex, 1);
-            int countoffset = tupleOffset + accessor.getFieldSlotsLength() + fieldStart;
-            byte[] data = accessor.getBuffer().array();
-            return IntegerSerializerDeserializer.getInt(data, countoffset);
-        }
-
-        @Override
-        public void init(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex, AggregateState state)
-                throws HyracksDataException {
-            int count = getCount(accessor, tIndex);
-
-            DataOutput fieldOutput = tupleBuilder.getDataOutput();
-            try {
-                fieldOutput.writeInt(count);
-                tupleBuilder.addFieldEndOffset();
-            } catch (IOException e) {
-                throw new HyracksDataException("I/O exception when initializing the aggregator.");
-            }
-        }
-
-        @Override
-        public void reset() {
-            // TODO Auto-generated method stub
-
-        }
-
-        @Override
-        public void aggregate(IFrameTupleAccessor accessor, int tIndex, IFrameTupleAccessor stateAccessor,
-                int stateTupleIndex, AggregateState state) throws HyracksDataException {
-            int count = getCount(accessor, tIndex);
-
-            int statetupleOffset = stateAccessor.getTupleStartOffset(stateTupleIndex);
-            int countfieldStart = stateAccessor.getFieldStartOffset(stateTupleIndex, 1);
-            int countoffset = statetupleOffset + stateAccessor.getFieldSlotsLength() + countfieldStart;
-
-            byte[] data = stateAccessor.getBuffer().array();
-            count += IntegerSerializerDeserializer.getInt(data, countoffset);
-            IntegerSerializerDeserializer.putInt(count, data, countoffset);
-        }
-
-        @Override
-        public void outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
-                AggregateState state) throws HyracksDataException {
-            int count = getCount(accessor, tIndex);
-            DataOutput fieldOutput = tupleBuilder.getDataOutput();
-            try {
-                fieldOutput.writeInt(count);
-                tupleBuilder.addFieldEndOffset();
-            } catch (IOException e) {
-                throw new HyracksDataException("I/O exception when writing aggregation to the output buffer.");
-            }
-
-        }
-
-        @Override
-        public void outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
-                AggregateState state) throws HyracksDataException {
-            outputPartialResult(tupleBuilder, accessor, tIndex, state);
-
-        }
-
-        @Override
-        public void close() {
-            // TODO Auto-generated method stub
-
-        }
-
-    }
-
-    @Override
-    public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
-            RecordDescriptor outRecordDescriptor, int[] keyFields, int[] keyFieldsInPartialResults)
-            throws HyracksDataException {
-        // TODO Auto-generated method stub
-        return new DistributeAggregatorDescriptor(keyFields);
-    }
-
-}