cleanning continue
diff --git a/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/pmcommon/GenomixMiniClusterTest.java b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/pmcommon/GenomixMiniClusterTest.java
new file mode 100644
index 0000000..714d77e
--- /dev/null
+++ b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/pmcommon/GenomixMiniClusterTest.java
@@ -0,0 +1,192 @@
+package edu.uci.ics.genomix.hadoop.pmcommon;
+
+import java.io.BufferedWriter;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MiniMRCluster;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+
+import edu.uci.ics.genomix.hyracks.driver.Driver;
+import edu.uci.ics.genomix.hyracks.job.GenomixJobConf;
+import edu.uci.ics.genomix.hyracks.test.TestUtils;
+import edu.uci.ics.hyracks.hdfs.utils.HyracksUtils;
+
+/*
+ * A base class providing most of the boilerplate for Genomix-based tests
+ */
+@SuppressWarnings("deprecation")
+public class GenomixMiniClusterTest {
+    protected int KMER_LENGTH = 5;
+    protected int READ_LENGTH = 8;
+
+    // subclass should modify this to include the HDFS directories that should be cleaned up
+    protected ArrayList<String> HDFS_PATHS = new ArrayList<String>();
+
+    protected static String EXPECTED_ROOT = "src/test/resources/expected/";
+    protected static String ACTUAL_ROOT = "src/test/resources/actual/";
+
+    protected static String HADOOP_CONF_ROOT = "src/test/resources/hadoop/conf/";
+
+    protected static MiniDFSCluster dfsCluster;
+    protected static MiniMRCluster mrCluster;
+    private static FileSystem dfs;
+    protected static JobConf conf = new JobConf();
+    protected static int numberOfNC = 1;
+    protected static int numPartitionPerMachine = 1;
+    protected static Driver driver;
+
+    @BeforeClass
+    public static void setUpMiniCluster() throws Exception {
+        cleanupStores();
+        startHDFS();
+        HyracksUtils.init();
+        FileUtils.forceMkdir(new File(ACTUAL_ROOT));
+        FileUtils.cleanDirectory(new File(ACTUAL_ROOT));
+        driver = new Driver(HyracksUtils.CC_HOST, HyracksUtils.TEST_HYRACKS_CC_CLIENT_PORT, numPartitionPerMachine);
+    }
+
+    /*
+     * Merge and copy a DFS directory to a local destination, converting to text if necessary. 
+     * Also locally store the binary-formatted result if available.
+     */
+    protected static void copyResultsToLocal(String hdfsSrcDir, String localDestFile, boolean resultsAreText,
+            Configuration conf) throws IOException {
+        if (resultsAreText) {
+            // for text files, just concatenate them together
+            FileUtil.copyMerge(FileSystem.get(conf), new Path(hdfsSrcDir), FileSystem.getLocal(new Configuration()),
+                    new Path(localDestFile), false, conf, null);
+        } else {
+            // file is binary
+//            // save the entire binary output dir
+//            FileUtil.copy(FileSystem.get(conf), new Path(hdfsSrcDir), FileSystem.getLocal(new Configuration()),
+//                    new Path(localDestFile + ".bindir"), false, conf);
+
+            // also load the Nodes and write them out as text locally. 
+            FileSystem lfs = FileSystem.getLocal(new Configuration());
+            lfs.mkdirs(new Path(localDestFile).getParent());
+            File filePathTo = new File(localDestFile);
+            BufferedWriter bw = new BufferedWriter(new FileWriter(filePathTo));
+
+            FileStatus[] files = dfs.globStatus(new Path(hdfsSrcDir + "*"));
+            SequenceFile.Reader reader = new SequenceFile.Reader(dfs, files[0].getPath(), conf);
+            String destBinDir = localDestFile.substring(0, localDestFile.lastIndexOf("."));
+//            FileUtil.copy(FileSystem.get(conf), new Path(hdfsSrcDir), FileSystem.getLocal(new Configuration()),
+//                    new Path(destBinDir), false, conf);
+            SequenceFile.Writer writer = new SequenceFile.Writer(lfs, new JobConf(), new Path(localDestFile
+                    + ".binmerge"), reader.getKeyClass(), reader.getValueClass());
+
+            Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
+            Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
+
+            for (FileStatus f : files) {
+                if (f.getLen() == 0) {
+                    continue;
+                }
+                reader = new SequenceFile.Reader(dfs, f.getPath(), conf);
+                while (reader.next(key, value)) {
+                    if (key == null || value == null) {
+                        break;
+                    }
+                    bw.write(key.toString() + "\t" + value.toString());
+                    System.out.println(key.toString() + "\t" + value.toString());
+                    bw.newLine();
+                    writer.append(key, value);
+
+                }
+                reader.close();
+            }
+            writer.close();
+            bw.close();
+        }
+
+    }
+
+    protected static boolean checkResults(String expectedPath, String actualPath, int[] poslistField) throws Exception {
+        File dumped = new File(actualPath);
+        if (poslistField != null) {
+            TestUtils.compareWithUnSortedPosition(new File(expectedPath), dumped, poslistField);
+        } else {
+            TestUtils.compareWithSortedResult(new File(expectedPath), dumped);
+        }
+        return true;
+    }
+
+    protected static void cleanupStores() throws IOException {
+        FileUtils.forceMkdir(new File("teststore"));
+        FileUtils.forceMkdir(new File("build"));
+        FileUtils.cleanDirectory(new File("teststore"));
+        FileUtils.cleanDirectory(new File("build"));
+    }
+
+    protected static void startHDFS() throws IOException {
+        conf.addResource(new Path(HADOOP_CONF_ROOT + "core-site.xml"));
+        //        conf.addResource(new Path(HADOOP_CONF_ROOT + "mapred-site.xml"));
+        conf.addResource(new Path(HADOOP_CONF_ROOT + "hdfs-site.xml"));
+
+        FileSystem lfs = FileSystem.getLocal(new Configuration());
+        lfs.delete(new Path("build"), true);
+        System.setProperty("hadoop.log.dir", "logs");
+        dfsCluster = new MiniDFSCluster(conf, numberOfNC, true, null);
+        dfs = dfsCluster.getFileSystem();
+        mrCluster = new MiniMRCluster(4, dfs.getUri().toString(), 2);
+        System.out.println(dfs.getUri().toString());
+
+        DataOutputStream confOutput = new DataOutputStream(
+                new FileOutputStream(new File(HADOOP_CONF_ROOT + "conf.xml")));
+        conf.writeXml(confOutput);
+        confOutput.close();
+    }
+
+    protected static void copyLocalToDFS(String localSrc, String hdfsDest) throws IOException {
+        Path dest = new Path(hdfsDest);
+        dfs.mkdirs(dest);
+        dfs.copyFromLocalFile(new Path(localSrc), dest);
+    }
+
+    /*
+     * Remove the local "actual" folder and any hdfs folders in use by this test
+     */
+    public void cleanUpOutput() throws IOException {
+//        // local cleanup
+//        FileSystem lfs = FileSystem.getLocal(new Configuration());
+//        if (lfs.exists(new Path(ACTUAL_ROOT))) {
+//            lfs.delete(new Path(ACTUAL_ROOT), true);
+//        }
+        // dfs cleanup
+        for (String path : HDFS_PATHS) {
+            if (dfs.exists(new Path(path))) {
+                dfs.delete(new Path(path), true);
+            }
+        }
+    }
+
+    @AfterClass
+    public static void tearDown() throws Exception {
+        HyracksUtils.deinit();
+        cleanupHDFS();
+    }
+
+    protected static void cleanupHDFS() throws Exception {
+        dfsCluster.shutdown();
+        mrCluster.shutdown();
+    }
+}
diff --git a/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/pmcommon/HadoopMiniClusterTest.java b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/pmcommon/HadoopMiniClusterTest.java
new file mode 100644
index 0000000..be53c5e
--- /dev/null
+++ b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/pmcommon/HadoopMiniClusterTest.java
@@ -0,0 +1,248 @@
+package edu.uci.ics.genomix.hadoop.pmcommon;
+
+import java.io.BufferedWriter;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MiniMRCluster;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+
+//import edu.uci.ics.genomix.hadoop.velvetgraphbuilding.GraphBuildingDriver;
+import edu.uci.ics.genomix.hyracks.test.TestUtils;
+
+/*
+ * A base class providing most of the boilerplate for Hadoop-based tests
+ */
+@SuppressWarnings("deprecation")
+public class HadoopMiniClusterTest {
+    protected int KMER_LENGTH = 5;
+    protected int READ_LENGTH = 8;
+
+    // subclass should modify this to include the HDFS directories that should be cleaned up
+    protected ArrayList<String> HDFS_PATHS = new ArrayList<String>();
+
+    protected static final String EXPECTED_ROOT = "src/test/resources/expected/";
+    protected static final String ACTUAL_ROOT = "src/test/resources/actual/";
+    protected static final String INPUT_ROOT = "src/test/resources/input/";
+
+    protected static String HADOOP_CONF_ROOT = "src/test/resources/hadoop/conf/";
+    protected static String HADOOP_CONF = HADOOP_CONF_ROOT + "conf.xml";
+
+    protected static MiniDFSCluster dfsCluster;
+    protected static MiniMRCluster mrCluster;
+    protected static FileSystem dfs;
+    protected static JobConf conf = new JobConf();
+    protected static int numberOfNC = 1;
+    protected static int numPartitionPerMachine = 1;
+
+    @BeforeClass
+    public static void setUpMiniCluster() throws Exception {
+        cleanupStores();
+        startHDFS();
+        FileUtils.forceMkdir(new File(ACTUAL_ROOT));
+        FileUtils.cleanDirectory(new File(ACTUAL_ROOT));
+    }
+
+    protected static void copyResultsToLocal(String hdfsSrcDir, String localDestFile, boolean resultsAreText,
+            Configuration conf) throws IOException {
+    	copyResultsToLocal(hdfsSrcDir, localDestFile, resultsAreText, conf, true);
+    }
+    
+    public static void copyResultsToLocal(String hdfsSrcDir, String localDestFile, boolean resultsAreText,
+            Configuration conf, boolean ignoreZeroOutputs) throws IOException {
+        copyResultsToLocal(hdfsSrcDir, localDestFile, resultsAreText,
+                conf, ignoreZeroOutputs, dfs);
+    }
+    
+    /*
+     * Merge and copy a DFS directory to a local destination, converting to text if necessary. 
+     * Also locally store the binary-formatted result if available.
+     */
+    public static void copyResultsToLocal(String hdfsSrcDir, String localDestFile, boolean resultsAreText,
+            Configuration conf, boolean ignoreZeroOutputs, FileSystem dfs) throws IOException {
+        if (resultsAreText) {
+            // for text files, just concatenate them together
+            FileUtil.copyMerge(FileSystem.get(conf), new Path(hdfsSrcDir), FileSystem.getLocal(new Configuration()),
+                    new Path(localDestFile), false, conf, null);
+        } else {
+            // file is binary
+            // save the entire binary output dir
+            FileUtil.copy(FileSystem.get(conf), new Path(hdfsSrcDir), FileSystem.getLocal(new Configuration()),
+                    new Path(localDestFile + ".bindir"), false, conf);
+            
+            // chomp through output files
+            FileStatus[] files = ArrayUtils.addAll(dfs.globStatus(new Path(hdfsSrcDir + "*")), dfs.globStatus(new Path(hdfsSrcDir + "*/*")));
+            FileStatus validFile = null;
+            for (FileStatus f : files) {
+            	if (f.getLen() != 0) {
+            		validFile = f;
+            		break;
+            	}
+            }
+            if (validFile == null) {
+            	if (ignoreZeroOutputs) {
+            		// just make a dummy output dir
+            		FileSystem lfs = FileSystem.getLocal(new Configuration());
+            		lfs.mkdirs(new Path(localDestFile).getParent());
+            		return;
+            	}
+            	else {
+            		throw new IOException("No non-zero outputs in source directory " + hdfsSrcDir);
+            	}
+            }
+
+            // also load the Nodes and write them out as text locally. 
+            FileSystem lfs = FileSystem.getLocal(new Configuration());
+            lfs.mkdirs(new Path(localDestFile).getParent());
+            File filePathTo = new File(localDestFile);
+            if (filePathTo.exists() && filePathTo.isDirectory()) {
+                filePathTo = new File(localDestFile + "/data");
+            }
+            BufferedWriter bw = new BufferedWriter(new FileWriter(filePathTo));
+            SequenceFile.Reader reader = new SequenceFile.Reader(dfs, validFile.getPath(), conf);
+            SequenceFile.Writer writer = new SequenceFile.Writer(lfs, new JobConf(), new Path(localDestFile
+                    + ".binmerge"), reader.getKeyClass(), reader.getValueClass());
+
+            Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
+            Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
+
+            for (FileStatus f : files) {
+                if (f.getLen() == 0) {
+                    continue;
+                }
+                reader = new SequenceFile.Reader(dfs, f.getPath(), conf);
+                while (reader.next(key, value)) {
+                    if (key == null || value == null) {
+                        break;
+                    }
+                    bw.write(key.toString() + "\t" + value.toString());
+                    System.out.println(key.toString() + "\t" + value.toString());
+                    bw.newLine();
+                    writer.append(key, value);
+
+                }
+                reader.close();
+            }
+            writer.close();
+            bw.close();
+        }
+
+    }
+
+    protected static boolean checkResults(String expectedPath, String actualPath, int[] poslistField) throws Exception {
+        File dumped = new File(actualPath);
+        if (poslistField != null) {
+            TestUtils.compareWithUnSortedPosition(new File(expectedPath), dumped, poslistField);
+        } else {
+            TestUtils.compareWithSortedResult(new File(expectedPath), dumped);
+        }
+        return true;
+    }
+
+    protected static void cleanupStores() throws IOException {
+        FileUtils.forceMkdir(new File("teststore"));
+        FileUtils.forceMkdir(new File("build"));
+        FileUtils.cleanDirectory(new File("teststore"));
+        FileUtils.cleanDirectory(new File("build"));
+    }
+
+    protected static void startHDFS() throws IOException {
+//        conf.addResource(new Path(HADOOP_CONF_ROOT + "core-site.xml"));
+        //        conf.addResource(new Path(HADOOP_CONF_ROOT + "mapred-site.xml"));
+//        conf.addResource(new Path(HADOOP_CONF_ROOT + "hdfs-site.xml"));
+
+        FileSystem lfs = FileSystem.getLocal(new Configuration());
+        lfs.delete(new Path("build"), true);
+        System.setProperty("hadoop.log.dir", "logs");
+        dfsCluster = new MiniDFSCluster(conf, numberOfNC, true, null);
+        dfs = dfsCluster.getFileSystem();
+        mrCluster = new MiniMRCluster(4, dfs.getUri().toString(), 2);
+        System.out.println(dfs.getUri().toString());
+
+        DataOutputStream confOutput = new DataOutputStream(
+                new FileOutputStream(new File(HADOOP_CONF)));
+        conf.writeXml(confOutput);
+        confOutput.close();
+    }
+
+    protected static void copyLocalToDFS(String localSrc, String hdfsDest) throws IOException {
+        Path dest = new Path(hdfsDest);
+        dfs.mkdirs(dest);
+        System.out.println("copying from " + localSrc + " to " + dest);
+        for (File f : new File(localSrc).listFiles()) {  
+        	dfs.copyFromLocalFile(new Path(f.getAbsolutePath()), dest);
+        }
+    }
+
+    /*
+     * Remove the local "actual" folder and any hdfs folders in use by this test
+     */
+    public void cleanUpOutput() throws IOException {
+        // local cleanup
+        FileSystem lfs = FileSystem.getLocal(new Configuration());
+        if (lfs.exists(new Path(ACTUAL_ROOT))) {
+            lfs.delete(new Path(ACTUAL_ROOT), true);
+        }
+        // dfs cleanup
+        for (String path : HDFS_PATHS) {
+            if (dfs.exists(new Path(path))) {
+                dfs.delete(new Path(path), true);
+            }
+        }
+    }
+
+    @AfterClass
+    public static void tearDown() throws Exception {
+        cleanupHDFS();
+    }
+
+    protected static void cleanupHDFS() throws Exception {
+        dfsCluster.shutdown();
+        mrCluster.shutdown();
+    }
+    
+//    public void buildGraph() throws IOException {
+//        JobConf buildConf = new JobConf(conf);  // use a separate conf so we don't interfere with other jobs 
+//        FileInputFormat.setInputPaths(buildConf, SEQUENCE);
+//        FileOutputFormat.setOutputPath(buildConf, new Path(INPUT_GRAPH));
+//        
+//        GraphBuildingDriver tldriver = new GraphBuildingDriver();
+//        tldriver.run(SEQUENCE, INPUT_GRAPH, 2, kmerByteSize, READ_LENGTH, false, true, HADOOP_CONF_ROOT + "conf.xml");
+//        
+//        boolean resultsAreText = true;
+//        copyResultsToLocal(INPUT_GRAPH, ACTUAL_ROOT + INPUT_GRAPH, resultsAreText, buildConf);
+//    }
+//    
+//    private void prepareGraph() throws IOException {
+//        if (regenerateGraph) {
+//            copyLocalToDFS(LOCAL_SEQUENCE_FILE, SEQUENCE);
+//            buildGraph();
+//            copyLocalToDFS(ACTUAL_ROOT + INPUT_GRAPH + readsFile + ".binmerge", INPUT_GRAPH);
+//        } else {
+//            copyLocalToDFS(EXPECTED_ROOT + INPUT_GRAPH + readsFile + ".binmerge", INPUT_GRAPH);
+//        }
+//    }
+
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/KmerPointable.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/KmerPointable.java
new file mode 100644
index 0000000..0457de9
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/KmerPointable.java
@@ -0,0 +1,145 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.genomix.hyracks.data.primitive;
+
+import edu.uci.ics.genomix.hyracks.data.accessors.KmerHashPartitioncomputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
+import edu.uci.ics.hyracks.data.std.api.AbstractPointable;
+import edu.uci.ics.hyracks.data.std.api.IComparable;
+import edu.uci.ics.hyracks.data.std.api.IHashable;
+import edu.uci.ics.hyracks.data.std.api.INumeric;
+import edu.uci.ics.hyracks.data.std.api.IPointable;
+import edu.uci.ics.hyracks.data.std.api.IPointableFactory;
+
+public final class KmerPointable extends AbstractPointable implements IHashable, IComparable, INumeric {
+    public static final ITypeTraits TYPE_TRAITS = new ITypeTraits() {
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public boolean isFixedLength() {
+            return false;
+        }
+
+        @Override
+        public int getFixedLength() {
+            return -1;
+        }
+    };
+
+    public static final IPointableFactory FACTORY = new IPointableFactory() {
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public IPointable createPointable() {
+            return new KmerPointable();
+        }
+
+        @Override
+        public ITypeTraits getTypeTraits() {
+            return TYPE_TRAITS;
+        }
+    };
+
+    public static short getShortReverse(byte[] bytes, int offset, int length) {
+        if (length < 2) {
+            return (short) (bytes[offset] & 0xff);
+        }
+        return (short) (((bytes[offset + length - 1] & 0xff) << 8) + (bytes[offset + length - 2] & 0xff));
+    }
+
+    public static int getIntReverse(byte[] bytes, int offset, int length) {
+        int shortValue = getShortReverse(bytes, offset, length) & 0xffff;
+
+        if (length < 3) {
+            return shortValue;
+        }
+        if (length == 3) {
+            return (((bytes[offset + 2] & 0xff) << 16) + ((bytes[offset + 1] & 0xff) << 8) + ((bytes[offset] & 0xff)));
+        }
+        return ((bytes[offset + length - 1] & 0xff) << 24) + ((bytes[offset + length - 2] & 0xff) << 16)
+                + ((bytes[offset + length - 3] & 0xff) << 8) + ((bytes[offset + length - 4] & 0xff) << 0);
+    }
+
+    public static long getLongReverse(byte[] bytes, int offset, int length) {
+        if (length < 8) {
+            return ((long) getIntReverse(bytes, offset, length)) & 0x0ffffffffL;
+        }
+        return (((long) (bytes[offset + length - 1] & 0xff)) << 56)
+                + (((long) (bytes[offset + length - 2] & 0xff)) << 48)
+                + (((long) (bytes[offset + length - 3] & 0xff)) << 40)
+                + (((long) (bytes[offset + length - 4] & 0xff)) << 32)
+                + (((long) (bytes[offset + length - 5] & 0xff)) << 24)
+                + (((long) (bytes[offset + length - 6] & 0xff)) << 16)
+                + (((long) (bytes[offset + length - 7] & 0xff)) << 8) + (((long) (bytes[offset + length - 8] & 0xff)));
+    }
+
+    @Override
+    public int compareTo(IPointable pointer) {
+        return compareTo(pointer.getByteArray(), pointer.getStartOffset(), pointer.getLength());
+    }
+
+    @Override
+    public int compareTo(byte[] bytes, int offset, int length) {
+
+        if (this.length != length) {
+            return this.length - length;
+        }
+        for (int i = length - 1; i >= 0; i--) {
+            int cmp = (this.bytes[this.start + i] & 0xff) - (bytes[offset + i] & 0xff);
+            if (cmp != 0) {
+                return cmp;
+            }
+        }
+
+        return 0;
+    }
+
+    @Override
+    public int hash() {
+        int hash = KmerHashPartitioncomputerFactory.hashBytes(bytes, start, length);
+        return hash;
+    }
+
+    @Override
+    public byte byteValue() {
+        return bytes[start + length - 1];
+    }
+
+    @Override
+    public short shortValue() {
+        return getShortReverse(bytes, start, length);
+    }
+
+    @Override
+    public int intValue() {
+        return getIntReverse(bytes, start, length);
+    }
+
+    @Override
+    public long longValue() {
+        return getLongReverse(bytes, start, length);
+    }
+
+    @Override
+    public float floatValue() {
+        return Float.intBitsToFloat(intValue());
+    }
+
+    @Override
+    public double doubleValue() {
+        return Double.longBitsToDouble(longValue());
+    }
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/NodeReference.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/NodeReference.java
new file mode 100644
index 0000000..60c0682
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/NodeReference.java
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.genomix.hyracks.data.primitive;
+
+import edu.uci.ics.genomix.velvet.oldtype.NodeWritable;
+
+public class NodeReference extends NodeWritable {
+
+    /**
+	 * 
+	 */
+    private static final long serialVersionUID = 1L;
+
+    public NodeReference(int kmerSize) {
+        super(kmerSize);
+    }
+
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/PositionListReference.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/PositionListReference.java
new file mode 100644
index 0000000..47a3047
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/PositionListReference.java
@@ -0,0 +1,32 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.genomix.hyracks.data.primitive;
+
+import edu.uci.ics.genomix.oldtype.PositionListWritable;
+import edu.uci.ics.hyracks.data.std.api.IValueReference;
+
+public class PositionListReference extends PositionListWritable implements IValueReference {
+
+    public PositionListReference(int countByDataLength, byte[] byteArray, int startOffset) {
+        super(countByDataLength, byteArray, startOffset);
+    }
+
+    /**
+	 * 
+	 */
+    private static final long serialVersionUID = 1L;
+
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/PositionReference.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/PositionReference.java
new file mode 100644
index 0000000..f066dc7
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/PositionReference.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.genomix.hyracks.data.primitive;
+
+import edu.uci.ics.genomix.oldtype.PositionWritable;
+import edu.uci.ics.hyracks.data.std.api.IValueReference;
+
+public class PositionReference extends PositionWritable implements IValueReference {
+
+    /**
+	 * 
+	 */
+    private static final long serialVersionUID = 1L;
+
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/GenomixJobConf.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/GenomixJobConf.java
new file mode 100644
index 0000000..de56b83
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/GenomixJobConf.java
@@ -0,0 +1,104 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.genomix.hyracks.job;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobConf;
+
+@SuppressWarnings("deprecation")
+public class GenomixJobConf extends JobConf {
+
+    public static final String JOB_NAME = "genomix";
+
+    /** Kmers length */
+    public static final String KMER_LENGTH = "genomix.kmerlen";
+    /** Read length */
+    public static final String READ_LENGTH = "genomix.readlen";
+    /** Frame Size */
+    public static final String FRAME_SIZE = "genomix.framesize";
+    /** Frame Limit, hyracks need */
+    public static final String FRAME_LIMIT = "genomix.framelimit";
+    /** Table Size, hyracks need */
+    public static final String TABLE_SIZE = "genomix.tablesize";
+    /** Groupby types */
+    public static final String GROUPBY_TYPE = "genomix.graph.groupby.type";
+    /** Graph outputformat */
+    public static final String OUTPUT_FORMAT = "genomix.graph.output";
+    /** Get reversed Kmer Sequence */
+    public static final String REVERSED_KMER = "genomix.kmer.reversed";
+
+    /** Configurations used by hybrid groupby function in graph build phrase */
+    public static final String GROUPBY_HYBRID_INPUTSIZE = "genomix.graph.groupby.hybrid.inputsize";
+    public static final String GROUPBY_HYBRID_INPUTKEYS = "genomix.graph.groupby.hybrid.inputkeys";
+    public static final String GROUPBY_HYBRID_RECORDSIZE_SINGLE = "genomix.graph.groupby.hybrid.recordsize.single";
+    public static final String GROUPBY_HYBRID_RECORDSIZE_CROSS = "genomix.graph.groupby.hybrid.recordsize.cross";
+    public static final String GROUPBY_HYBRID_HASHLEVEL = "genomix.graph.groupby.hybrid.hashlevel";
+
+    public static final int DEFAULT_KMERLEN = 21;
+    public static final int DEFAULT_READLEN = 124;
+    public static final int DEFAULT_FRAME_SIZE = 128 * 1024;
+    public static final int DEFAULT_FRAME_LIMIT = 4096;
+    public static final int DEFAULT_TABLE_SIZE = 10485767;
+    public static final long DEFAULT_GROUPBY_HYBRID_INPUTSIZE = 154000000L;
+    public static final long DEFAULT_GROUPBY_HYBRID_INPUTKEYS = 38500000L;
+    public static final int DEFAULT_GROUPBY_HYBRID_RECORDSIZE_SINGLE = 9;
+    public static final int DEFAULT_GROUPBY_HYBRID_HASHLEVEL = 1;
+    public static final int DEFAULT_GROUPBY_HYBRID_RECORDSIZE_CROSS = 13;
+
+    public static final boolean DEFAULT_REVERSED = true;
+
+    public static final String JOB_PLAN_GRAPHBUILD = "graphbuild";
+    public static final String JOB_PLAN_GRAPHSTAT = "graphstat";
+
+    public static final String GROUPBY_TYPE_HYBRID = "hybrid";
+    public static final String GROUPBY_TYPE_EXTERNAL = "external";
+    public static final String GROUPBY_TYPE_PRECLUSTER = "precluster";
+    public static final String OUTPUT_FORMAT_BINARY = "binary";
+    public static final String OUTPUT_FORMAT_TEXT = "text";
+
+    public GenomixJobConf() throws IOException {
+        super(new Configuration());
+    }
+
+    public GenomixJobConf(Configuration conf) throws IOException {
+        super(conf);
+    }
+
+    /**
+     * Set the kmer length
+     * 
+     * @param the
+     *            desired frame kmerByteSize
+     */
+    final public void setKmerLength(int kmerlength) {
+        setInt(KMER_LENGTH, kmerlength);
+    }
+
+    final public void setFrameSize(int frameSize) {
+        setInt(FRAME_SIZE, frameSize);
+    }
+
+    final public void setFrameLimit(int frameLimit) {
+        setInt(FRAME_LIMIT, frameLimit);
+    }
+
+    final public void setTableSize(int tableSize) {
+        setInt(TABLE_SIZE, tableSize);
+    }
+
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGen.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGen.java
new file mode 100644
index 0000000..c8cb701
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGen.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.genomix.hyracks.job;
+
+import java.io.Serializable;
+import java.util.UUID;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.hdfs.dataflow.ConfFactory;
+
+public abstract class JobGen implements Serializable {
+
+    /**
+     * 
+     */
+    private static final long serialVersionUID = 1L;
+    protected final ConfFactory confFactory;
+    protected String jobId = new UUID(System.currentTimeMillis(), System.nanoTime()).toString();
+
+    public JobGen(GenomixJobConf job) throws HyracksDataException {
+        this.confFactory = new ConfFactory(job);
+    }
+
+    public abstract JobSpecification generateJob() throws HyracksException;
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenBrujinGraph.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenBrujinGraph.java
new file mode 100644
index 0000000..7571653
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenBrujinGraph.java
@@ -0,0 +1,384 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.genomix.hyracks.job;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+
+import edu.uci.ics.genomix.hyracks.data.accessors.KmerHashPartitioncomputerFactory;
+import edu.uci.ics.genomix.hyracks.data.accessors.KmerNormarlizedComputerFactory;
+import edu.uci.ics.genomix.hyracks.data.accessors.ReadIDPartitionComputerFactory;
+import edu.uci.ics.genomix.hyracks.data.primitive.KmerPointable;
+import edu.uci.ics.genomix.hyracks.dataflow.ConnectorPolicyAssignmentPolicy;
+import edu.uci.ics.genomix.hyracks.dataflow.MapKmerPositionToReadOperator;
+import edu.uci.ics.genomix.hyracks.dataflow.MapReadToNodeOperator;
+import edu.uci.ics.genomix.hyracks.dataflow.ReadsKeyValueParserFactory;
+import edu.uci.ics.genomix.hyracks.dataflow.aggregators.AggregateKmerAggregateFactory;
+import edu.uci.ics.genomix.hyracks.dataflow.aggregators.AggregateReadIDAggregateFactory;
+import edu.uci.ics.genomix.hyracks.dataflow.aggregators.MergeKmerAggregateFactory;
+import edu.uci.ics.genomix.hyracks.dataflow.aggregators.MergeReadIDAggregateFactory;
+import edu.uci.ics.genomix.hyracks.dataflow.io.KMerSequenceWriterFactory;
+import edu.uci.ics.genomix.hyracks.dataflow.io.KMerTextWriterFactory;
+import edu.uci.ics.genomix.hyracks.dataflow.io.NodeSequenceWriterFactory;
+import edu.uci.ics.genomix.hyracks.dataflow.io.NodeTextWriterFactory;
+import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
+import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.data.std.api.IPointableFactory;
+import edu.uci.ics.hyracks.data.std.primitive.IntegerPointable;
+import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningMergingConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.HashSpillableTableFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.external.ExternalGroupOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.preclustered.PreclusteredGroupOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
+import edu.uci.ics.hyracks.hdfs.api.ITupleWriterFactory;
+import edu.uci.ics.hyracks.hdfs.dataflow.ConfFactory;
+import edu.uci.ics.hyracks.hdfs.dataflow.HDFSReadOperatorDescriptor;
+import edu.uci.ics.hyracks.hdfs.dataflow.HDFSWriteOperatorDescriptor;
+import edu.uci.ics.hyracks.hdfs.scheduler.Scheduler;
+
+@SuppressWarnings("deprecation")
+public class JobGenBrujinGraph extends JobGen {
+    /**
+     * 
+     */
+    private static final long serialVersionUID = 1L;
+
+    public enum GroupbyType {
+        EXTERNAL,
+        PRECLUSTER,
+        HYBRIDHASH,
+    }
+
+    public enum OutputFormat {
+        TEXT,
+        BINARY,
+    }
+
+    protected ConfFactory hadoopJobConfFactory;
+    protected static final Log LOG = LogFactory.getLog(JobGenBrujinGraph.class);
+    protected String[] ncNodeNames;
+    protected String[] readSchedule;
+
+    protected int readLength;
+    protected int kmerSize;
+    protected int frameLimits;
+    protected int frameSize;
+    protected int tableSize;
+    protected GroupbyType groupbyType;
+    protected OutputFormat outputFormat;
+    protected boolean bGenerateReversedKmer;
+
+    protected void logDebug(String status) {
+        LOG.debug(status + " nc nodes:" + ncNodeNames.length);
+    }
+
+    public JobGenBrujinGraph(GenomixJobConf job, Scheduler scheduler, final Map<String, NodeControllerInfo> ncMap,
+            int numPartitionPerMachine) throws HyracksDataException {
+        super(job);
+        String[] nodes = new String[ncMap.size()];
+        ncMap.keySet().toArray(nodes);
+        ncNodeNames = new String[nodes.length * numPartitionPerMachine];
+        for (int i = 0; i < numPartitionPerMachine; i++) {
+            System.arraycopy(nodes, 0, ncNodeNames, i * nodes.length, nodes.length);
+        }
+        initJobConfiguration(scheduler);
+    }
+
+    private ExternalGroupOperatorDescriptor newExternalGroupby(JobSpecification jobSpec, int[] keyFields,
+            IAggregatorDescriptorFactory aggeragater, IAggregatorDescriptorFactory merger,
+            ITuplePartitionComputerFactory partition, INormalizedKeyComputerFactory normalizer,
+            IPointableFactory pointable, RecordDescriptor outRed) {
+        return new ExternalGroupOperatorDescriptor(jobSpec, keyFields, frameLimits,
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(pointable) }, normalizer,
+                aggeragater, merger, outRed, new HashSpillableTableFactory(new FieldHashPartitionComputerFactory(
+                        keyFields,
+                        new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(pointable) }),
+                        tableSize), true);
+    }
+
+    private Object[] generateAggeragateDescriptorbyType(JobSpecification jobSpec, int[] keyFields,
+            IAggregatorDescriptorFactory aggregator, IAggregatorDescriptorFactory merger,
+            ITuplePartitionComputerFactory partition, INormalizedKeyComputerFactory normalizer,
+            IPointableFactory pointable, RecordDescriptor combineRed, RecordDescriptor finalRec)
+            throws HyracksDataException {
+
+        Object[] obj = new Object[3];
+
+        switch (groupbyType) {
+            case EXTERNAL:
+                obj[0] = newExternalGroupby(jobSpec, keyFields, aggregator, merger, partition, normalizer, pointable,
+                        combineRed);
+                obj[1] = new MToNPartitioningConnectorDescriptor(jobSpec, partition);
+                obj[2] = newExternalGroupby(jobSpec, keyFields, merger, merger, partition, normalizer, pointable,
+                        finalRec);
+                break;
+            case PRECLUSTER:
+            default:
+
+                obj[0] = new PreclusteredGroupOperatorDescriptor(jobSpec, keyFields,
+                        new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(pointable) }, aggregator,
+                        combineRed);
+                obj[1] = new MToNPartitioningMergingConnectorDescriptor(jobSpec, partition, keyFields,
+                        new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(pointable) });
+                obj[2] = new PreclusteredGroupOperatorDescriptor(jobSpec, keyFields,
+                        new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(pointable) }, merger,
+                        finalRec);
+                jobSpec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
+                break;
+        }
+        return obj;
+    }
+
+    public HDFSReadOperatorDescriptor createHDFSReader(JobSpecification jobSpec) throws HyracksDataException {
+        try {
+            InputSplit[] splits = hadoopJobConfFactory.getConf().getInputFormat()
+                    .getSplits(hadoopJobConfFactory.getConf(), ncNodeNames.length);
+
+            return new HDFSReadOperatorDescriptor(jobSpec, ReadsKeyValueParserFactory.readKmerOutputRec,
+                    hadoopJobConfFactory.getConf(), splits, readSchedule, new ReadsKeyValueParserFactory(readLength,
+                            kmerSize, bGenerateReversedKmer));
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    public static void connectOperators(JobSpecification jobSpec, IOperatorDescriptor preOp, String[] preNodes,
+            IOperatorDescriptor nextOp, String[] nextNodes, IConnectorDescriptor conn) {
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, preOp, preNodes);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, nextOp, nextNodes);
+        jobSpec.connect(conn, preOp, 0, nextOp, 0);
+    }
+
+    public AbstractOperatorDescriptor generateGroupbyKmerJob(JobSpecification jobSpec,
+            AbstractOperatorDescriptor readOperator) throws HyracksDataException {
+        int[] keyFields = new int[] { 0 }; // the id of grouped key
+
+        ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(jobSpec, frameLimits, keyFields,
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(KmerPointable.FACTORY) },
+                ReadsKeyValueParserFactory.readKmerOutputRec);
+        connectOperators(jobSpec, readOperator, ncNodeNames, sorter, ncNodeNames, new OneToOneConnectorDescriptor(
+                jobSpec));
+
+        RecordDescriptor combineKmerOutputRec = new RecordDescriptor(new ISerializerDeserializer[] { null, null });
+        jobSpec.setFrameSize(frameSize);
+
+        Object[] objs = generateAggeragateDescriptorbyType(jobSpec, keyFields, new AggregateKmerAggregateFactory(),
+                new MergeKmerAggregateFactory(), new KmerHashPartitioncomputerFactory(),
+                new KmerNormarlizedComputerFactory(), KmerPointable.FACTORY, combineKmerOutputRec, combineKmerOutputRec);
+        AbstractOperatorDescriptor kmerLocalAggregator = (AbstractOperatorDescriptor) objs[0];
+        logDebug("LocalKmerGroupby Operator");
+        connectOperators(jobSpec, sorter, ncNodeNames, kmerLocalAggregator, ncNodeNames,
+                new OneToOneConnectorDescriptor(jobSpec));
+
+        logDebug("CrossKmerGroupby Operator");
+        IConnectorDescriptor kmerConnPartition = (IConnectorDescriptor) objs[1];
+        AbstractOperatorDescriptor kmerCrossAggregator = (AbstractOperatorDescriptor) objs[2];
+        connectOperators(jobSpec, kmerLocalAggregator, ncNodeNames, kmerCrossAggregator, ncNodeNames, kmerConnPartition);
+        return kmerCrossAggregator;
+    }
+
+    public AbstractOperatorDescriptor generateMapperFromKmerToRead(JobSpecification jobSpec,
+            AbstractOperatorDescriptor kmerCrossAggregator) {
+        // Map (Kmer, {(ReadID,PosInRead),...}) into
+        // (ReadID,PosInRead,{OtherPosition,...},Kmer)
+
+        AbstractOperatorDescriptor mapKmerToRead = new MapKmerPositionToReadOperator(jobSpec,
+                MapKmerPositionToReadOperator.readIDOutputRec, readLength, kmerSize);
+        connectOperators(jobSpec, kmerCrossAggregator, ncNodeNames, mapKmerToRead, ncNodeNames,
+                new OneToOneConnectorDescriptor(jobSpec));
+        return mapKmerToRead;
+    }
+
+    public AbstractOperatorDescriptor generateGroupbyReadJob(JobSpecification jobSpec,
+            AbstractOperatorDescriptor mapKmerToRead) throws HyracksDataException {
+        int[] keyFields = new int[] { 0 }; // the id of grouped key
+        // (ReadID, {(PosInRead,{OtherPositoin..},Kmer) ...}
+        ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(jobSpec, frameLimits, keyFields,
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY) },
+                MapKmerPositionToReadOperator.readIDOutputRec);
+        connectOperators(jobSpec, mapKmerToRead, ncNodeNames, sorter, ncNodeNames, new OneToOneConnectorDescriptor(
+                jobSpec));
+
+        RecordDescriptor readIDFinalRec = new RecordDescriptor(
+                new ISerializerDeserializer[1 + 2 * MergeReadIDAggregateFactory.getPositionCount(readLength, kmerSize)]);
+        Object[] objs = generateAggeragateDescriptorbyType(jobSpec, keyFields, new AggregateReadIDAggregateFactory(),
+                new MergeReadIDAggregateFactory(readLength, kmerSize), new ReadIDPartitionComputerFactory(), null,
+                IntegerPointable.FACTORY, AggregateReadIDAggregateFactory.readIDAggregateRec, readIDFinalRec);
+        AbstractOperatorDescriptor readLocalAggregator = (AbstractOperatorDescriptor) objs[0];
+        connectOperators(jobSpec, sorter, ncNodeNames, readLocalAggregator, ncNodeNames,
+                new OneToOneConnectorDescriptor(jobSpec));
+
+        logDebug("Group by ReadID merger");
+        IConnectorDescriptor readconn = (IConnectorDescriptor) objs[1];
+        AbstractOperatorDescriptor readCrossAggregator = (AbstractOperatorDescriptor) objs[2];
+        connectOperators(jobSpec, readLocalAggregator, ncNodeNames, readCrossAggregator, ncNodeNames, readconn);
+        return readCrossAggregator;
+    }
+
+    public AbstractOperatorDescriptor generateMapperFromReadToNode(JobSpecification jobSpec,
+            AbstractOperatorDescriptor readCrossAggregator) {
+        // Map (ReadID, [(Poslist,Kmer) ... ]) to (Node, IncomingList,
+        // OutgoingList, Kmer)
+
+        AbstractOperatorDescriptor mapEachReadToNode = new MapReadToNodeOperator(jobSpec,
+                MapReadToNodeOperator.nodeOutputRec, kmerSize, true);
+        connectOperators(jobSpec, readCrossAggregator, ncNodeNames, mapEachReadToNode, ncNodeNames,
+                new OneToOneConnectorDescriptor(jobSpec));
+        return mapEachReadToNode;
+    }
+
+    public AbstractOperatorDescriptor generateKmerWritorOperator(JobSpecification jobSpec,
+            AbstractOperatorDescriptor kmerCrossAggregator) throws HyracksException {
+        // Output Kmer
+        ITupleWriterFactory kmerWriter = null;
+        switch (outputFormat) {
+            case TEXT:
+                kmerWriter = new KMerTextWriterFactory(kmerSize);
+                break;
+            case BINARY:
+            default:
+                kmerWriter = new KMerSequenceWriterFactory(hadoopJobConfFactory.getConf());
+                break;
+        }
+        logDebug("WriteOperator");
+        HDFSWriteOperatorDescriptor writeKmerOperator = new HDFSWriteOperatorDescriptor(jobSpec,
+                hadoopJobConfFactory.getConf(), kmerWriter);
+        connectOperators(jobSpec, kmerCrossAggregator, ncNodeNames, writeKmerOperator, ncNodeNames,
+                new OneToOneConnectorDescriptor(jobSpec));
+        return writeKmerOperator;
+    }
+
+    public AbstractOperatorDescriptor generateNodeWriterOpertator(JobSpecification jobSpec,
+            AbstractOperatorDescriptor mapEachReadToNode) throws HyracksException {
+        ITupleWriterFactory nodeWriter = null;
+        switch (outputFormat) {
+            case TEXT:
+                nodeWriter = new NodeTextWriterFactory(kmerSize);
+                break;
+            case BINARY:
+            default:
+                nodeWriter = new NodeSequenceWriterFactory(hadoopJobConfFactory.getConf());
+                break;
+        }
+        logDebug("WriteOperator");
+        // Output Node
+        HDFSWriteOperatorDescriptor writeNodeOperator = new HDFSWriteOperatorDescriptor(jobSpec,
+                hadoopJobConfFactory.getConf(), nodeWriter);
+        connectOperators(jobSpec, mapEachReadToNode, ncNodeNames, writeNodeOperator, ncNodeNames,
+                new OneToOneConnectorDescriptor(jobSpec));
+        return writeNodeOperator;
+    }
+
+    @Override
+    public JobSpecification generateJob() throws HyracksException {
+
+        JobSpecification jobSpec = new JobSpecification();
+        logDebug("ReadKmer Operator");
+
+        HDFSReadOperatorDescriptor readOperator = createHDFSReader(jobSpec);
+
+        logDebug("Group by Kmer");
+        AbstractOperatorDescriptor lastOperator = generateGroupbyKmerJob(jobSpec, readOperator);
+
+        // logDebug("Write kmer to result");
+        // generateRootByWriteKmerGroupbyResult(jobSpec, lastOperator);
+
+        logDebug("Map Kmer to Read Operator");
+        lastOperator = generateMapperFromKmerToRead(jobSpec, lastOperator);
+
+        logDebug("Group by Read Operator");
+        lastOperator = generateGroupbyReadJob(jobSpec, lastOperator);
+
+        logDebug("Generate final node");
+        lastOperator = generateMapperFromReadToNode(jobSpec, lastOperator);
+        logDebug("Write node to result");
+        lastOperator = generateNodeWriterOpertator(jobSpec, lastOperator);
+
+        jobSpec.addRoot(lastOperator);
+        return jobSpec;
+    }
+
+    protected void initJobConfiguration(Scheduler scheduler) throws HyracksDataException {
+        Configuration conf = confFactory.getConf();
+        readLength = conf.getInt(GenomixJobConf.READ_LENGTH, GenomixJobConf.DEFAULT_READLEN);
+        kmerSize = conf.getInt(GenomixJobConf.KMER_LENGTH, GenomixJobConf.DEFAULT_KMERLEN);
+        if (kmerSize % 2 == 0) {
+            kmerSize--;
+            conf.setInt(GenomixJobConf.KMER_LENGTH, kmerSize);
+        }
+        frameLimits = conf.getInt(GenomixJobConf.FRAME_LIMIT, GenomixJobConf.DEFAULT_FRAME_LIMIT);
+        tableSize = conf.getInt(GenomixJobConf.TABLE_SIZE, GenomixJobConf.DEFAULT_TABLE_SIZE);
+        frameSize = conf.getInt(GenomixJobConf.FRAME_SIZE, GenomixJobConf.DEFAULT_FRAME_SIZE);
+
+        bGenerateReversedKmer = conf.getBoolean(GenomixJobConf.REVERSED_KMER, GenomixJobConf.DEFAULT_REVERSED);
+
+        String type = conf.get(GenomixJobConf.GROUPBY_TYPE, GenomixJobConf.GROUPBY_TYPE_PRECLUSTER);
+        if (type.equalsIgnoreCase(GenomixJobConf.GROUPBY_TYPE_EXTERNAL)) {
+            groupbyType = GroupbyType.EXTERNAL;
+        } else if (type.equalsIgnoreCase(GenomixJobConf.GROUPBY_TYPE_PRECLUSTER)) {
+            groupbyType = GroupbyType.PRECLUSTER;
+        } else {
+            groupbyType = GroupbyType.HYBRIDHASH;
+        }
+
+        String output = conf.get(GenomixJobConf.OUTPUT_FORMAT, GenomixJobConf.OUTPUT_FORMAT_BINARY);
+        if (output.equalsIgnoreCase("text")) {
+            outputFormat = OutputFormat.TEXT;
+        } else {
+            outputFormat = OutputFormat.BINARY;
+        }
+        try {
+            hadoopJobConfFactory = new ConfFactory(new JobConf(conf));
+            InputSplit[] splits = hadoopJobConfFactory.getConf().getInputFormat()
+                    .getSplits(hadoopJobConfFactory.getConf(), ncNodeNames.length);
+            readSchedule = scheduler.getLocationConstraints(splits);
+        } catch (IOException ex) {
+            throw new HyracksDataException(ex);
+        }
+
+        LOG.info("Genomix Graph Build Configuration");
+        LOG.info("Kmer:" + kmerSize);
+        LOG.info("Groupby type:" + type);
+        LOG.info("Output format:" + output);
+        LOG.info("Frame limit" + frameLimits);
+        LOG.info("Frame kmerByteSize" + frameSize);
+    }
+
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenCheckReader.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenCheckReader.java
new file mode 100644
index 0000000..b4b1e73
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenCheckReader.java
@@ -0,0 +1,122 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.genomix.hyracks.job;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Map;
+
+import edu.uci.ics.genomix.hyracks.dataflow.ReadsKeyValueParserFactory;
+import edu.uci.ics.genomix.velvet.oldtype.PositionWritable;
+import edu.uci.ics.genomix.velvet.oldtype.KmerBytesWritable;
+import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import edu.uci.ics.hyracks.hdfs.api.ITupleWriter;
+import edu.uci.ics.hyracks.hdfs.api.ITupleWriterFactory;
+import edu.uci.ics.hyracks.hdfs.dataflow.HDFSReadOperatorDescriptor;
+import edu.uci.ics.hyracks.hdfs.dataflow.HDFSWriteOperatorDescriptor;
+import edu.uci.ics.hyracks.hdfs.scheduler.Scheduler;
+
+public class JobGenCheckReader extends JobGenBrujinGraph {
+
+    /**
+     * 
+     */
+    private static final long serialVersionUID = 1L;
+
+    public JobGenCheckReader(GenomixJobConf job, Scheduler scheduler, Map<String, NodeControllerInfo> ncMap,
+            int numPartitionPerMachine) throws HyracksDataException {
+        super(job, scheduler, ncMap, numPartitionPerMachine);
+    }
+
+    @Override
+    public JobSpecification generateJob() throws HyracksException {
+
+        JobSpecification jobSpec = new JobSpecification();
+        logDebug("ReadKmer Operator");
+        HDFSReadOperatorDescriptor readOperator = createHDFSReader(jobSpec);
+
+        logDebug("Write kmer to result");
+        generateRootByWriteKmerReader(jobSpec, readOperator);
+
+        return jobSpec;
+    }
+
+    public AbstractSingleActivityOperatorDescriptor generateRootByWriteKmerReader(JobSpecification jobSpec,
+            HDFSReadOperatorDescriptor readOperator) throws HyracksException {
+        // Output Kmer
+        HDFSWriteOperatorDescriptor writeKmerOperator = new HDFSWriteOperatorDescriptor(jobSpec,
+                hadoopJobConfFactory.getConf(), new ITupleWriterFactory() {
+
+                    /**
+             * 
+             */
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public ITupleWriter getTupleWriter(IHyracksTaskContext ctx) throws HyracksDataException {
+                        return new ITupleWriter() {
+
+                            private KmerBytesWritable kmer = new KmerBytesWritable(kmerSize);
+                            private PositionWritable pos = new PositionWritable();
+
+                            @Override
+                            public void open(DataOutput output) throws HyracksDataException {
+                            }
+
+                            @Override
+                            public void write(DataOutput output, ITupleReference tuple) throws HyracksDataException {
+                                try {
+                                    if (kmer.getLength() > tuple
+                                            .getFieldLength(ReadsKeyValueParserFactory.OutputKmerField)) {
+                                        throw new IllegalArgumentException("Not enough kmer bytes");
+                                    }
+                                    kmer.setNewReference(
+                                            tuple.getFieldData(ReadsKeyValueParserFactory.OutputKmerField),
+                                            tuple.getFieldStart(ReadsKeyValueParserFactory.OutputKmerField));
+                                    pos.setNewReference(tuple.getFieldData(ReadsKeyValueParserFactory.OutputPosition),
+                                            tuple.getFieldStart(ReadsKeyValueParserFactory.OutputPosition));
+
+                                    output.write(kmer.toString().getBytes());
+                                    output.writeByte('\t');
+                                    output.write(pos.toString().getBytes());
+                                    output.writeByte('\n');
+                                } catch (IOException e) {
+                                    throw new HyracksDataException(e);
+                                }
+                            }
+
+                            @Override
+                            public void close(DataOutput output) throws HyracksDataException {
+
+                            }
+
+                        };
+                    }
+
+                });
+        connectOperators(jobSpec, readOperator, ncNodeNames, writeKmerOperator, ncNodeNames,
+                new OneToOneConnectorDescriptor(jobSpec));
+        jobSpec.addRoot(writeKmerOperator);
+        return writeKmerOperator;
+    }
+
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenCreateKmerInfo.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenCreateKmerInfo.java
new file mode 100644
index 0000000..5202ba2
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenCreateKmerInfo.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.genomix.hyracks.job;
+
+import java.util.Map;
+
+import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import edu.uci.ics.hyracks.hdfs.dataflow.HDFSReadOperatorDescriptor;
+import edu.uci.ics.hyracks.hdfs.scheduler.Scheduler;
+
+public class JobGenCreateKmerInfo extends JobGenBrujinGraph {
+
+    /**
+     * 
+     */
+    private static final long serialVersionUID = 1L;
+
+    public JobGenCreateKmerInfo(GenomixJobConf job, Scheduler scheduler, Map<String, NodeControllerInfo> ncMap,
+            int numPartitionPerMachine) throws HyracksDataException {
+        super(job, scheduler, ncMap, numPartitionPerMachine);
+    }
+
+    @Override
+    public JobSpecification generateJob() throws HyracksException {
+
+        JobSpecification jobSpec = new JobSpecification();
+        logDebug("ReadKmer Operator");
+        HDFSReadOperatorDescriptor readOperator = createHDFSReader(jobSpec);
+
+        logDebug("Group by Kmer");
+        AbstractOperatorDescriptor lastOperator = generateGroupbyKmerJob(jobSpec, readOperator);
+
+        logDebug("Write kmer to result");
+        lastOperator = generateKmerWritorOperator(jobSpec, lastOperator);
+        jobSpec.addRoot(lastOperator);
+
+        return jobSpec;
+    }
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenGroupbyReadID.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenGroupbyReadID.java
new file mode 100644
index 0000000..1e78b79
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenGroupbyReadID.java
@@ -0,0 +1,158 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.genomix.hyracks.job;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Map;
+
+import edu.uci.ics.genomix.data.Marshal;
+import edu.uci.ics.genomix.velvet.oldtype.PositionListWritable;
+import edu.uci.ics.genomix.velvet.oldtype.KmerBytesWritable;
+import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import edu.uci.ics.hyracks.hdfs.api.ITupleWriter;
+import edu.uci.ics.hyracks.hdfs.api.ITupleWriterFactory;
+import edu.uci.ics.hyracks.hdfs.dataflow.HDFSReadOperatorDescriptor;
+import edu.uci.ics.hyracks.hdfs.dataflow.HDFSWriteOperatorDescriptor;
+import edu.uci.ics.hyracks.hdfs.scheduler.Scheduler;
+
+public class JobGenGroupbyReadID extends JobGenBrujinGraph {
+
+    /**
+     * 
+     */
+    private static final long serialVersionUID = 1L;
+
+    public JobGenGroupbyReadID(GenomixJobConf job, Scheduler scheduler, Map<String, NodeControllerInfo> ncMap,
+            int numPartitionPerMachine) throws HyracksDataException {
+        super(job, scheduler, ncMap, numPartitionPerMachine);
+    }
+
+    @Override
+    public JobSpecification generateJob() throws HyracksException {
+
+        JobSpecification jobSpec = new JobSpecification();
+        logDebug("ReadKmer Operator");
+        HDFSReadOperatorDescriptor readOperator = createHDFSReader(jobSpec);
+
+        logDebug("Group by Kmer");
+        AbstractOperatorDescriptor lastOperator = generateGroupbyKmerJob(jobSpec, readOperator);
+
+        //logDebug("Write kmer to result");
+        //generateRootByWriteKmerGroupbyResult(jobSpec, lastOperator);
+
+        logDebug("Map Kmer to Read Operator");
+        lastOperator = generateMapperFromKmerToRead(jobSpec, lastOperator);
+
+        logDebug("Group by Read Operator");
+        lastOperator = generateGroupbyReadJob(jobSpec, lastOperator);
+
+        logDebug("Write node to result");
+        lastOperator = generateRootByWriteReadIDAggregationResult(jobSpec, lastOperator);
+        jobSpec.addRoot(lastOperator);
+        return jobSpec;
+    }
+
+    public AbstractOperatorDescriptor generateRootByWriteReadIDAggregationResult(JobSpecification jobSpec,
+            AbstractOperatorDescriptor readCrossAggregator) throws HyracksException {
+        HDFSWriteOperatorDescriptor writeKmerOperator = new HDFSWriteOperatorDescriptor(jobSpec,
+                hadoopJobConfFactory.getConf(), new ITupleWriterFactory() {
+
+                    /**
+                     * 
+                     */
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public ITupleWriter getTupleWriter(IHyracksTaskContext ctx) throws HyracksDataException {
+                        return new ITupleWriter() {
+
+                            private KmerBytesWritable kmer = new KmerBytesWritable(kmerSize);
+                            private PositionListWritable plist = new PositionListWritable();
+
+                            @Override
+                            public void open(DataOutput output) throws HyracksDataException {
+
+                            }
+
+                            @Override
+                            public void write(DataOutput output, ITupleReference tuple) throws HyracksDataException {
+                                int readId = Marshal.getInt(tuple.getFieldData(0), tuple.getFieldStart(0));
+                                try {
+                                    output.write((Integer.toString(readId) + "\t").getBytes());
+                                    for (int i = 1; i < tuple.getFieldCount(); i++) {
+                                        int fieldOffset = tuple.getFieldStart(i);
+                                        while (fieldOffset < tuple.getFieldStart(i) + tuple.getFieldLength(i)) {
+                                            byte[] buffer = tuple.getFieldData(i);
+                                            // read poslist
+                                            int posCount = PositionListWritable.getCountByDataLength(Marshal.getInt(
+                                                    buffer, fieldOffset));
+                                            fieldOffset += 4;
+                                            plist.setNewReference(posCount, buffer, fieldOffset);
+                                            fieldOffset += plist.getLength();
+
+                                            int posInRead = (i + 1) / 2;
+                                            if (i % 2 == 0) {
+                                                posInRead = -posInRead;
+                                            }
+                                            String kmerString = "";
+                                            if (posInRead > 0) {
+                                                int kmerbytes = Marshal.getInt(buffer, fieldOffset);
+                                                if (kmer.getLength() != kmerbytes) {
+                                                    throw new IllegalArgumentException("kmerlength is invalid");
+                                                }
+                                                fieldOffset += 4;
+                                                kmer.setNewReference(buffer, fieldOffset);
+                                                fieldOffset += kmer.getLength();
+                                                kmerString = kmer.toString();
+                                            }
+
+                                            output.write(Integer.toString(posInRead).getBytes());
+                                            output.writeByte('\t');
+                                            output.write(plist.toString().getBytes());
+                                            output.writeByte('\t');
+                                            output.write(kmerString.getBytes());
+                                            output.writeByte('\t');
+                                        }
+                                    }
+                                    output.writeByte('\n');
+                                } catch (IOException e) {
+                                    throw new HyracksDataException(e);
+                                }
+                            }
+
+                            @Override
+                            public void close(DataOutput output) throws HyracksDataException {
+
+                            }
+
+                        };
+                    }
+
+                });
+        connectOperators(jobSpec, readCrossAggregator, ncNodeNames, writeKmerOperator, ncNodeNames,
+                new OneToOneConnectorDescriptor(jobSpec));
+
+        return writeKmerOperator;
+    }
+
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenMapKmerToRead.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenMapKmerToRead.java
new file mode 100644
index 0000000..8e727959
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenMapKmerToRead.java
@@ -0,0 +1,148 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.genomix.hyracks.job;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Map;
+
+import edu.uci.ics.genomix.data.Marshal;
+import edu.uci.ics.genomix.hyracks.dataflow.MapKmerPositionToReadOperator;
+import edu.uci.ics.genomix.velvet.oldtype.PositionListWritable;
+import edu.uci.ics.genomix.velvet.oldtype.KmerBytesWritable;
+import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import edu.uci.ics.hyracks.hdfs.api.ITupleWriter;
+import edu.uci.ics.hyracks.hdfs.api.ITupleWriterFactory;
+import edu.uci.ics.hyracks.hdfs.dataflow.HDFSReadOperatorDescriptor;
+import edu.uci.ics.hyracks.hdfs.dataflow.HDFSWriteOperatorDescriptor;
+import edu.uci.ics.hyracks.hdfs.scheduler.Scheduler;
+
+public class JobGenMapKmerToRead extends JobGenBrujinGraph {
+
+    /**
+     * 
+     */
+    private static final long serialVersionUID = 1L;
+
+    public JobGenMapKmerToRead(GenomixJobConf job, Scheduler scheduler, Map<String, NodeControllerInfo> ncMap,
+            int numPartitionPerMachine) throws HyracksDataException {
+        super(job, scheduler, ncMap, numPartitionPerMachine);
+    }
+
+    public AbstractOperatorDescriptor generateRootByWriteMapperFromKmerToReadID(JobSpecification jobSpec,
+            AbstractOperatorDescriptor mapper) throws HyracksException {
+        // Output Kmer
+        HDFSWriteOperatorDescriptor writeKmerOperator = new HDFSWriteOperatorDescriptor(jobSpec,
+                hadoopJobConfFactory.getConf(), new ITupleWriterFactory() {
+
+                    /**
+                     * 
+                     */
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public ITupleWriter getTupleWriter(IHyracksTaskContext ctx) throws HyracksDataException {
+                        return new ITupleWriter() {
+
+                            private KmerBytesWritable kmer = new KmerBytesWritable(kmerSize);
+                            private PositionListWritable plist = new PositionListWritable();
+
+                            @Override
+                            public void open(DataOutput output) throws HyracksDataException {
+
+                            }
+
+                            @Override
+                            public void write(DataOutput output, ITupleReference tuple) throws HyracksDataException {
+                                try {
+                                    int readID = Marshal.getInt(
+                                            tuple.getFieldData(MapKmerPositionToReadOperator.OutputReadIDField),
+                                            tuple.getFieldStart(MapKmerPositionToReadOperator.OutputReadIDField));
+                                    byte posInRead = tuple
+                                            .getFieldData(MapKmerPositionToReadOperator.OutputPosInReadField)[tuple
+                                            .getFieldStart(MapKmerPositionToReadOperator.OutputPosInReadField)];
+                                    int posCount = PositionListWritable.getCountByDataLength(tuple
+                                            .getFieldLength(MapKmerPositionToReadOperator.OutputOtherReadIDListField));
+                                    plist.setNewReference(
+                                            posCount,
+                                            tuple.getFieldData(MapKmerPositionToReadOperator.OutputOtherReadIDListField),
+                                            tuple.getFieldStart(MapKmerPositionToReadOperator.OutputOtherReadIDListField));
+
+                                    String kmerString = "";
+                                    if (posInRead > 0) {
+                                        if (kmer.getLength() > tuple
+                                                .getFieldLength(MapKmerPositionToReadOperator.OutputKmerField)) {
+                                            throw new IllegalArgumentException("Not enough kmer bytes");
+                                        }
+                                        kmer.setNewReference(
+                                                tuple.getFieldData(MapKmerPositionToReadOperator.OutputKmerField),
+                                                tuple.getFieldStart(MapKmerPositionToReadOperator.OutputKmerField));
+                                        kmerString = kmer.toString();
+                                    }
+
+                                    output.write(Integer.toString(readID).getBytes());
+                                    output.writeByte('\t');
+                                    output.write(Integer.toString(posInRead).getBytes());
+                                    output.writeByte('\t');
+                                    output.write(plist.toString().getBytes());
+                                    output.writeByte('\t');
+                                    output.write(kmerString.getBytes());
+                                    output.writeByte('\n');
+                                } catch (IOException e) {
+                                    throw new HyracksDataException(e);
+                                }
+
+                            }
+
+                            @Override
+                            public void close(DataOutput output) throws HyracksDataException {
+
+                            }
+
+                        };
+                    }
+
+                });
+        connectOperators(jobSpec, mapper, ncNodeNames, writeKmerOperator, ncNodeNames, new OneToOneConnectorDescriptor(
+                jobSpec));
+        jobSpec.addRoot(writeKmerOperator);
+        return writeKmerOperator;
+    }
+
+    @Override
+    public JobSpecification generateJob() throws HyracksException {
+
+        JobSpecification jobSpec = new JobSpecification();
+        logDebug("ReadKmer Operator");
+        HDFSReadOperatorDescriptor readOperator = createHDFSReader(jobSpec);
+
+        logDebug("Group by Kmer");
+        AbstractOperatorDescriptor lastOperator = generateGroupbyKmerJob(jobSpec, readOperator);
+
+        logDebug("Map Kmer to Read Operator");
+        lastOperator = generateMapperFromKmerToRead(jobSpec, lastOperator);
+
+        generateRootByWriteMapperFromKmerToReadID(jobSpec, lastOperator);
+
+        return jobSpec;
+    }
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenUnMerged.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenUnMerged.java
new file mode 100644
index 0000000..21b6385
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenUnMerged.java
@@ -0,0 +1,34 @@
+package edu.uci.ics.genomix.hyracks.job;
+
+import java.util.Map;
+
+import edu.uci.ics.genomix.hyracks.dataflow.MapReadToNodeOperator;
+import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import edu.uci.ics.hyracks.hdfs.scheduler.Scheduler;
+
+public class JobGenUnMerged extends JobGenBrujinGraph {
+
+    /**
+     * 
+     */
+    private static final long serialVersionUID = 1L;
+
+    public JobGenUnMerged(GenomixJobConf job, Scheduler scheduler, Map<String, NodeControllerInfo> ncMap,
+            int numPartitionPerMachine) throws HyracksDataException {
+        super(job, scheduler, ncMap, numPartitionPerMachine);
+    }
+
+    @Override
+    public AbstractOperatorDescriptor generateMapperFromReadToNode(JobSpecification jobSpec,
+            AbstractOperatorDescriptor readCrossAggregator) {
+        AbstractOperatorDescriptor mapEachReadToNode = new MapReadToNodeOperator(jobSpec,
+                MapReadToNodeOperator.nodeOutputRec, kmerSize, false);
+        connectOperators(jobSpec, readCrossAggregator, ncNodeNames, mapEachReadToNode, ncNodeNames,
+                new OneToOneConnectorDescriptor(jobSpec));
+        return mapEachReadToNode;
+    }
+}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/IncomingListWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/IncomingListWritable.java
index 2dad500..8dec857 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/IncomingListWritable.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/IncomingListWritable.java
@@ -6,7 +6,7 @@
 
 import org.apache.hadoop.io.WritableComparable;
 
-import edu.uci.ics.genomix.oldtype.PositionListWritable;
+import edu.uci.ics.genomix.type.PositionListWritable;
 
 public class IncomingListWritable implements WritableComparable<IncomingListWritable>{
     private PositionListWritable reverseForwardList;
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MergeBubbleMessageWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MergeBubbleMessageWritable.java
index 6b32b51..c42bf32 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MergeBubbleMessageWritable.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MergeBubbleMessageWritable.java
@@ -6,7 +6,7 @@
 
 import org.apache.hadoop.io.WritableComparable;
 
-import edu.uci.ics.genomix.oldtype.PositionWritable;
+import edu.uci.ics.genomix.type.PositionWritable;
 import edu.uci.ics.genomix.pregelix.type.CheckMessage;
 import edu.uci.ics.genomix.pregelix.type.Message;
 import edu.uci.ics.genomix.type.KmerBytesWritable;
@@ -59,7 +59,8 @@
         checkMessage = 0;
         if (sourceVertexId != null) {
             checkMessage |= CheckMessage.SOURCE;
-            this.sourceVertexId.set(sourceVertexId.getReadID(),sourceVertexId.getPosInRead());
+            
+            this.sourceVertexId.set((byte)0, sourceVertexId.getReadId(),sourceVertexId.getPosId());
         }
         if (chainVertexId != null) {
             checkMessage |= CheckMessage.ACUTUALKMER;
@@ -91,7 +92,7 @@
     public void setSourceVertexId(PositionWritable sourceVertexId) {
         if (sourceVertexId != null) {
             checkMessage |= CheckMessage.SOURCE;
-            this.sourceVertexId.set(sourceVertexId.getReadID(),sourceVertexId.getPosInRead());
+            this.sourceVertexId.set((byte)0, sourceVertexId.getReadId(),sourceVertexId.getPosId());
         }
     }
     
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/OutgoingListWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/OutgoingListWritable.java
index af14f00..275954d 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/OutgoingListWritable.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/OutgoingListWritable.java
@@ -6,7 +6,7 @@
 
 import org.apache.hadoop.io.WritableComparable;
 
-import edu.uci.ics.genomix.oldtype.PositionListWritable;
+import edu.uci.ics.genomix.type.PositionListWritable;
 
 public class OutgoingListWritable implements WritableComparable<OutgoingListWritable>{
     private PositionListWritable forwardForwardList;
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bridgeremove/BridgeRemoveVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bridgeremove/BridgeRemoveVertex.java
index 039f5b4..50b1518 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bridgeremove/BridgeRemoveVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bridgeremove/BridgeRemoveVertex.java
@@ -4,7 +4,7 @@
 import java.util.Iterator;
 
 import edu.uci.ics.pregelix.api.job.PregelixJob;
-import edu.uci.ics.genomix.oldtype.PositionWritable;
+import edu.uci.ics.genomix.type.PositionWritable;
 import edu.uci.ics.genomix.pregelix.client.Client;
 import edu.uci.ics.genomix.pregelix.format.GraphCleanInputFormat;
 import edu.uci.ics.genomix.pregelix.format.GraphCleanOutputFormat;