finish mapKmerToReadID
diff --git a/genomix/genomix-data/.classpath b/genomix/genomix-data/.classpath
index 6e26406..e43402f 100644
--- a/genomix/genomix-data/.classpath
+++ b/genomix/genomix-data/.classpath
@@ -1,10 +1,36 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <classpath>
-	<classpathentry kind="src" output="target/classes" path="src/main/java"/>
-	<classpathentry excluding="**" kind="src" output="target/classes" path="src/main/resources"/>
-	<classpathentry kind="src" output="target/test-classes" path="src/test/java"/>
-	<classpathentry excluding="**" kind="src" output="target/test-classes" path="src/test/resources"/>
-	<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.7"/>
-	<classpathentry kind="con" path="org.eclipse.m2e.MAVEN2_CLASSPATH_CONTAINER"/>
+	<classpathentry kind="src" output="target/classes" path="src/main/java">
+		<attributes>
+			<attribute name="optional" value="true"/>
+			<attribute name="maven.pomderived" value="true"/>
+		</attributes>
+	</classpathentry>
+	<classpathentry excluding="**" kind="src" output="target/classes" path="src/main/resources">
+		<attributes>
+			<attribute name="maven.pomderived" value="true"/>
+		</attributes>
+	</classpathentry>
+	<classpathentry kind="src" output="target/test-classes" path="src/test/java">
+		<attributes>
+			<attribute name="optional" value="true"/>
+			<attribute name="maven.pomderived" value="true"/>
+		</attributes>
+	</classpathentry>
+	<classpathentry excluding="**" kind="src" output="target/test-classes" path="src/test/resources">
+		<attributes>
+			<attribute name="maven.pomderived" value="true"/>
+		</attributes>
+	</classpathentry>
+	<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.7">
+		<attributes>
+			<attribute name="maven.pomderived" value="true"/>
+		</attributes>
+	</classpathentry>
+	<classpathentry kind="con" path="org.eclipse.m2e.MAVEN2_CLASSPATH_CONTAINER">
+		<attributes>
+			<attribute name="maven.pomderived" value="true"/>
+		</attributes>
+	</classpathentry>
 	<classpathentry kind="output" path="target/classes"/>
 </classpath>
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/Position.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/Position.java
new file mode 100644
index 0000000..0dc24a5
--- /dev/null
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/Position.java
@@ -0,0 +1,45 @@
+package edu.uci.ics.genomix.type;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+
+public class Position implements Writable {
+    public int readID;
+    public byte posInRead;
+
+    public Position() {
+        readID = 0;
+        posInRead = 0;
+    }
+
+    public Position(int readID, byte posInRead) {
+        this.readID = readID;
+        this.posInRead = posInRead;
+    }
+
+    public Position(final Position pos) {
+        this.readID = pos.readID;
+        this.posInRead = pos.posInRead;
+    }
+
+    public void set(int readID, byte posInRead) {
+        this.readID = readID;
+        this.posInRead = posInRead;
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+        readID = in.readInt();
+        posInRead = in.readByte();
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+        out.writeInt(readID);
+        out.writeByte(posInRead);
+    }
+
+}
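
Position packs a read identifier and an in-read offset into a fixed five-byte record (a 4-byte int plus one byte). As a quick sanity check, a round trip through Hadoop's DataOutput/DataInput should preserve both fields and produce exactly five bytes; this is a minimal sketch (the demo class name is hypothetical), assuming Position is on the classpath:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    import edu.uci.ics.genomix.type.Position;

    public class PositionRoundTrip {
        public static void main(String[] args) throws IOException {
            Position p = new Position(42, (byte) 7);
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            p.write(new DataOutputStream(bos));
            // 4 bytes for readID + 1 byte for posInRead
            System.out.println(bos.size()); // expected: 5
            Position q = new Position();
            q.readFields(new DataInputStream(new ByteArrayInputStream(bos.toByteArray())));
            System.out.println(q.readID + "@" + q.posInRead); // expected: 42@7
        }
    }
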
diff --git a/genomix/genomix-hyracks/pom.xml b/genomix/genomix-hyracks/pom.xml
index ca5cb61..ebc49f2 100644
--- a/genomix/genomix-hyracks/pom.xml
+++ b/genomix/genomix-hyracks/pom.xml
@@ -261,5 +261,6 @@
 			<type>jar</type>
 			<scope>compile</scope>
 		</dependency>
+		
 	</dependencies>
 </project>
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/accessors/ByteSerializerDeserializer.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/accessors/ByteSerializerDeserializer.java
new file mode 100644
index 0000000..3826f9b
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/accessors/ByteSerializerDeserializer.java
@@ -0,0 +1,60 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.genomix.hyracks.data.accessors;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public class ByteSerializerDeserializer implements ISerializerDeserializer<Byte> {
+
+    private static final long serialVersionUID = 1L;
+
+    public static final ByteSerializerDeserializer INSTANCE = new ByteSerializerDeserializer();
+
+    private ByteSerializerDeserializer() {
+    }
+
+    @Override
+    public Byte deserialize(DataInput in) throws HyracksDataException {
+        try {
+            return in.readByte();
+        } catch (IOException e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    @Override
+    public void serialize(Byte instance, DataOutput out) throws HyracksDataException {
+        try {
+            out.writeByte(instance.intValue());
+        } catch (IOException e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    public static byte getByte(byte[] bytes, int offset) {
+        return bytes[offset];
+    }
+
+    public static void putByte(byte val, byte[] bytes, int offset) {
+        bytes[offset] = val;
+    }
+
+}
\ No newline at end of file
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/accessors/KmerBinaryHashFunctionFamily.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/accessors/KmerBinaryHashFunctionFamily.java
new file mode 100644
index 0000000..a4cfd3b
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/accessors/KmerBinaryHashFunctionFamily.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.genomix.hyracks.data.accessors;
+
+import edu.uci.ics.genomix.hyracks.data.primitive.KmerPointable;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunction;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFamily;
+
+public class KmerBinaryHashFunctionFamily implements IBinaryHashFunctionFamily {
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public IBinaryHashFunction createBinaryHashFunction(final int seed) {
+
+        return new IBinaryHashFunction() {
+            private KmerPointable p = new KmerPointable();
+
+            @Override
+            public int hash(byte[] bytes, int offset, int length) {
+                if (length + offset > bytes.length)
+                    throw new IllegalStateException("out of bounds");
+                p.set(bytes, offset, length);
+                int hash = p.hash() * (seed + 1);
+                if (hash < 0) {
+                    hash = -(hash + 1);
+                }
+                return hash;
+            }
+        };
+    }
+}
\ No newline at end of file
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/accessors/KmerHashPartitioncomputerFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/accessors/KmerHashPartitioncomputerFactory.java
new file mode 100644
index 0000000..eb0d6bb
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/accessors/KmerHashPartitioncomputerFactory.java
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.genomix.hyracks.data.accessors;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+
+public class KmerHashPartitioncomputerFactory implements ITuplePartitionComputerFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    public static int hashBytes(byte[] bytes, int offset, int length) {
+        int hash = 1;
+        for (int i = offset; i < offset + length; i++)
+            hash = (31 * hash) + (int) bytes[i];
+        return hash;
+    }
+
+    @Override
+    public ITuplePartitionComputer createPartitioner() {
+        return new ITuplePartitionComputer() {
+            @Override
+            public int partition(IFrameTupleAccessor accessor, int tIndex, int nParts) {
+                int startOffset = accessor.getTupleStartOffset(tIndex);
+                int fieldOffset = accessor.getFieldStartOffset(tIndex, 0);
+                int slotLength = accessor.getFieldSlotsLength();
+                int fieldLength = accessor.getFieldLength(tIndex, 0);
+
+                ByteBuffer buf = accessor.getBuffer();
+
+                int hash = hashBytes(buf.array(), startOffset + fieldOffset + slotLength, fieldLength);
+                if (hash < 0) {
+                    hash = -(hash + 1);
+                }
+
+                return hash % nParts;
+            }
+        };
+    }
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/accessors/KmerNormarlizedComputerFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/accessors/KmerNormarlizedComputerFactory.java
new file mode 100644
index 0000000..44b0e10
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/accessors/KmerNormarlizedComputerFactory.java
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.genomix.hyracks.data.accessors;
+
+import edu.uci.ics.genomix.hyracks.data.primitive.KmerPointable;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputer;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+
+public class KmerNormarlizedComputerFactory implements INormalizedKeyComputerFactory {
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public INormalizedKeyComputer createNormalizedKeyComputer() {
+        return new INormalizedKeyComputer() {
+            /**
+             * read one int from the Kmer; make sure this int is consistent with the Kmer comparator
+             */
+            @Override
+            public int normalize(byte[] bytes, int start, int length) {
+                return KmerPointable.getIntReverse(bytes, start, length);
+            }
+        };
+    }
+}
\ No newline at end of file
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/accessors/ReadIDPartitionComputerFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/accessors/ReadIDPartitionComputerFactory.java
new file mode 100644
index 0000000..d328bd1
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/accessors/ReadIDPartitionComputerFactory.java
@@ -0,0 +1,39 @@
+package edu.uci.ics.genomix.hyracks.data.accessors;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+
+public class ReadIDPartitionComputerFactory implements ITuplePartitionComputerFactory {
+
+    /**
+     * 
+     */
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public ITuplePartitionComputer createPartitioner() {
+        // TODO Auto-generated method stub
+        return new ITuplePartitionComputer() {
+            @Override
+            public int partition(IFrameTupleAccessor accessor, int tIndex, int nParts) {
+                int startOffset = accessor.getTupleStartOffset(tIndex);
+                int fieldOffset = accessor.getFieldStartOffset(tIndex, 0);
+                int slotLength = accessor.getFieldSlotsLength();
+
+                ByteBuffer buf = accessor.getBuffer();
+
+                int hash = IntegerSerializerDeserializer.getInt(buf.array(), startOffset + fieldOffset + slotLength);
+                if (hash < 0) {
+                    hash = -(hash + 1);
+                }
+
+                return hash % nParts;
+            }
+        };
+    }
+
+}
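
Both partitioners fold a negative hash with hash = -(hash + 1) rather than plain negation. The reason is the asymmetric int range: negating Integer.MIN_VALUE overflows back to itself, while -(hash + 1) always lands in [0, Integer.MAX_VALUE]. A tiny illustration (hypothetical class name):

    public class NegativeHashDemo {
        public static void main(String[] args) {
            int h = Integer.MIN_VALUE;
            System.out.println(-h);       // -2147483648: negation overflows
            System.out.println(-(h + 1)); // 2147483647: a safe non-negative bucket source
        }
    }
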
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/KmerPointable.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/KmerPointable.java
new file mode 100644
index 0000000..4ceda78
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/KmerPointable.java
@@ -0,0 +1,145 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.genomix.hyracks.data.primitive;
+
+import edu.uci.ics.genomix.hyracks.data.accessors.KmerHashPartitioncomputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
+import edu.uci.ics.hyracks.data.std.api.AbstractPointable;
+import edu.uci.ics.hyracks.data.std.api.IComparable;
+import edu.uci.ics.hyracks.data.std.api.IHashable;
+import edu.uci.ics.hyracks.data.std.api.INumeric;
+import edu.uci.ics.hyracks.data.std.api.IPointable;
+import edu.uci.ics.hyracks.data.std.api.IPointableFactory;
+
+public final class KmerPointable extends AbstractPointable implements IHashable, IComparable, INumeric {
+    public static final ITypeTraits TYPE_TRAITS = new ITypeTraits() {
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public boolean isFixedLength() {
+            return false;
+        }
+
+        @Override
+        public int getFixedLength() {
+            return -1;
+        }
+    };
+
+    public static final IPointableFactory FACTORY = new IPointableFactory() {
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public IPointable createPointable() {
+            return new KmerPointable();
+        }
+
+        @Override
+        public ITypeTraits getTypeTraits() {
+            return TYPE_TRAITS;
+        }
+    };
+
+    public static short getShortReverse(byte[] bytes, int offset, int length) {
+        if (length < 2) {
+            return (short) (bytes[offset] & 0xff);
+        }
+        return (short) (((bytes[offset + length - 1] & 0xff) << 8) + (bytes[offset + length - 2] & 0xff));
+    }
+
+    public static int getIntReverse(byte[] bytes, int offset, int length) {
+        int shortValue = getShortReverse(bytes, offset, length) & 0xffff;
+
+        if (length < 3) {
+            return shortValue;
+        }
+        if (length == 3) {
+            return (((bytes[offset + 2] & 0xff) << 16) + ((bytes[offset + 1] & 0xff) << 8) + ((bytes[offset] & 0xff)));
+        }
+        return ((bytes[offset + length - 1] & 0xff) << 24) + ((bytes[offset + length - 2] & 0xff) << 16)
+                + ((bytes[offset + length - 3] & 0xff) << 8) + ((bytes[offset + length - 4] & 0xff) << 0);
+    }
+
+    public static long getLongReverse(byte[] bytes, int offset, int length) {
+        if (length < 8) {
+            return ((long) getIntReverse(bytes, offset, length)) & 0x0ffffffffL;
+        }
+        return (((long) (bytes[offset + length - 1] & 0xff)) << 56)
+                + (((long) (bytes[offset + length - 2] & 0xff)) << 48)
+                + (((long) (bytes[offset + length - 3] & 0xff)) << 40)
+                + (((long) (bytes[offset + length - 4] & 0xff)) << 32)
+                + (((long) (bytes[offset + length - 5] & 0xff)) << 24)
+                + (((long) (bytes[offset + length - 6] & 0xff)) << 16)
+                + (((long) (bytes[offset + length - 7] & 0xff)) << 8) + (((long) (bytes[offset + length - 8] & 0xff)));
+    }
+
+    @Override
+    public int compareTo(IPointable pointer) {
+        return compareTo(pointer.getByteArray(), pointer.getStartOffset(), pointer.getLength());
+    }
+
+    @Override
+    public int compareTo(byte[] bytes, int offset, int length) {
+
+        if (this.length != length) {
+            return this.length - length;
+        }
+        for (int i = length - 1; i >= 0; i--) {
+            int cmp = (this.bytes[this.start + i] & 0xff) - (bytes[offset + i] & 0xff);
+            if (cmp != 0) {
+                return cmp;
+            }
+        }
+
+        return 0;
+    }
+
+    @Override
+    public int hash() {
+        int hash = KmerHashPartitioncomputerFactory.hashBytes(bytes, start, length);
+        return hash;
+    }
+
+    @Override
+    public byte byteValue() {
+        return bytes[start + length - 1];
+    }
+
+    @Override
+    public short shortValue() {
+        return getShortReverse(bytes, start, length);
+    }
+
+    @Override
+    public int intValue() {
+        return getIntReverse(bytes, start, length);
+    }
+
+    @Override
+    public long longValue() {
+        return getLongReverse(bytes, start, length);
+    }
+
+    @Override
+    public float floatValue() {
+        return Float.intBitsToFloat(intValue());
+    }
+
+    @Override
+    public double doubleValue() {
+        return Double.longBitsToDouble(longValue());
+    }
+}
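
getShortReverse/getIntReverse/getLongReverse read the tail of the k-mer's bytes with the last byte as the most significant, which keeps the normalized key (see KmerNormarlizedComputerFactory above) ordered consistently with compareTo, which also compares from the last byte down. A standalone check, assuming the class above is on the classpath (the demo class name is hypothetical):

    import edu.uci.ics.genomix.hyracks.data.primitive.KmerPointable;

    public class ReverseIntDemo {
        public static void main(String[] args) {
            byte[] b = { 0x01, 0x02, 0x03, 0x04, 0x05 };
            // last four bytes taken in reverse order: 0x05, 0x04, 0x03, 0x02
            System.out.println(Integer.toHexString(KmerPointable.getIntReverse(b, 0, 5)));
            // expected: 5040302
        }
    }
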
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/PositionListReference.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/PositionListReference.java
new file mode 100644
index 0000000..0fa1489
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/PositionListReference.java
@@ -0,0 +1,114 @@
+package edu.uci.ics.genomix.hyracks.data.primitive;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+
+import edu.uci.ics.hyracks.data.std.api.IValueReference;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+
+public class PositionListReference implements Writable, IValueReference {
+    private byte[] values;
+    private int valueCount;
+    private static final byte[] EMPTY = {};
+
+    private PositionReference posIter = new PositionReference();
+
+    public PositionListReference() {
+        this.values = EMPTY;
+        this.valueCount = 0;
+    }
+
+    protected void setSize(int size) {
+        if (size > getCapacity()) {
+            setCapacity((size * 3 / 2));
+        }
+    }
+
+    protected int getCapacity() {
+        return values.length;
+    }
+
+    protected void setCapacity(int new_cap) {
+        if (new_cap > getCapacity()) {
+            byte[] new_data = new byte[new_cap];
+            if (values.length > 0) {
+                System.arraycopy(values, 0, new_data, 0, values.length);
+            }
+            values = new_data;
+        }
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+        this.valueCount = in.readInt();
+        setSize(valueCount * PositionReference.LENGTH);
+        in.readFully(values, 0, valueCount * PositionReference.LENGTH);
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+        out.writeInt(valueCount);
+        out.write(values, 0, valueCount * PositionReference.LENGTH);
+    }
+
+    public PositionReference getPosition(int i) {
+        if (i >= valueCount) {
+            throw new ArrayIndexOutOfBoundsException("No position at index " + i + "; the list has " + valueCount);
+        }
+        posIter.setNewSpace(values, i * PositionReference.LENGTH);
+        return posIter;
+    }
+
+    public void set(PositionListReference list2) {
+        set(list2.valueCount, list2.values, 0);
+    }
+
+    public void set(int valueCount, byte[] newData, int offset) {
+        this.valueCount = valueCount;
+        setSize(valueCount * PositionReference.LENGTH);
+        if (valueCount > 0) {
+            System.arraycopy(newData, offset, values, 0, valueCount * PositionReference.LENGTH);
+        }
+    }
+
+    public void reset() {
+        valueCount = 0;
+    }
+
+    public void append(PositionReference pos) {
+        setSize((1 + valueCount) * PositionReference.LENGTH);
+        System.arraycopy(pos.getByteArray(), pos.getStartOffset(), values, valueCount * PositionReference.LENGTH,
+                pos.getLength());
+        valueCount += 1;
+    }
+
+    public void append(int readID, byte posInRead) {
+        setSize((1 + valueCount) * PositionReference.LENGTH);
+        IntegerSerializerDeserializer.putInt(readID, values, valueCount * PositionReference.LENGTH);
+        values[valueCount * PositionReference.LENGTH + PositionReference.INTBYTES] = posInRead;
+        valueCount += 1;
+    }
+
+    public int getPositionCount() {
+        return valueCount;
+    }
+
+    @Override
+    public byte[] getByteArray() {
+        return values;
+    }
+
+    @Override
+    public int getStartOffset() {
+        return 0;
+    }
+
+    @Override
+    public int getLength() {
+        return valueCount * PositionReference.LENGTH;
+    }
+
+}
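
PositionListReference is a growable array of 5-byte positions; setSize grows the backing array by half again (size * 3 / 2) to amortize appends, and getPosition returns a single reused PositionReference, so a caller must copy its fields before fetching another index. A small usage sketch (hypothetical class name, assuming the genomix classes are on the classpath):

    import edu.uci.ics.genomix.hyracks.data.primitive.PositionListReference;

    public class PositionListDemo {
        public static void main(String[] args) {
            PositionListReference list = new PositionListReference();
            for (int readID = 1; readID <= 3; readID++) {
                list.append(readID, (byte) 0); // each entry occupies PositionReference.LENGTH = 5 bytes
            }
            System.out.println(list.getPositionCount());         // 3
            System.out.println(list.getLength());                // 15
            System.out.println(list.getPosition(1).getReadID()); // 2
            // note: getPosition(0) and getPosition(1) return the same reused object
        }
    }
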
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/PositionReference.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/PositionReference.java
new file mode 100644
index 0000000..b3feaa5
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/PositionReference.java
@@ -0,0 +1,76 @@
+package edu.uci.ics.genomix.hyracks.data.primitive;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+
+import edu.uci.ics.hyracks.data.std.api.IValueReference;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+
+public class PositionReference implements IValueReference, Writable {
+    private byte[] storage;
+    private int offset;
+    public static final int LENGTH = 5;
+    public static final int INTBYTES = 4;
+
+    public PositionReference() {
+        storage = new byte[LENGTH];
+        offset = 0;
+    }
+
+    public PositionReference(byte[] storage, int offset) {
+        setNewSpace(storage, offset);
+    }
+
+    public PositionReference(int readID, byte posInRead) {
+        this();
+        IntegerSerializerDeserializer.putInt(readID, storage, offset);
+        storage[offset + INTBYTES] = posInRead;
+    }
+
+    public void set(int readID, byte posInRead) {
+        IntegerSerializerDeserializer.putInt(readID, storage, offset);
+        storage[offset + INTBYTES] = posInRead;
+    }
+
+    public void setNewSpace(byte[] storage, int offset) {
+        this.storage = storage;
+        this.offset = offset;
+    }
+
+    public int getReadID() {
+        return IntegerSerializerDeserializer.getInt(storage, offset);
+    }
+
+    public byte getPosInRead() {
+        return storage[offset + INTBYTES];
+    }
+
+    @Override
+    public byte[] getByteArray() {
+        return storage;
+    }
+
+    @Override
+    public int getStartOffset() {
+        return offset;
+    }
+
+    @Override
+    public int getLength() {
+        return LENGTH;
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+        in.readFully(storage, offset, LENGTH);
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+        out.write(storage, offset, LENGTH);
+    }
+
+}
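
The five-byte layout is the 4-byte readID followed by the 1-byte position. Assuming Hyracks' IntegerSerializerDeserializer packs the int big-endian (most significant byte first, as java.io.DataOutput does), the raw bytes should look like this sketch expects (hypothetical class name, and the endianness is an assumption):

    import edu.uci.ics.genomix.hyracks.data.primitive.PositionReference;

    public class PositionLayoutDemo {
        public static void main(String[] args) {
            PositionReference pos = new PositionReference(0x01020304, (byte) 9);
            byte[] raw = pos.getByteArray();
            // assumed layout: big-endian readID in bytes 0..3, posInRead in byte 4
            System.out.println(raw[0] + " " + raw[1] + " " + raw[2] + " " + raw[3] + " " + raw[4]);
            // expected: 1 2 3 4 9
            System.out.println(pos.getReadID() == 0x01020304); // true
        }
    }
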
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/ConnectorPolicyAssignmentPolicy.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/ConnectorPolicyAssignmentPolicy.java
new file mode 100644
index 0000000..fe44f51
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/ConnectorPolicyAssignmentPolicy.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.genomix.hyracks.dataflow;
+
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
+import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicyAssignmentPolicy;
+import edu.uci.ics.hyracks.api.dataflow.connectors.PipeliningConnectorPolicy;
+import edu.uci.ics.hyracks.api.dataflow.connectors.SendSideMaterializedPipeliningConnectorPolicy;
+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningMergingConnectorDescriptor;
+
+/**
+ * used by the pre-clustered group-by
+ */
+public class ConnectorPolicyAssignmentPolicy implements IConnectorPolicyAssignmentPolicy {
+    private static final long serialVersionUID = 1L;
+    private IConnectorPolicy senderSideMaterializePolicy = new SendSideMaterializedPipeliningConnectorPolicy();
+    private IConnectorPolicy pipeliningPolicy = new PipeliningConnectorPolicy();
+
+    @Override
+    public IConnectorPolicy getConnectorPolicyAssignment(IConnectorDescriptor c, int nProducers, int nConsumers,
+            int[] fanouts) {
+        if (c instanceof MToNPartitioningMergingConnectorDescriptor) {
+            return senderSideMaterializePolicy;
+        } else {
+            return pipeliningPolicy;
+        }
+    }
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/KMerSequenceWriterFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/KMerSequenceWriterFactory.java
new file mode 100644
index 0000000..1bd427c
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/KMerSequenceWriterFactory.java
@@ -0,0 +1,101 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.genomix.hyracks.dataflow;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.SequenceFile.Writer;
+import org.apache.hadoop.mapred.JobConf;
+
+import edu.uci.ics.genomix.hyracks.job.GenomixJob;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.KmerCountValue;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.hdfs.api.ITupleWriter;
+import edu.uci.ics.hyracks.hdfs.api.ITupleWriterFactory;
+import edu.uci.ics.hyracks.hdfs.dataflow.ConfFactory;
+
+@SuppressWarnings("deprecation")
+public class KMerSequenceWriterFactory implements ITupleWriterFactory {
+
+    private static final long serialVersionUID = 1L;
+    private ConfFactory confFactory;
+    private final int kmerlength;
+
+    public KMerSequenceWriterFactory(JobConf conf) throws HyracksDataException {
+        this.confFactory = new ConfFactory(conf);
+        this.kmerlength = conf.getInt(GenomixJob.KMER_LENGTH, GenomixJob.DEFAULT_KMER);
+    }
+
+    public class TupleWriter implements ITupleWriter {
+        public TupleWriter(ConfFactory cf) {
+            this.cf = cf;
+        }
+
+        ConfFactory cf;
+        Writer writer = null;
+
+        KmerCountValue reEnterCount = new KmerCountValue();
+        KmerBytesWritable reEnterKey = new KmerBytesWritable(kmerlength);
+
+        /**
+         * assumption is that the output never changes the source!
+         */
+        @Override
+        public void write(DataOutput output, ITupleReference tuple) throws HyracksDataException {
+            try {
+                byte[] kmer = tuple.getFieldData(0);
+                int keyStart = tuple.getFieldStart(0);
+                int keyLength = tuple.getFieldLength(0);
+
+                byte bitmap = tuple.getFieldData(1)[tuple.getFieldStart(1)];
+                byte count = tuple.getFieldData(2)[tuple.getFieldStart(2)];
+                reEnterCount.set(bitmap, count);
+                reEnterKey.set(kmer, keyStart, keyLength);
+                writer.append(reEnterKey, reEnterCount);
+            } catch (IOException e) {
+                throw new HyracksDataException(e);
+            }
+        }
+
+        @Override
+        public void open(DataOutput output) throws HyracksDataException {
+            try {
+                writer = SequenceFile.createWriter(cf.getConf(), (FSDataOutputStream) output, KmerBytesWritable.class,
+                        KmerCountValue.class, CompressionType.NONE, null);
+            } catch (IOException e) {
+                throw new HyracksDataException(e);
+            }
+        }
+
+        @Override
+        public void close(DataOutput output) throws HyracksDataException {
+            // TODO Auto-generated method stub
+        }
+    }
+
+    @Override
+    public ITupleWriter getTupleWriter(IHyracksTaskContext ctx) throws HyracksDataException {
+        return new TupleWriter(confFactory);
+    }
+
+}
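
For debugging, the (kmer, count) pairs appended above can be read back with a plain SequenceFile.Reader. A hedged sketch using the old mapred-era Hadoop API (the k value must match what the writer was configured with, and the demo class name is hypothetical):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.SequenceFile;

    import edu.uci.ics.genomix.type.KmerBytesWritable;
    import edu.uci.ics.genomix.type.KmerCountValue;

    public class DumpKmerSequenceFile {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            FileSystem fs = FileSystem.get(conf);
            SequenceFile.Reader reader = new SequenceFile.Reader(fs, new Path(args[0]), conf);
            KmerBytesWritable key = new KmerBytesWritable(21); // k must match GenomixJob.KMER_LENGTH
            KmerCountValue value = new KmerCountValue();
            while (reader.next(key, value)) {
                System.out.println(key + "\t" + value);
            }
            reader.close();
        }
    }
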
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/KMerTextWriterFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/KMerTextWriterFactory.java
new file mode 100644
index 0000000..54e84f7
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/KMerTextWriterFactory.java
@@ -0,0 +1,75 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.genomix.hyracks.dataflow;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.genomix.type.GeneCode;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.hdfs.api.ITupleWriter;
+import edu.uci.ics.hyracks.hdfs.api.ITupleWriterFactory;
+
+public class KMerTextWriterFactory implements ITupleWriterFactory {
+
+    /**
+	 * 
+	 */
+    private static final long serialVersionUID = 1L;
+    private KmerBytesWritable kmer;
+
+    public KMerTextWriterFactory(int k) {
+        kmer = new KmerBytesWritable(k);
+    }
+
+    public class TupleWriter implements ITupleWriter {
+        @Override
+        public void write(DataOutput output, ITupleReference tuple) throws HyracksDataException {
+            try {
+                kmer.set(tuple.getFieldData(0), tuple.getFieldStart(0), tuple.getFieldLength(0));
+                output.write(kmer.toString().getBytes());
+                output.writeByte('\t');
+                output.write(GeneCode.getSymbolFromBitMap(tuple.getFieldData(1)[tuple.getFieldStart(1)]).getBytes());
+                output.writeByte('\t');
+                output.write(String.valueOf((int) tuple.getFieldData(2)[tuple.getFieldStart(2)]).getBytes());
+                output.writeByte('\n');
+            } catch (IOException e) {
+                throw new HyracksDataException(e);
+            }
+        }
+
+        @Override
+        public void open(DataOutput output) throws HyracksDataException {
+            // TODO Auto-generated method stub
+
+        }
+
+        @Override
+        public void close(DataOutput output) throws HyracksDataException {
+            // TODO Auto-generated method stub
+        }
+    }
+
+    @Override
+    public ITupleWriter getTupleWriter(IHyracksTaskContext ctx) throws HyracksDataException {
+        // TODO Auto-generated method stub
+        return new TupleWriter();
+    }
+
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/MapKmerPositionToReadOperator.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/MapKmerPositionToReadOperator.java
new file mode 100644
index 0000000..924f455
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/MapKmerPositionToReadOperator.java
@@ -0,0 +1,176 @@
+package edu.uci.ics.genomix.hyracks.dataflow;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.genomix.hyracks.data.primitive.PositionReference;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
+import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorNodePushable;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
+
+public class MapKmerPositionToReadOperator extends AbstractSingleActivityOperatorDescriptor {
+
+    public MapKmerPositionToReadOperator(IOperatorDescriptorRegistry spec, RecordDescriptor recDesc) {
+        super(spec, 1, 1);
+        recordDescriptors[0] = recDesc;
+    }
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * Map (Kmer, {(ReadID,PosInRead),...}) into (ReadID,PosInRead,*Kmer*,{OtherReadID,...})
+     * OtherReadID appears only when otherReadID.otherPos==0
+     */
+    public class MapKmerPositionToReadNodePushable extends AbstractUnaryInputUnaryOutputOperatorNodePushable {
+        private final IHyracksTaskContext ctx;
+        private final RecordDescriptor inputRecDesc;
+        private final RecordDescriptor outputRecDesc;
+
+        private FrameTupleAccessor accessor;
+        private ByteBuffer writeBuffer;
+        private ArrayTupleBuilder builder;
+        private FrameTupleAppender appender;
+
+        public static final int InputKmerField = 0;
+        public static final int InputPosListField = 1;
+
+        public static final int OutputReadIDField = 0;
+        public static final int OutputPosInReadField = 1;
+        public static final int OutputOtherReadIDListField = 2;
+        public static final int OutputKmerField = 3; // may not be needed
+
+        private PositionReference positionEntry;
+        private ArrayBackedValueStorage posListEntry;
+        private ArrayBackedValueStorage zeroPositionCollection;
+        private ArrayBackedValueStorage noneZeroPositionCollection;
+
+        public MapKmerPositionToReadNodePushable(IHyracksTaskContext ctx, RecordDescriptor inputRecDesc,
+                RecordDescriptor outputRecDesc) {
+            this.ctx = ctx;
+            this.inputRecDesc = inputRecDesc;
+            this.outputRecDesc = outputRecDesc;
+            this.positionEntry = new PositionReference();
+            this.posListEntry = new ArrayBackedValueStorage();
+            this.zeroPositionCollection = new ArrayBackedValueStorage();
+            this.noneZeroPositionCollection = new ArrayBackedValueStorage();
+        }
+
+        @Override
+        public void open() throws HyracksDataException {
+            accessor = new FrameTupleAccessor(ctx.getFrameSize(), inputRecDesc);
+            writeBuffer = ctx.allocateFrame();
+            builder = new ArrayTupleBuilder(outputRecDesc.getFieldCount());
+            appender = new FrameTupleAppender(ctx.getFrameSize());
+            appender.reset(writeBuffer, true);
+            writer.open();
+            posListEntry.reset();
+        }
+
+        @Override
+        public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+            accessor.reset(buffer);
+            int tupleCount = accessor.getTupleCount();
+            for (int i = 0; i < tupleCount; i++) {
+                scanPosition(i, zeroPositionCollection, noneZeroPositionCollection);
+                writeTuple(i, zeroPositionCollection, noneZeroPositionCollection, builder);
+            }
+        }
+
+        private void scanPosition(int tIndex, ArrayBackedValueStorage zeroPositionCollection2,
+                ArrayBackedValueStorage noneZeroPositionCollection2) {
+            zeroPositionCollection2.reset();
+            noneZeroPositionCollection2.reset();
+            byte[] data = accessor.getBuffer().array();
+            //Kmer, {(ReadID,PosInRead),...}
+            //  to ReadID, PosInRead, Kmer, {OtherReadID}
+            int offsetPoslist = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength()
+                    + accessor.getFieldStartOffset(tIndex, InputPosListField);
+            for (int i = 0; i < accessor.getFieldLength(tIndex, InputPosListField); i += PositionReference.LENGTH) {
+                positionEntry.setNewSpace(data, offsetPoslist + i);
+                if (positionEntry.getPosInRead() == 0) {
+                    zeroPositionCollection2.append(positionEntry);
+                } else {
+                    noneZeroPositionCollection2.append(positionEntry);
+                }
+            }
+
+        }
+
+        private void writeTuple(int tIndex, ArrayBackedValueStorage zeroPositionCollection,
+                ArrayBackedValueStorage noneZeroPositionCollection, ArrayTupleBuilder builder2) {
+            byte[] data = accessor.getBuffer().array();
+            int offsetPoslist = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength()
+                    + accessor.getFieldStartOffset(tIndex, InputPosListField);
+            for (int i = 0; i < accessor.getFieldLength(tIndex, InputPosListField); i += PositionReference.LENGTH) {
+                positionEntry.setNewSpace(data, offsetPoslist + i);
+                if (positionEntry.getPosInRead() != 0) {
+                    appendNodeToBuilder(tIndex, positionEntry, zeroPositionCollection, builder2);
+                } else {
+                    appendNodeToBuilder(tIndex, positionEntry, noneZeroPositionCollection, builder2);
+                }
+            }
+        }
+
+        private void appendNodeToBuilder(int tIndex, PositionReference pos, ArrayBackedValueStorage posList2,
+                ArrayTupleBuilder builder2) {
+            try {
+                builder2.addField(pos.getByteArray(), 0, PositionReference.INTBYTES);
+                builder2.addField(pos.getByteArray(), PositionReference.INTBYTES, 1);
+                // TODO: ask Yingyi whether an empty byte[] field is supported
+                if (posList2 == null) {
+                    builder2.addFieldEndOffset();
+                } else {
+                    builder2.addField(posList2.getByteArray(), posList2.getStartOffset(), posList2.getLength());
+                }
+                // set kmer, may not useful
+                byte[] data = accessor.getBuffer().array();
+                int offsetKmer = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength()
+                        + accessor.getFieldStartOffset(tIndex, InputKmerField);
+                builder2.addField(data, offsetKmer, accessor.getFieldLength(tIndex, InputKmerField));
+                
+                if (!appender.append(builder2.getFieldEndOffsets(), builder2.getByteArray(), 0, builder2.getSize())) {
+                    FrameUtils.flushFrame(writeBuffer, writer);
+                    appender.reset(writeBuffer, true);
+                    if (!appender.append(builder2.getFieldEndOffsets(), builder2.getByteArray(), 0, builder2.getSize())) {
+                        throw new IllegalStateException();
+                    }
+                }
+                builder2.reset();
+            } catch (HyracksDataException e) {
+                throw new IllegalStateException(
+                        "Failed to add a field to the tuple by copying the data bytes from a byte array.", e);
+            }
+        }
+
+        @Override
+        public void fail() throws HyracksDataException {
+            writer.fail();
+        }
+
+        @Override
+        public void close() throws HyracksDataException {
+            if (appender.getTupleCount() > 0) {
+                FrameUtils.flushFrame(writeBuffer, writer);
+            }
+            writer.close();
+        }
+
+    }
+
+    @Override
+    public AbstractOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+            IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
+        return new MapKmerPositionToReadNodePushable(ctx, recordDescProvider.getInputRecordDescriptor(getActivityId(),
+                0), recordDescriptors[0]);
+    }
+
+}
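
Stripped of the frame plumbing, the per-k-mer logic above is a two-pass split and join: one pass buckets the positions by whether posInRead is zero, and a second pass emits each position paired with the opposite bucket (a read that covers this k-mer at position 0 is the "other read" for everyone else, and vice versa). A plain-Java sketch of the same mapping (hypothetical names, Java 16+ record for brevity):

    import java.util.ArrayList;
    import java.util.List;

    public class MapKmerToReadSketch {
        record Pos(int readID, byte posInRead) {}

        static void map(String kmer, List<Pos> positions) {
            List<Pos> zero = new ArrayList<>();
            List<Pos> nonZero = new ArrayList<>();
            for (Pos p : positions) {                      // scanPosition
                (p.posInRead() == 0 ? zero : nonZero).add(p);
            }
            for (Pos p : positions) {                      // writeTuple
                List<Pos> others = p.posInRead() != 0 ? zero : nonZero;
                System.out.println(p.readID() + "\t" + p.posInRead() + "\t" + others + "\t" + kmer);
            }
        }
    }
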
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/ReadsKeyValueParserFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/ReadsKeyValueParserFactory.java
new file mode 100644
index 0000000..1bc2137
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/ReadsKeyValueParserFactory.java
@@ -0,0 +1,146 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.genomix.hyracks.dataflow;
+
+import java.nio.ByteBuffer;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+
+import edu.uci.ics.genomix.hyracks.data.accessors.ByteSerializerDeserializer;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import edu.uci.ics.hyracks.hdfs.api.IKeyValueParser;
+import edu.uci.ics.hyracks.hdfs.api.IKeyValueParserFactory;
+
+public class ReadsKeyValueParserFactory implements IKeyValueParserFactory<LongWritable, Text> {
+    private static final long serialVersionUID = 1L;
+    private static final Log LOG = LogFactory.getLog(ReadsKeyValueParserFactory.class);
+    private KmerBytesWritable kmer;
+    private boolean bReversed;
+
+    public ReadsKeyValueParserFactory(int k, boolean bGenerateReversed) {
+        bReversed = bGenerateReversed;
+        kmer = new KmerBytesWritable(k);
+    }
+
+    @Override
+    public IKeyValueParser<LongWritable, Text> createKeyValueParser(final IHyracksTaskContext ctx) {
+        final ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(2);
+        final ByteBuffer outputBuffer = ctx.allocateFrame();
+        final FrameTupleAppender outputAppender = new FrameTupleAppender(ctx.getFrameSize());
+        outputAppender.reset(outputBuffer, true);
+
+        return new IKeyValueParser<LongWritable, Text>() {
+
+            @Override
+            public void parse(LongWritable key, Text value, IFrameWriter writer) throws HyracksDataException {
+                String[] geneLine = value.toString().split("\\t"); // Read the Real Gene Line
+                if (geneLine.length != 2) {
+                    return;
+                }
+                int readID = 0;
+                try {
+                    readID = Integer.parseInt(geneLine[0]);
+                } catch (NumberFormatException e) {
+                    LOG.warn("Invalid data");
+                    return;
+                }
+
+                Pattern genePattern = Pattern.compile("[AGCT]+");
+                Matcher geneMatcher = genePattern.matcher(geneLine[1]);
+                boolean isValid = geneMatcher.matches();
+                if (isValid) {
+                    SplitReads(readID, geneLine[1].getBytes(), writer);
+                }
+            }
+
+            private void SplitReads(int readID, byte[] array, IFrameWriter writer) {
+                /** first kmer */
+                int k = kmer.getKmerLength();
+                if (k >= array.length) {
+                    return;
+                }
+                kmer.setByRead(array, 0);
+                InsertToFrame(kmer, readID, 0, writer);
+
+                /** middle kmer */
+                for (int i = k; i < array.length; i++) {
+                    kmer.shiftKmerWithNextChar(array[i]);
+                    InsertToFrame(kmer, readID, i - k + 1, writer);
+                }
+
+                if (bReversed) {
+                    /** first kmer */
+                    kmer.setByReadReverse(array, 0);
+                    InsertToFrame(kmer, -readID, array.length - k, writer);
+                    /** middle kmer */
+                    for (int i = k; i < array.length; i++) {
+                        kmer.shiftKmerWithPreChar(array[i]);
+                        InsertToFrame(kmer, -readID, array.length - i - 1, writer);
+                    }
+                }
+            }
+
+            private void InsertToFrame(KmerBytesWritable kmer, int readID, int posInRead, IFrameWriter writer) {
+                try {
+                    if (posInRead > 127) {
+                        throw new IllegalArgumentException("Position in read is beyond 127 at read " + readID);
+                    }
+                    tupleBuilder.reset();
+                    tupleBuilder.addField(kmer.getBytes(), 0, kmer.getLength());
+                    tupleBuilder.addField(IntegerSerializerDeserializer.INSTANCE, readID);
+                    tupleBuilder.addField(ByteSerializerDeserializer.INSTANCE, (byte)posInRead);
+
+                    if (!outputAppender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
+                            tupleBuilder.getSize())) {
+                        FrameUtils.flushFrame(outputBuffer, writer);
+                        outputAppender.reset(outputBuffer, true);
+                        if (!outputAppender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
+                                tupleBuilder.getSize())) {
+                            throw new IllegalStateException(
+                                    "Failed to copy a record into a frame: the record size is too large.");
+                        }
+                    }
+                } catch (Exception e) {
+                    throw new IllegalStateException(e);
+                }
+            }
+
+            @Override
+            public void open(IFrameWriter writer) throws HyracksDataException {
+                // TODO Auto-generated method stub
+
+            }
+
+            @Override
+            public void close(IFrameWriter writer) throws HyracksDataException {
+                FrameUtils.flushFrame(outputBuffer, writer);
+            }
+        };
+    }
+
+}
\ No newline at end of file
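
SplitReads walks a sliding window over the read: the first k-mer at position 0, then one shift per remaining character, so a read of length L emits L - k + 1 forward k-mers (and, when bReversed is set, the same count again under -readID). The window positions it produces look like this sketch (hypothetical class name):

    public class SplitReadsDemo {
        public static void main(String[] args) {
            String read = "AGCTAC";
            int k = 3;
            for (int pos = 0; pos + k <= read.length(); pos++) {
                // the parser emits (kmer, readID, pos) for each window
                System.out.println(read.substring(pos, pos + k) + "\t" + pos);
            }
            // prints: AGC 0, GCT 1, CTA 2, TAC 3
        }
    }
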
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/AggerateReadIDAggregateFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/AggerateReadIDAggregateFactory.java
new file mode 100644
index 0000000..2ed4c9b
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/AggerateReadIDAggregateFactory.java
@@ -0,0 +1,76 @@
+package edu.uci.ics.genomix.hyracks.dataflow.aggregators;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+
+public class AggerateReadIDAggregateFactory implements IAggregatorDescriptorFactory {
+
+    /**
+     * 
+     */
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
+            RecordDescriptor outRecordDescriptor, int[] keyFields, int[] keyFieldsInPartialResults)
+            throws HyracksDataException {
+        // TODO Auto-generated method stub
+        return new IAggregatorDescriptor(){
+
+            @Override
+            public AggregateState createAggregateStates() {
+                // TODO Auto-generated method stub
+                return null;
+            }
+
+            @Override
+            public void init(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+                    AggregateState state) throws HyracksDataException {
+                // TODO Auto-generated method stub
+                
+            }
+
+            @Override
+            public void reset() {
+                // TODO Auto-generated method stub
+                
+            }
+
+            @Override
+            public void aggregate(IFrameTupleAccessor accessor, int tIndex, IFrameTupleAccessor stateAccessor,
+                    int stateTupleIndex, AggregateState state) throws HyracksDataException {
+                // TODO Auto-generated method stub
+                
+            }
+
+            @Override
+            public void outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+                    AggregateState state) throws HyracksDataException {
+                // TODO Auto-generated method stub
+                
+            }
+
+            @Override
+            public void outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+                    AggregateState state) throws HyracksDataException {
+                // TODO Auto-generated method stub
+                
+            }
+
+            @Override
+            public void close() {
+                // TODO Auto-generated method stub
+                
+            }
+            
+        };
+    }
+    
+
+}
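
The stub above still has to honor the IAggregatorDescriptor lifecycle that the Hyracks group-by drives: createAggregateStates once, then per group init on the first tuple, aggregate on each further tuple, and outputFinalResult when the group closes. A toy analogue with an int-sum state, just to make the call order concrete (hypothetical class, not Hyracks code):

    public class AggregatorLifecycleSketch {
        private int state;                         // stands in for createAggregateStates()

        void init(int v)      { state = v; }       // first tuple of a group
        void aggregate(int v) { state += v; }      // every subsequent tuple
        int outputFinal()     { return state; }    // group complete

        public static void main(String[] args) {
            AggregatorLifecycleSketch agg = new AggregatorLifecycleSketch();
            agg.init(1);
            agg.aggregate(2);
            agg.aggregate(3);
            System.out.println(agg.outputFinal()); // 6
        }
    }
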
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/AggregateKmerAggregateFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/AggregateKmerAggregateFactory.java
new file mode 100644
index 0000000..7dc4947
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/AggregateKmerAggregateFactory.java
@@ -0,0 +1,104 @@
+package edu.uci.ics.genomix.hyracks.dataflow.aggregators;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.genomix.hyracks.data.accessors.ByteSerializerDeserializer;
+import edu.uci.ics.genomix.hyracks.data.primitive.PositionReference;
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+
+public class AggregateKmerAggregateFactory implements IAggregatorDescriptorFactory {
+
+    /**
+     * 
+     */
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
+            RecordDescriptor outRecordDescriptor, int[] keyFields, int[] keyFieldsInPartialResults)
+            throws HyracksDataException {
+        return new IAggregatorDescriptor() {
+            private PositionReference position = new PositionReference();
+
+            protected int getOffSet(IFrameTupleAccessor accessor, int tIndex, int fieldId) {
+                int tupleOffset = accessor.getTupleStartOffset(tIndex);
+                int fieldStart = accessor.getFieldStartOffset(tIndex, fieldId);
+                int offset = tupleOffset + fieldStart + accessor.getFieldSlotsLength();
+                return offset;
+            }
+
+            protected byte readByteField(IFrameTupleAccessor accessor, int tIndex, int fieldId) {
+                return ByteSerializerDeserializer.getByte(accessor.getBuffer().array(),
+                        getOffSet(accessor, tIndex, fieldId));
+            }
+
+            protected int readIntField(IFrameTupleAccessor accessor, int tIndex, int fieldId) {
+                return IntegerSerializerDeserializer.getInt(accessor.getBuffer().array(),
+                        getOffSet(accessor, tIndex, fieldId));
+            }
+
+            @Override
+            public void reset() {
+            }
+
+            @Override
+            public void close() {
+                // TODO Auto-generated method stub
+
+            }
+
+            @Override
+            public AggregateState createAggregateStates() {
+                return new AggregateState(new ArrayBackedValueStorage());
+            }
+
+            @Override
+            public void init(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+                    AggregateState state) throws HyracksDataException {
+                ArrayBackedValueStorage inputVal = (ArrayBackedValueStorage) state.state;
+                inputVal.reset();
+                position.set(readIntField(accessor, tIndex, 1), readByteField(accessor, tIndex, 2));
+                inputVal.append(position);
+            }
+
+            @Override
+            public void aggregate(IFrameTupleAccessor accessor, int tIndex, IFrameTupleAccessor stateAccessor,
+                    int stateTupleIndex, AggregateState state) throws HyracksDataException {
+                ArrayBackedValueStorage inputVal = (ArrayBackedValueStorage) state.state;
+                position.set(readIntField(accessor, tIndex, 1), readByteField(accessor, tIndex, 2));
+                inputVal.append(position);
+            }
+
+            @Override
+            public void outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+                    AggregateState state) throws HyracksDataException {
+                throw new IllegalStateException("partial result method should not be called");
+            }
+
+            @Override
+            public void outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+                    AggregateState state) throws HyracksDataException {
+                DataOutput fieldOutput = tupleBuilder.getDataOutput();
+                ArrayBackedValueStorage inputVal = (ArrayBackedValueStorage) state.state;
+                try {
+                    fieldOutput.write(inputVal.getByteArray(), inputVal.getStartOffset(), inputVal.getLength());
+                    tupleBuilder.addFieldEndOffset();
+                } catch (IOException e) {
+                    throw new HyracksDataException("I/O exception when writing aggregation to the output buffer.");
+                }
+            }
+
+        };
+    }
+
+}
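For reference, the value field built up above is a byte-packed list of (readID, posInRead) entries, appended one PositionReference at a time. A minimal decoding sketch follows; the PositionListDecoder class is hypothetical, and the five-byte entry layout (a 4-byte big-endian readID followed by a 1-byte posInRead, matching the int and byte fields the aggregator reads at indices 1 and 2) is an assumption about PositionReference.LENGTH, not something this diff pins down.

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of this change: walks a packed position list.
public class PositionListDecoder {
    private static final int ENTRY_LENGTH = 5; // assumed: 4-byte readID + 1-byte posInRead

    public static List<int[]> decode(byte[] data, int start, int length) {
        List<int[]> entries = new ArrayList<int[]>();
        ByteBuffer buf = ByteBuffer.wrap(data, start, length);
        while (buf.remaining() >= ENTRY_LENGTH) {
            int readID = buf.getInt();  // 4 bytes, big-endian, as DataOutput writes them
            byte posInRead = buf.get(); // 1 byte
            entries.add(new int[] { readID, posInRead });
        }
        return entries;
    }
}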
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/MergeKmerAggregateFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/MergeKmerAggregateFactory.java
new file mode 100644
index 0000000..ec2b47c
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/MergeKmerAggregateFactory.java
@@ -0,0 +1,105 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.genomix.hyracks.dataflow.aggregators;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.genomix.hyracks.data.primitive.PositionReference;
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+
+public class MergeKmerAggregateFactory implements IAggregatorDescriptorFactory {
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
+            RecordDescriptor outRecordDescriptor, int[] keyFields, int[] keyFieldsInPartialResults)
+            throws HyracksDataException {
+        return new IAggregatorDescriptor() {
+
+            private PositionReference position = new PositionReference();
+
+            @Override
+            public AggregateState createAggregateStates() {
+                return new AggregateState(new ArrayBackedValueStorage());
+            }
+
+            @Override
+            public void init(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex, AggregateState state)
+                    throws HyracksDataException {
+                ArrayBackedValueStorage inputVal = (ArrayBackedValueStorage) state.state;
+                inputVal.reset();
+                int leadOffset = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength();
+                for (int offset = accessor.getFieldStartOffset(tIndex, 1); offset < accessor.getFieldEndOffset(tIndex, 1); offset += PositionReference.LENGTH) {
+                    position.setNewSpace(accessor.getBuffer().array(), leadOffset + offset);
+                    inputVal.append(position);
+                }
+            }
+
+            @Override
+            public void reset() {
+                // TODO Auto-generated method stub
+
+            }
+
+            @Override
+            public void aggregate(IFrameTupleAccessor accessor, int tIndex, IFrameTupleAccessor stateAccessor,
+                    int stateTupleIndex, AggregateState state) throws HyracksDataException {
+                ArrayBackedValueStorage inputVal = (ArrayBackedValueStorage) state.state;
+                int leadOffset = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength();
+                for (int offset = accessor.getFieldStartOffset(tIndex, 1); offset < accessor.getFieldEndOffset(tIndex, 1); offset += PositionReference.LENGTH) {
+                    position.setNewSpace(accessor.getBuffer().array(), leadOffset + offset);
+                    inputVal.append(position);
+                }
+            }
+
+            @Override
+            public void outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+                    AggregateState state) throws HyracksDataException {
+                throw new IllegalStateException("partial result method should not be called");
+            }
+
+            @Override
+            public void outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+                    AggregateState state) throws HyracksDataException {
+                DataOutput fieldOutput = tupleBuilder.getDataOutput();
+                ArrayBackedValueStorage inputVal = (ArrayBackedValueStorage) state.state;
+                try {
+                    fieldOutput.write(inputVal.getByteArray(), inputVal.getStartOffset(), inputVal.getLength());
+                    tupleBuilder.addFieldEndOffset();
+                } catch (IOException e) {
+                    throw new HyracksDataException("I/O exception when writing aggregation to the output buffer.");
+                }
+            }
+
+            @Override
+            public void close() {
+                // TODO Auto-generated method stub
+
+            }
+
+        };
+
+    }
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/MergeReadIDAggregateFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/MergeReadIDAggregateFactory.java
new file mode 100644
index 0000000..b83aaf7
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/MergeReadIDAggregateFactory.java
@@ -0,0 +1,5 @@
+package edu.uci.ics.genomix.hyracks.dataflow.aggregators;
+
+public class MergeReadIDAggregateFactory {
+
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/driver/Driver.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/driver/Driver.java
new file mode 100644
index 0000000..d958205
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/driver/Driver.java
@@ -0,0 +1,148 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.genomix.hyracks.driver;
+
+import java.net.URL;
+import java.util.EnumSet;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.GenericOptionsParser;
+
+import edu.uci.ics.genomix.hyracks.job.GenomixJob;
+import edu.uci.ics.genomix.hyracks.job.JobGen;
+import edu.uci.ics.genomix.hyracks.job.JobGenBrujinGraph;
+import edu.uci.ics.hyracks.api.client.HyracksConnection;
+import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
+import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.JobFlag;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.hdfs.scheduler.Scheduler;
+
+public class Driver {
+    public static enum Plan {
+        BUILD_DEBRUJIN_GRAPH,
+        GRAPH_CLEANING,
+        CONTIGS_GENERATION,
+    }
+
+    private static final String IS_PROFILING = "genomix.driver.profiling";
+    private static final String CPARTITION_PER_MACHINE = "genomix.driver.duplicate.num";
+    private static final Log LOG = LogFactory.getLog(Driver.class);
+    private JobGen jobGen;
+    private boolean profiling;
+
+    private int numPartitionPerMachine;
+
+    private IHyracksClientConnection hcc;
+    private Scheduler scheduler;
+
+    public Driver(String ipAddress, int port, int numPartitionPerMachine) throws HyracksException {
+        try {
+            hcc = new HyracksConnection(ipAddress, port);
+            scheduler = new Scheduler(hcc.getNodeControllerInfos());
+        } catch (Exception e) {
+            throw new HyracksException(e);
+        }
+        this.numPartitionPerMachine = numPartitionPerMachine;
+    }
+
+    public void runJob(GenomixJob job) throws HyracksException {
+        runJob(job, Plan.BUILD_DEBRUJIN_GRAPH, false);
+    }
+
+    public void runJob(GenomixJob job, Plan planChoice, boolean profiling) throws HyracksException {
+        /** add hadoop configurations */
+        URL hadoopCore = job.getClass().getClassLoader().getResource("core-site.xml");
+        job.addResource(hadoopCore);
+        URL hadoopMapRed = job.getClass().getClassLoader().getResource("mapred-site.xml");
+        job.addResource(hadoopMapRed);
+        URL hadoopHdfs = job.getClass().getClassLoader().getResource("hdfs-site.xml");
+        job.addResource(hadoopHdfs);
+
+        LOG.info("job started");
+        long start = System.currentTimeMillis();
+        long end = start;
+        long time = 0;
+
+        this.profiling = profiling;
+        try {
+            Map<String, NodeControllerInfo> ncMap = hcc.getNodeControllerInfos();
+            LOG.info("ncmap:" + ncMap.size() + " " + ncMap.keySet().toString());
+            switch (planChoice) {
+                case BUILD_DEBRUJIN_GRAPH:
+                default:
+                    jobGen = new JobGenBrujinGraph(job, scheduler, ncMap, numPartitionPerMachine);
+                    break;
+            }
+
+            start = System.currentTimeMillis();
+            runCreate(jobGen);
+            end = System.currentTimeMillis();
+            time = end - start;
+            LOG.info("result writing finished " + time + "ms");
+            LOG.info("job finished");
+        } catch (Exception e) {
+            throw new HyracksException(e);
+        }
+    }
+
+    private void runCreate(JobGen jobGen) throws Exception {
+        try {
+            JobSpecification createJob = jobGen.generateJob();
+            execute(createJob);
+        } catch (Exception e) {
+            e.printStackTrace();
+            throw e;
+        }
+    }
+
+    private void execute(JobSpecification job) throws Exception {
+        job.setUseConnectorPolicyForScheduling(false);
+        JobId jobId = hcc
+                .startJob(job, profiling ? EnumSet.of(JobFlag.PROFILE_RUNTIME) : EnumSet.noneOf(JobFlag.class));
+        hcc.waitForCompletion(jobId);
+    }
+
+    public static void main(String[] args) throws Exception {
+        GenomixJob jobConf = new GenomixJob();
+        String[] otherArgs = new GenericOptionsParser(jobConf, args).getRemainingArgs();
+        if (otherArgs.length < 4) {
+            System.err.println("Need <serverIP> <port> <input> <output>");
+            System.exit(-1);
+        }
+        String ipAddress = otherArgs[0];
+        int port = Integer.parseInt(otherArgs[1]);
+        int numOfDuplicate = jobConf.getInt(CPARTITION_PER_MACHINE, 2);
+        boolean bProfiling = jobConf.getBoolean(IS_PROFILING, true);
+        // FileInputFormat.setInputPaths(job, otherArgs[2]);
+        {
+            Path path = new Path(jobConf.getWorkingDirectory(), otherArgs[2]);
+            jobConf.set("mapred.input.dir", path.toString());
+
+            Path outputDir = new Path(jobConf.getWorkingDirectory(), otherArgs[3]);
+            jobConf.set("mapred.output.dir", outputDir.toString());
+        }
+        // FileInputFormat.addInputPath(jobConf, new Path(otherArgs[2]));
+        // FileOutputFormat.setOutputPath(job, new Path(otherArgs[3]));
+        Driver driver = new Driver(ipAddress, port, numOfDuplicate);
+        driver.runJob(jobConf, Plan.BUILD_DEBRUJIN_GRAPH, bProfiling);
+    }
+}
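For context, a minimal programmatic launch that mirrors what main() does might look like the sketch below. The host, port, and HDFS paths are placeholders, not values taken from this change.

import edu.uci.ics.genomix.hyracks.driver.Driver;
import edu.uci.ics.genomix.hyracks.job.GenomixJob;

public class RunGraphBuild {
    public static void main(String[] args) throws Exception {
        GenomixJob jobConf = new GenomixJob();
        jobConf.setKmerLength(21); // initJobConfiguration() decrements even k to keep it odd
        jobConf.set("mapred.input.dir", "/genomix/input/reads");   // placeholder path
        jobConf.set("mapred.output.dir", "/genomix/output/graph"); // placeholder path

        Driver driver = new Driver("127.0.0.1", 3099, 2); // placeholder host/port, 2 partitions per machine
        driver.runJob(jobConf, Driver.Plan.BUILD_DEBRUJIN_GRAPH, false);
    }
}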
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/GenomixJob.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/GenomixJob.java
new file mode 100644
index 0000000..66f0d1e
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/GenomixJob.java
@@ -0,0 +1,94 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.genomix.hyracks.job;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobConf;
+
+public class GenomixJob extends JobConf {
+
+    public static final String JOB_NAME = "genomix";
+
+    /** Kmer length */
+    public static final String KMER_LENGTH = "genomix.kmer";
+    /** Frame size */
+    public static final String FRAME_SIZE = "genomix.framesize";
+    /** Frame limit, required by Hyracks */
+    public static final String FRAME_LIMIT = "genomix.framelimit";
+    /** Table size, required by Hyracks */
+    public static final String TABLE_SIZE = "genomix.tablesize";
+    /** Groupby type */
+    public static final String GROUPBY_TYPE = "genomix.graph.groupby.type";
+    /** Graph output format */
+    public static final String OUTPUT_FORMAT = "genomix.graph.output";
+    /** Whether to also generate the reversed kmer sequence */
+    public static final String REVERSED_KMER = "genomix.kmer.reversed";
+
+    /** Configurations used by the hybrid groupby function in the graph build phase */
+    public static final String GROUPBY_HYBRID_INPUTSIZE = "genomix.graph.groupby.hybrid.inputsize";
+    public static final String GROUPBY_HYBRID_INPUTKEYS = "genomix.graph.groupby.hybrid.inputkeys";
+    public static final String GROUPBY_HYBRID_RECORDSIZE_SINGLE = "genomix.graph.groupby.hybrid.recordsize.single";
+    public static final String GROUPBY_HYBRID_RECORDSIZE_CROSS = "genomix.graph.groupby.hybrid.recordsize.cross";
+    public static final String GROUPBY_HYBRID_HASHLEVEL = "genomix.graph.groupby.hybrid.hashlevel";
+
+    public static final int DEFAULT_KMER = 21;
+    public static final int DEFAULT_FRAME_SIZE = 32768;
+    public static final int DEFAULT_FRAME_LIMIT = 4096;
+    public static final int DEFAULT_TABLE_SIZE = 10485767;
+    public static final long DEFAULT_GROUPBY_HYBRID_INPUTSIZE = 154000000L;
+    public static final long DEFAULT_GROUPBY_HYBRID_INPUTKEYS = 38500000L;
+    public static final int DEFAULT_GROUPBY_HYBRID_RECORDSIZE_SINGLE = 9;
+    public static final int DEFAULT_GROUPBY_HYBRID_HASHLEVEL = 1;
+    public static final int DEFAULT_GROUPBY_HYBRID_RECORDSIZE_CROSS = 13;
+
+    public static final boolean DEFAULT_REVERSED = false;
+
+    public static final String DEFAULT_GROUPBY_TYPE = "hybrid";
+    public static final String DEFAULT_OUTPUT_FORMAT = "binary";
+
+    public GenomixJob() throws IOException {
+        super(new Configuration());
+    }
+
+    public GenomixJob(Configuration conf) throws IOException {
+        super(conf);
+    }
+
+    /**
+     * Set the kmer length
+     * 
+     * @param kmerlength
+     *            the desired kmer length
+     */
+    final public void setKmerLength(int kmerlength) {
+        setInt(KMER_LENGTH, kmerlength);
+    }
+
+    final public void setFrameSize(int frameSize) {
+        setInt(FRAME_SIZE, frameSize);
+    }
+
+    final public void setFrameLimit(int frameLimit) {
+        setInt(FRAME_LIMIT, frameLimit);
+    }
+
+    final public void setTableSize(int tableSize) {
+        setInt(TABLE_SIZE, tableSize);
+    }
+
+}
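A short configuration sketch using the keys above; the values are this class's own defaults except for the group-by type and output format, which are two of the strings initJobConfiguration() recognizes further down in this change.

import edu.uci.ics.genomix.hyracks.job.GenomixJob;

public class ConfigureJob {
    public static void main(String[] args) throws Exception {
        GenomixJob job = new GenomixJob();
        job.setKmerLength(GenomixJob.DEFAULT_KMER);       // 21
        job.setFrameSize(GenomixJob.DEFAULT_FRAME_SIZE);  // 32768
        job.setFrameLimit(GenomixJob.DEFAULT_FRAME_LIMIT);
        job.setTableSize(GenomixJob.DEFAULT_TABLE_SIZE);
        // The string-valued options have no typed setters yet, so set them directly:
        job.set(GenomixJob.GROUPBY_TYPE, "precluster");   // "external", "precluster", or "hybrid"
        job.set(GenomixJob.OUTPUT_FORMAT, "text");        // "text" or "binary"
        job.setBoolean(GenomixJob.REVERSED_KMER, GenomixJob.DEFAULT_REVERSED);
    }
}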
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGen.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGen.java
new file mode 100644
index 0000000..3563f93
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGen.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.genomix.hyracks.job;
+
+import java.util.UUID;
+
+import org.apache.hadoop.conf.Configuration;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+
+public abstract class JobGen {
+
+    protected final Configuration conf;
+    protected final GenomixJob genomixJob;
+    protected String jobId = new UUID(System.currentTimeMillis(), System.nanoTime()).toString();
+
+    public JobGen(GenomixJob job) {
+        this.conf = job;
+        this.genomixJob = job;
+        this.initJobConfiguration();
+    }
+
+    protected abstract void initJobConfiguration();
+
+    public abstract JobSpecification generateJob() throws HyracksException;
+
+}
\ No newline at end of file
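A minimal, hypothetical subclass to illustrate the JobGen contract: initJobConfiguration() is invoked from the JobGen constructor, so it runs before any subclass constructor body or field initializers.

package edu.uci.ics.genomix.hyracks.job;

import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.api.job.JobSpecification;

// Illustration only; not part of this change.
public class JobGenNoop extends JobGen {
    private int kmers; // no initializer, so the value set below survives construction

    public JobGenNoop(GenomixJob job) {
        super(job); // super() calls initJobConfiguration() below
    }

    @Override
    protected void initJobConfiguration() {
        kmers = conf.getInt(GenomixJob.KMER_LENGTH, GenomixJob.DEFAULT_KMER);
    }

    @Override
    public JobSpecification generateJob() throws HyracksException {
        return new JobSpecification(); // a real generator would add operators here
    }
}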
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenBrujinGraph.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenBrujinGraph.java
new file mode 100644
index 0000000..6eb5bac
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenBrujinGraph.java
@@ -0,0 +1,310 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.genomix.hyracks.job;
+
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+
+import edu.uci.ics.genomix.hyracks.data.accessors.KmerBinaryHashFunctionFamily;
+import edu.uci.ics.genomix.hyracks.data.accessors.KmerHashPartitioncomputerFactory;
+import edu.uci.ics.genomix.hyracks.data.accessors.KmerNormarlizedComputerFactory;
+import edu.uci.ics.genomix.hyracks.data.accessors.ReadIDPartitionComputerFactory;
+import edu.uci.ics.genomix.hyracks.data.primitive.KmerPointable;
+import edu.uci.ics.genomix.hyracks.dataflow.ConnectorPolicyAssignmentPolicy;
+import edu.uci.ics.genomix.hyracks.dataflow.KMerSequenceWriterFactory;
+import edu.uci.ics.genomix.hyracks.dataflow.KMerTextWriterFactory;
+import edu.uci.ics.genomix.hyracks.dataflow.MapKmerPositionToReadOperator;
+import edu.uci.ics.genomix.hyracks.dataflow.ReadsKeyValueParserFactory;
+import edu.uci.ics.genomix.hyracks.dataflow.aggregators.AggregateKmerAggregateFactory;
+import edu.uci.ics.genomix.hyracks.dataflow.aggregators.MergeKmerAggregateFactory;
+import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
+import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFamily;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningMergingConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.HashSpillableTableFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.external.ExternalGroupOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.hybridhash.HybridHashGroupOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.preclustered.PreclusteredGroupOperatorDescriptor;
+import edu.uci.ics.hyracks.hdfs.api.ITupleWriterFactory;
+import edu.uci.ics.hyracks.hdfs.dataflow.HDFSReadOperatorDescriptor;
+import edu.uci.ics.hyracks.hdfs.dataflow.HDFSWriteOperatorDescriptor;
+import edu.uci.ics.hyracks.hdfs.scheduler.Scheduler;
+
+public class JobGenBrujinGraph extends JobGen {
+    public enum GroupbyType {
+        EXTERNAL,
+        PRECLUSTER,
+        HYBRIDHASH,
+    }
+
+    public enum OutputFormat {
+        TEXT,
+        BINARY,
+    }
+
+    JobConf job;
+    private static final Log LOG = LogFactory.getLog(JobGenBrujinGraph.class);
+    private Scheduler scheduler;
+    private String[] ncNodeNames;
+
+    private int kmers;
+    private int frameLimits;
+    private int frameSize;
+    private int tableSize;
+    private GroupbyType groupbyType;
+    private OutputFormat outputFormat;
+    private boolean bGenerateReversedKmer;
+
+    private AbstractOperatorDescriptor singleGrouper;
+    private IConnectorDescriptor connPartition;
+    private AbstractOperatorDescriptor crossGrouper;
+    private RecordDescriptor readOutputRec;
+    private RecordDescriptor combineOutputRec;
+
+    /** works for hybrid hashing */
+    private long inputSizeInRawRecords;
+    private long inputSizeInUniqueKeys;
+    private int recordSizeInBytes;
+    private int hashfuncStartLevel;
+
+    private void logDebug(String status) {
+        LOG.debug(status + " nc nodes:" + ncNodeNames.length);
+    }
+
+    public JobGenBrujinGraph(GenomixJob job, Scheduler scheduler, final Map<String, NodeControllerInfo> ncMap,
+            int numPartitionPerMachine) {
+        super(job);
+        this.scheduler = scheduler;
+        String[] nodes = new String[ncMap.size()];
+        ncMap.keySet().toArray(nodes);
+        ncNodeNames = new String[nodes.length * numPartitionPerMachine];
+        for (int i = 0; i < numPartitionPerMachine; i++) {
+            System.arraycopy(nodes, 0, ncNodeNames, i * nodes.length, nodes.length);
+        }
+        logDebug("initialize");
+    }
+
+    private ExternalGroupOperatorDescriptor newExternalGroupby(JobSpecification jobSpec, int[] keyFields,
+            IAggregatorDescriptorFactory aggregator) {
+        return new ExternalGroupOperatorDescriptor(jobSpec, keyFields, frameLimits,
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(KmerPointable.FACTORY) },
+                new KmerNormarlizedComputerFactory(), aggregator, new MergeKmerAggregateFactory(), combineOutputRec,
+                new HashSpillableTableFactory(
+                        new FieldHashPartitionComputerFactory(keyFields,
+                                new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+                                        .of(KmerPointable.FACTORY) }), tableSize), true);
+    }
+
+    private HybridHashGroupOperatorDescriptor newHybridGroupby(JobSpecification jobSpec, int[] keyFields,
+            long inputSizeInRawRecords, long inputSizeInUniqueKeys, int recordSizeInBytes, int hashfuncStartLevel,
+            IAggregatorDescriptorFactory aggregator) throws HyracksDataException {
+        return new HybridHashGroupOperatorDescriptor(jobSpec, keyFields, frameLimits, inputSizeInRawRecords,
+                inputSizeInUniqueKeys, recordSizeInBytes, tableSize,
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(KmerPointable.FACTORY) },
+                new IBinaryHashFunctionFamily[] { new KmerBinaryHashFunctionFamily() }, hashfuncStartLevel,
+                new KmerNormarlizedComputerFactory(), aggregator, new MergeKmerAggregateFactory(), combineOutputRec, true);
+    }
+
+    private void generateKmerAggregateDescriptorByType(JobSpecification jobSpec) throws HyracksDataException {
+        int[] keyFields = new int[] { 0 }; // the id of grouped key
+
+        switch (groupbyType) {
+            case EXTERNAL:
+                singleGrouper = newExternalGroupby(jobSpec, keyFields, new AggregateKmerAggregateFactory());
+                connPartition = new MToNPartitioningConnectorDescriptor(jobSpec, new KmerHashPartitioncomputerFactory());
+                crossGrouper = newExternalGroupby(jobSpec, keyFields, new MergeKmerAggregateFactory());
+                break;
+            case PRECLUSTER:
+            default:
+                singleGrouper = newExternalGroupby(jobSpec, keyFields, new AggregateKmerAggregateFactory());
+                connPartition = new MToNPartitioningMergingConnectorDescriptor(jobSpec,
+                        new KmerHashPartitioncomputerFactory(), keyFields,
+                        new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(KmerPointable.FACTORY) });
+                crossGrouper = new PreclusteredGroupOperatorDescriptor(jobSpec, keyFields,
+                        new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(KmerPointable.FACTORY) },
+                        new MergeKmerAggregateFactory(), combineOutputRec);
+                break;
+            case HYBRIDHASH:
+                singleGrouper = newHybridGroupby(jobSpec, keyFields, inputSizeInRawRecords, inputSizeInUniqueKeys,
+                        recordSizeInBytes, hashfuncStartLevel, new AggregateKmerAggregateFactory());
+                connPartition = new MToNPartitioningConnectorDescriptor(jobSpec, new KmerHashPartitioncomputerFactory());
+
+                crossGrouper = newHybridGroupby(jobSpec, keyFields, inputSizeInRawRecords, inputSizeInUniqueKeys,
+                        recordSizeInBytes, hashfuncStartLevel, new MergeKmerAggregateFactory());
+                break;
+        }
+    }
+
+    public HDFSReadOperatorDescriptor createHDFSReader(JobSpecification jobSpec) throws HyracksDataException {
+        try {
+
+            InputSplit[] splits = job.getInputFormat().getSplits(job, ncNodeNames.length);
+
+            LOG.info("HDFS read into " + splits.length + " splits");
+            String[] readSchedule = scheduler.getLocationConstraints(splits);
+            return new HDFSReadOperatorDescriptor(jobSpec, readOutputRec, job, splits, readSchedule,
+                    new ReadsKeyValueParserFactory(kmers, bGenerateReversedKmer));
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    private AbstractOperatorDescriptor newGroupByReadOperator(JobSpecification jobSpec, RecordDescriptor nodeOutputRec) {
+        // TODO: build the group-by-read operator (MergeReadIDAggregateFactory); returning null leaves this path unfinished
+        return null;
+    }
+    
+    @Override
+    public JobSpecification generateJob() throws HyracksException {
+
+        JobSpecification jobSpec = new JobSpecification();
+        readOutputRec = new RecordDescriptor(
+                new ISerializerDeserializer[] { null, null, null });
+        combineOutputRec = new RecordDescriptor(new ISerializerDeserializer[] { null, null });
+        jobSpec.setFrameSize(frameSize);
+
+        // File input
+        HDFSReadOperatorDescriptor readOperator = createHDFSReader(jobSpec);
+
+        logDebug("ReadKmer Operator");
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, readOperator, ncNodeNames);
+
+        generateKmerAggregateDescriptorByType(jobSpec);
+        logDebug("LocalKmerGroupby Operator");
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, singleGrouper, ncNodeNames);
+
+        IConnectorDescriptor readfileConn = new OneToOneConnectorDescriptor(jobSpec);
+        jobSpec.connect(readfileConn, readOperator, 0, singleGrouper, 0);
+
+        logDebug("CrossKmerGroupby Operator");
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, crossGrouper, ncNodeNames);
+        jobSpec.connect(connPartition, singleGrouper, 0, crossGrouper, 0);
+
+        logDebug("Map Kmer to Read Operator");
+        // Map (Kmer, {(ReadID,PosInRead),...}) into (ReadID, PosInRead, Kmer, {OtherReadID,...})
+        RecordDescriptor mapToReadOutputRec = new RecordDescriptor(new ISerializerDeserializer[] {
+                null, null, null, null });
+        AbstractOperatorDescriptor mapKmerToRead = new MapKmerPositionToReadOperator(jobSpec, mapToReadOutputRec);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, mapKmerToRead, ncNodeNames);
+        IConnectorDescriptor mapReadConn = new OneToOneConnectorDescriptor(jobSpec);
+        jobSpec.connect(mapReadConn, crossGrouper, 0, mapKmerToRead, 0);
+
+        logDebug("Group by Read Operator");
+        // (ReadID,PosInRead,Kmer,{OtherReadID,...})
+        RecordDescriptor nodeOutputRec = new RecordDescriptor(new ISerializerDeserializer[] {
+                null, null, null, null });
+        AbstractOperatorDescriptor groupbyReadOperator = newGroupByReadOperator(jobSpec, nodeOutputRec);
+        IConnectorDescriptor readPartition = new MToNPartitioningConnectorDescriptor(jobSpec, new ReadIDPartitionComputerFactory());
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, groupbyReadOperator, ncNodeNames);
+        jobSpec.connect(readPartition, mapKmerToRead, 0, groupbyReadOperator, 0);
+
+        // Output
+        ITupleWriterFactory writer = null;
+        switch (outputFormat) {
+            case TEXT:
+                writer = new KMerTextWriterFactory(kmers);
+                break;
+            case BINARY:
+            default:
+                writer = new KMerSequenceWriterFactory(job);
+                break;
+        }
+        HDFSWriteOperatorDescriptor writeOperator = new HDFSWriteOperatorDescriptor(jobSpec, job, writer);
+
+        logDebug("WriteOperator");
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, writeOperator, ncNodeNames);
+
+        IConnectorDescriptor printConn = new OneToOneConnectorDescriptor(jobSpec);
+        jobSpec.connect(printConn, groupbyReadOperator, 0, writeOperator, 0);
+        jobSpec.addRoot(writeOperator);
+
+        if (groupbyType == GroupbyType.PRECLUSTER) {
+            jobSpec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
+        }
+        return jobSpec;
+    }
+
+ 
+
+    @Override
+    protected void initJobConfiguration() {
+
+        kmers = conf.getInt(GenomixJob.KMER_LENGTH, GenomixJob.DEFAULT_KMER);
+        if (kmers % 2 == 0) {
+            kmers--;
+            conf.setInt(GenomixJob.KMER_LENGTH, kmers);
+        }
+        frameLimits = conf.getInt(GenomixJob.FRAME_LIMIT, GenomixJob.DEFAULT_FRAME_LIMIT);
+        tableSize = conf.getInt(GenomixJob.TABLE_SIZE, GenomixJob.DEFAULT_TABLE_SIZE);
+        frameSize = conf.getInt(GenomixJob.FRAME_SIZE, GenomixJob.DEFAULT_FRAME_SIZE);
+        inputSizeInRawRecords = conf.getLong(GenomixJob.GROUPBY_HYBRID_INPUTSIZE,
+                GenomixJob.DEFAULT_GROUPBY_HYBRID_INPUTSIZE);
+        inputSizeInUniqueKeys = conf.getLong(GenomixJob.GROUPBY_HYBRID_INPUTKEYS,
+                GenomixJob.DEFAULT_GROUPBY_HYBRID_INPUTKEYS);
+        recordSizeInBytes = conf.getInt(GenomixJob.GROUPBY_HYBRID_RECORDSIZE_SINGLE,
+                GenomixJob.DEFAULT_GROUPBY_HYBRID_RECORDSIZE_SINGLE);
+        hashfuncStartLevel = conf.getInt(GenomixJob.GROUPBY_HYBRID_HASHLEVEL,
+                GenomixJob.DEFAULT_GROUPBY_HYBRID_HASHLEVEL);
+        /** NOTE: this overwrites the single-group record size read above with the cross-group value */
+        recordSizeInBytes = conf.getInt(GenomixJob.GROUPBY_HYBRID_RECORDSIZE_CROSS,
+                GenomixJob.DEFAULT_GROUPBY_HYBRID_RECORDSIZE_CROSS);
+
+        bGenerateReversedKmer = conf.getBoolean(GenomixJob.REVERSED_KMER, GenomixJob.DEFAULT_REVERSED);
+
+        String type = conf.get(GenomixJob.GROUPBY_TYPE, GenomixJob.DEFAULT_GROUPBY_TYPE);
+        if (type.equalsIgnoreCase("external")) {
+            groupbyType = GroupbyType.EXTERNAL;
+        } else if (type.equalsIgnoreCase("precluster")) {
+            groupbyType = GroupbyType.PRECLUSTER;
+        } else {
+            groupbyType = GroupbyType.HYBRIDHASH;
+        }
+
+        String output = conf.get(GenomixJob.OUTPUT_FORMAT, GenomixJob.DEFAULT_OUTPUT_FORMAT);
+        if (output.equalsIgnoreCase("text")) {
+            outputFormat = OutputFormat.TEXT;
+        } else {
+            outputFormat = OutputFormat.BINARY;
+        }
+        job = new JobConf(conf);
+        LOG.info("Genomix Graph Build Configuration");
+        LOG.info("Kmer:" + kmers);
+        LOG.info("Groupby type:" + type);
+        LOG.info("Output format:" + output);
+        LOG.info("Frame limit" + frameLimits);
+        LOG.info("Frame size" + frameSize);
+    }
+
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenStatistic.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenStatistic.java
new file mode 100644
index 0000000..540bdaf
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenStatistic.java
@@ -0,0 +1,171 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.genomix.hyracks.job;
+
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+
+import edu.uci.ics.genomix.hyracks.data.accessors.ByteSerializerDeserializer;
+import edu.uci.ics.genomix.hyracks.util.ByteComparatorFactory;
+import edu.uci.ics.genomix.hyracks.util.StatCountAggregateFactory;
+import edu.uci.ics.genomix.hyracks.util.StatReadsKeyValueParserFactory;
+import edu.uci.ics.genomix.hyracks.util.StatSumAggregateFactory;
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningMergingConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.AbstractFileWriteOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.LineFileWriteOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.HashSpillableTableFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.external.ExternalGroupOperatorDescriptor;
+import edu.uci.ics.hyracks.hdfs.dataflow.HDFSReadOperatorDescriptor;
+import edu.uci.ics.hyracks.hdfs.scheduler.Scheduler;
+
+public class JobGenStatistic extends JobGen {
+    private int kmers;
+    private JobConf hadoopjob;
+    private RecordDescriptor readOutputRec;
+    private String[] ncNodeNames;
+    private Scheduler scheduler;
+    private RecordDescriptor combineOutputRec;
+
+    public JobGenStatistic(GenomixJob job) {
+        super(job);
+        // TODO Auto-generated constructor stub
+    }
+
+    @Override
+    protected void initJobConfiguration() {
+        // TODO Auto-generated method stub
+        kmers = conf.getInt(GenomixJob.KMER_LENGTH, GenomixJob.DEFAULT_KMER);
+        hadoopjob = new JobConf(conf);
+        hadoopjob.setInputFormat(SequenceFileInputFormat.class);
+    }
+
+    @Override
+    public JobSpecification generateJob() throws HyracksException {
+        int[] degreeFields = { 0, 1 }; // indegree, outdegree
+        int[] countFields = { 2 };
+        JobSpecification jobSpec = new JobSpecification();
+        /** specify the record fields after read */
+        readOutputRec = new RecordDescriptor(new ISerializerDeserializer[] { ByteSerializerDeserializer.INSTANCE,
+                ByteSerializerDeserializer.INSTANCE, ByteSerializerDeserializer.INSTANCE });
+        combineOutputRec = new RecordDescriptor(new ISerializerDeserializer[] { ByteSerializerDeserializer.INSTANCE,
+                ByteSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
+        /** the reader */
+        HDFSReadOperatorDescriptor readOperator = createHDFSReader(jobSpec);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, readOperator, ncNodeNames);
+
+        /** the combiner aggregator */
+        AbstractOperatorDescriptor degreeLocal = connectLocalAggregateByField(jobSpec, degreeFields, readOperator);
+        AbstractOperatorDescriptor countLocal = connectLocalAggregateByField(jobSpec, countFields, readOperator);
+
+        /** the final aggregator */
+        AbstractOperatorDescriptor degreeMerger = connectFinalAggregateByField(jobSpec, degreeFields, degreeLocal);
+        AbstractOperatorDescriptor countMerger = connectFinalAggregateByField(jobSpec, countFields, countLocal);
+
+        /** writer */
+        AbstractFileWriteOperatorDescriptor writeDegree = connectWriter(jobSpec, degreeFields, degreeMerger);
+        AbstractFileWriteOperatorDescriptor writeCount = connectWriter(jobSpec, countFields, countMerger);
+        jobSpec.addRoot(writeDegree);
+        jobSpec.addRoot(writeCount);
+        return jobSpec;
+    }
+
+    private HDFSReadOperatorDescriptor createHDFSReader(JobSpecification jobSpec) throws HyracksDataException {
+        try {
+
+            InputSplit[] splits = hadoopjob.getInputFormat().getSplits(hadoopjob, ncNodeNames.length);
+
+            String[] readSchedule = scheduler.getLocationConstraints(splits);
+            return new HDFSReadOperatorDescriptor(jobSpec, readOutputRec, hadoopjob, splits, readSchedule,
+                    new StatReadsKeyValueParserFactory());
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    private ExternalGroupOperatorDescriptor newExternalGroupby(JobSpecification jobSpec, int[] keyFields,
+            IAggregatorDescriptorFactory aggregator) {
+        return new ExternalGroupOperatorDescriptor(jobSpec, keyFields, GenomixJob.DEFAULT_FRAME_LIMIT,
+                new IBinaryComparatorFactory[] { new ByteComparatorFactory(), new ByteComparatorFactory() }, null,
+                aggregator, new StatSumAggregateFactory(), combineOutputRec, new HashSpillableTableFactory(
+                        new FieldHashPartitionComputerFactory(keyFields, new IBinaryHashFunctionFactory[] {
+                                new ByteComparatorFactory(), new ByteComparatorFactory() }),
+                        GenomixJob.DEFAULT_TABLE_SIZE), true);
+    }
+
+    private AbstractOperatorDescriptor connectLocalAggregateByField(JobSpecification jobSpec, int[] fields,
+            HDFSReadOperatorDescriptor readOperator) {
+        AbstractOperatorDescriptor localAggregator = newExternalGroupby(jobSpec, fields,
+                new StatCountAggregateFactory());
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, localAggregator, ncNodeNames);
+        IConnectorDescriptor readfileConn = new OneToOneConnectorDescriptor(jobSpec);
+        jobSpec.connect(readfileConn, readOperator, 0, localAggregator, 0);
+        return localAggregator;
+    }
+
+    private AbstractOperatorDescriptor connectFinalAggregateByField(JobSpecification jobSpec, int[] fields,
+            AbstractOperatorDescriptor localAggregator) {
+        AbstractOperatorDescriptor finalAggregator = newExternalGroupby(jobSpec, fields, new StatSumAggregateFactory());
+        // only need one reducer
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, finalAggregator, ncNodeNames[fields[0]
+                % ncNodeNames.length]);
+        IConnectorDescriptor mergeConn = new MToNPartitioningMergingConnectorDescriptor(jobSpec,
+                new ITuplePartitionComputerFactory() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public ITuplePartitionComputer createPartitioner() {
+                        return new ITuplePartitionComputer() {
+                            @Override
+                            public int partition(IFrameTupleAccessor accessor, int tIndex, int nParts)
+                                    throws HyracksDataException {
+                                return 0;
+                            }
+                        };
+                    }
+                }, fields, new IBinaryComparatorFactory[] { new ByteComparatorFactory() });
+        jobSpec.connect(mergeConn, localAggregator, 0, finalAggregator, 0);
+        return finalAggregator;
+    }
+
+    private AbstractFileWriteOperatorDescriptor connectWriter(JobSpecification jobSpec, int[] fields,
+            AbstractOperatorDescriptor finalAggregator) {
+        LineFileWriteOperatorDescriptor writeOperator = new LineFileWriteOperatorDescriptor(jobSpec, null);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, writeOperator, ncNodeNames[fields[0]
+                % ncNodeNames.length]);
+
+        IConnectorDescriptor printConn = new OneToOneConnectorDescriptor(jobSpec);
+        jobSpec.connect(printConn, finalAggregator, 0, writeOperator, 0);
+        return writeOperator;
+    }
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/util/ByteComparatorFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/util/ByteComparatorFactory.java
new file mode 100644
index 0000000..b070b56
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/util/ByteComparatorFactory.java
@@ -0,0 +1,54 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.genomix.hyracks.util;
+
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunction;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+
+public class ByteComparatorFactory implements IBinaryComparatorFactory, IBinaryHashFunctionFactory {
+
+    /**
+     * 
+     */
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public IBinaryComparator createBinaryComparator() {
+        return new IBinaryComparator() {
+
+            @Override
+            public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+                return b1[s1] - b2[s2];
+            }
+
+        };
+    }
+
+    @Override
+    public IBinaryHashFunction createBinaryHashFunction() {
+        return new IBinaryHashFunction() {
+
+            @Override
+            public int hash(byte[] bytes, int offset, int length) {
+                return bytes[offset];
+            }
+
+        };
+    }
+
+}
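A quick, self-contained check of the single-byte comparator and hash above; note that compare() looks only at the first byte of each field, which is exactly what the one-byte degree and count keys in JobGenStatistic need.

import edu.uci.ics.genomix.hyracks.util.ByteComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunction;

public class ByteComparatorDemo {
    public static void main(String[] args) {
        ByteComparatorFactory factory = new ByteComparatorFactory();
        IBinaryComparator cmp = factory.createBinaryComparator();
        IBinaryHashFunction hash = factory.createBinaryHashFunction();

        byte[] inDegree = { 3 };
        byte[] outDegree = { 7 };
        System.out.println(cmp.compare(inDegree, 0, 1, outDegree, 0, 1)); // -4: 3 sorts before 7
        System.out.println(hash.hash(inDegree, 0, 1));                    // 3: the byte value itself
    }
}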
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/util/StatCountAggregateFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/util/StatCountAggregateFactory.java
new file mode 100644
index 0000000..f483a9c
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/util/StatCountAggregateFactory.java
@@ -0,0 +1,129 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.genomix.hyracks.util;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+
+public class StatCountAggregateFactory implements IAggregatorDescriptorFactory {
+
+    /**
+     * 
+     */
+    private static final long serialVersionUID = 1L;
+
+    public class CountAggregator implements IAggregatorDescriptor {
+        private final int[] keyFields;
+
+        public CountAggregator(int[] keyFields) {
+            this.keyFields = keyFields;
+        }
+
+        @Override
+        public AggregateState createAggregateStates() {
+            // no per-group scratch state is needed; the running count lives in the partial-result tuple
+            return null;
+        }
+
+        @Override
+        public void init(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex, AggregateState state)
+                throws HyracksDataException {
+            int count = 1;
+            DataOutput fieldOutput = tupleBuilder.getDataOutput();
+            try {
+                fieldOutput.writeInt(count);
+                tupleBuilder.addFieldEndOffset();
+            } catch (IOException e) {
+                throw new HyracksDataException("I/O exception when initializing the aggregator.");
+            }
+        }
+
+        @Override
+        public void reset() {
+            // TODO Auto-generated method stub
+
+        }
+
+        @Override
+        public void aggregate(IFrameTupleAccessor accessor, int tIndex, IFrameTupleAccessor stateAccessor,
+                int stateTupleIndex, AggregateState state) throws HyracksDataException {
+            int count = 1;
+
+            int statetupleOffset = stateAccessor.getTupleStartOffset(stateTupleIndex);
+            int countfieldStart = stateAccessor.getFieldStartOffset(stateTupleIndex, keyFields.length);
+            int countoffset = statetupleOffset + stateAccessor.getFieldSlotsLength() + countfieldStart;
+
+            byte[] data = stateAccessor.getBuffer().array();
+            count += IntegerSerializerDeserializer.getInt(data, countoffset);
+            IntegerSerializerDeserializer.putInt(count, data, countoffset);
+        }
+
+        @Override
+        public void outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+                AggregateState state) throws HyracksDataException {
+            int count = getCount(accessor, tIndex);
+            DataOutput fieldOutput = tupleBuilder.getDataOutput();
+            try {
+                fieldOutput.writeInt(count);
+                tupleBuilder.addFieldEndOffset();
+            } catch (IOException e) {
+                throw new HyracksDataException("I/O exception when writing aggregation to the output buffer.");
+            }
+
+        }
+
+        protected int getCount(IFrameTupleAccessor accessor, int tIndex) {
+            int tupleOffset = accessor.getTupleStartOffset(tIndex);
+            int fieldStart = accessor.getFieldStartOffset(tIndex, keyFields.length);
+            int countoffset = tupleOffset + accessor.getFieldSlotsLength() + fieldStart;
+            byte[] data = accessor.getBuffer().array();
+
+            return IntegerSerializerDeserializer.getInt(data, countoffset);
+        }
+
+        @Override
+        public void outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+                AggregateState state) throws HyracksDataException {
+            outputPartialResult(tupleBuilder, accessor, tIndex, state);
+        }
+
+        @Override
+        public void close() {
+            // No resources held; nothing to close.
+
+        }
+
+    }
+
+    @Override
+    public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
+            RecordDescriptor outRecordDescriptor, int[] keyFields, int[] keyFieldsInPartialResults)
+            throws HyracksDataException {
+        return new CountAggregator(keyFields);
+    }
+
+}
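
Aside: the heart of CountAggregator is the in-place update in aggregate(): locate the count field inside the state frame, read the int, add one, and write it back, never rebuilding the tuple. Below is a standalone, plain-JDK sketch of that technique (hypothetical class name, no Hyracks dependencies; ByteBuffer stands in for IntegerSerializerDeserializer.getInt/putInt):

import java.nio.ByteBuffer;

public class InPlaceCountDemo {
    public static void main(String[] args) {
        // Pretend this array is a state frame whose count field starts at offset 4.
        byte[] frame = new byte[8];
        int countOffset = 4;

        // init(): a new group starts with a count of 1.
        ByteBuffer.wrap(frame).putInt(countOffset, 1);

        // aggregate() for three more tuples: each adds 1 in place.
        for (int i = 0; i < 3; i++) {
            int count = ByteBuffer.wrap(frame).getInt(countOffset);
            ByteBuffer.wrap(frame).putInt(countOffset, count + 1);
        }

        // outputPartialResult()/getCount(): read the count back out; prints 4.
        System.out.println(ByteBuffer.wrap(frame).getInt(countOffset));
    }
}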
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/util/StatReadsKeyValueParserFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/util/StatReadsKeyValueParserFactory.java
new file mode 100644
index 0000000..c01aae1
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/util/StatReadsKeyValueParserFactory.java
@@ -0,0 +1,94 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.genomix.hyracks.util;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.genomix.hyracks.data.accessors.ByteSerializerDeserializer;
+import edu.uci.ics.genomix.type.GeneCode;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.KmerCountValue;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.hdfs.api.IKeyValueParser;
+import edu.uci.ics.hyracks.hdfs.api.IKeyValueParserFactory;
+
+public class StatReadsKeyValueParserFactory implements IKeyValueParserFactory<KmerBytesWritable, KmerCountValue> {
+
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public IKeyValueParser<KmerBytesWritable, KmerCountValue> createKeyValueParser(IHyracksTaskContext ctx)
+            throws HyracksDataException {
+
+        final ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(2);
+        final ByteBuffer outputBuffer = ctx.allocateFrame();
+        final FrameTupleAppender outputAppender = new FrameTupleAppender(ctx.getFrameSize());
+        outputAppender.reset(outputBuffer, true);
+
+        return new IKeyValueParser<KmerBytesWritable, KmerCountValue>() {
+
+            @Override
+            public void open(IFrameWriter writer) throws HyracksDataException {
+                // Nothing to open: the output frame was allocated and reset above.
+
+            }
+
+            @Override
+            public void parse(KmerBytesWritable key, KmerCountValue value, IFrameWriter writer)
+                    throws HyracksDataException {
+                // Decode in/out degrees from the adjacency bitmap and emit
+                // (indegree, outdegree, count) as one tuple.
+                byte adjMap = value.getAdjBitMap();
+                byte count = value.getCount();
+                insertToFrame((byte) GeneCode.inDegree(adjMap), (byte) GeneCode.outDegree(adjMap), count, writer);
+            }
+
+            @Override
+            public void close(IFrameWriter writer) throws HyracksDataException {
+                FrameUtils.flushFrame(outputBuffer, writer);
+            }
+
+            private void insertToFrame(byte indegree, byte outdegree, byte count, IFrameWriter writer) {
+                try {
+                    tupleBuilder.reset();
+                    tupleBuilder.addField(ByteSerializerDeserializer.INSTANCE, indegree);
+                    tupleBuilder.addField(ByteSerializerDeserializer.INSTANCE, outdegree);
+                    tupleBuilder.addField(ByteSerializerDeserializer.INSTANCE, count);
+
+                    if (!outputAppender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
+                            tupleBuilder.getSize())) {
+                        FrameUtils.flushFrame(outputBuffer, writer);
+                        outputAppender.reset(outputBuffer, true);
+                        if (!outputAppender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
+                                tupleBuilder.getSize())) {
+                            throw new IllegalStateException(
+                                    "Failed to copy a record into a frame: the record size is too large.");
+                        }
+                    }
+                } catch (Exception e) {
+                    throw new IllegalStateException(e);
+                }
+            }
+        };
+    }
+
+}
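
Aside: insertToFrame() above follows the usual append/flush/retry discipline for fixed-size frames: try to append; if the frame is full, flush it downstream, reset it, and retry once; a second failure means the record cannot fit in any frame. A standalone sketch of that pattern (hypothetical names, plain JDK, a byte array standing in for a Hyracks frame):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class AppendFlushRetryDemo {
    static final int FRAME_SIZE = 8;
    static List<byte[]> flushed = new ArrayList<byte[]>(); // stands in for the downstream IFrameWriter
    static byte[] frame = new byte[FRAME_SIZE];
    static int used = 0;

    // Like FrameTupleAppender.append(): returns false when the record does not fit.
    static boolean tryAppend(byte[] rec) {
        if (used + rec.length > FRAME_SIZE) {
            return false;
        }
        System.arraycopy(rec, 0, frame, used, rec.length);
        used += rec.length;
        return true;
    }

    // Like FrameUtils.flushFrame() followed by outputAppender.reset(..., true).
    static void flush() {
        flushed.add(Arrays.copyOf(frame, used));
        used = 0;
    }

    static void insert(byte[] rec) {
        if (!tryAppend(rec)) {
            flush();
            if (!tryAppend(rec)) {
                throw new IllegalStateException("record larger than a frame");
            }
        }
    }

    public static void main(String[] args) {
        insert(new byte[5]);
        insert(new byte[5]); // does not fit, so the first record is flushed
        System.out.println(flushed.size() + " frame(s) flushed, " + used + " bytes pending");
    }
}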
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/util/StatSumAggregateFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/util/StatSumAggregateFactory.java
new file mode 100644
index 0000000..fb37056
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/util/StatSumAggregateFactory.java
@@ -0,0 +1,131 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.genomix.hyracks.util;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+
+public class StatSumAggregateFactory implements IAggregatorDescriptorFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    public class DistributeAggregatorDescriptor implements IAggregatorDescriptor {
+
+        private final int[] keyFields;
+
+        public DistributeAggregatorDescriptor(int[] keyFields) {
+            this.keyFields = keyFields;
+        }
+
+        @Override
+        public AggregateState createAggregateStates() {
+            // No auxiliary state object is needed: the running sum is stored
+            // directly in the state tuple inside the frame.
+            return null;
+        }
+
+        // Incoming tuples are (key, partialCount) pairs, so the value to
+        // accumulate is always field 1.
+        protected int getCount(IFrameTupleAccessor accessor, int tIndex) {
+            int tupleOffset = accessor.getTupleStartOffset(tIndex);
+            int fieldStart = accessor.getFieldStartOffset(tIndex, 1);
+            int countoffset = tupleOffset + accessor.getFieldSlotsLength() + fieldStart;
+            byte[] data = accessor.getBuffer().array();
+            return IntegerSerializerDeserializer.getInt(data, countoffset);
+        }
+
+        @Override
+        public void init(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex, AggregateState state)
+                throws HyracksDataException {
+            // Seed the running sum with the first partial count of the group.
+            int count = getCount(accessor, tIndex);
+
+            DataOutput fieldOutput = tupleBuilder.getDataOutput();
+            try {
+                fieldOutput.writeInt(count);
+                tupleBuilder.addFieldEndOffset();
+            } catch (IOException e) {
+                throw new HyracksDataException("I/O exception when initializing the aggregator.", e);
+            }
+        }
+
+        @Override
+        public void reset() {
+            // Nothing to reset: all intermediate state lives in the frames.
+
+        }
+
+        @Override
+        public void aggregate(IFrameTupleAccessor accessor, int tIndex, IFrameTupleAccessor stateAccessor,
+                int stateTupleIndex, AggregateState state) throws HyracksDataException {
+            // Add this tuple's partial count to the running sum held in the state tuple.
+            int count = getCount(accessor, tIndex);
+
+            int statetupleOffset = stateAccessor.getTupleStartOffset(stateTupleIndex);
+            int countfieldStart = stateAccessor.getFieldStartOffset(stateTupleIndex, 1);
+            int countoffset = statetupleOffset + stateAccessor.getFieldSlotsLength() + countfieldStart;
+
+            byte[] data = stateAccessor.getBuffer().array();
+            count += IntegerSerializerDeserializer.getInt(data, countoffset);
+            IntegerSerializerDeserializer.putInt(count, data, countoffset);
+        }
+
+        @Override
+        public void outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+                AggregateState state) throws HyracksDataException {
+            int count = getCount(accessor, tIndex);
+            DataOutput fieldOutput = tupleBuilder.getDataOutput();
+            try {
+                fieldOutput.writeInt(count);
+                tupleBuilder.addFieldEndOffset();
+            } catch (IOException e) {
+                throw new HyracksDataException("I/O exception when writing aggregation to the output buffer.", e);
+            }
+
+        }
+
+        @Override
+        public void outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+                AggregateState state) throws HyracksDataException {
+            outputPartialResult(tupleBuilder, accessor, tIndex, state);
+
+        }
+
+        @Override
+        public void close() {
+            // No resources held; nothing to close.
+
+        }
+
+    }
+
+    @Override
+    public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
+            RecordDescriptor outRecordDescriptor, int[] keyFields, int[] keyFieldsInPartialResults)
+            throws HyracksDataException {
+        return new DistributeAggregatorDescriptor(keyFields);
+    }
+
+}
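
Aside: StatSumAggregateFactory is the merge-side counterpart of StatCountAggregateFactory: the count stage emits per-group partial counts, and the sum stage combines them. The pair composes because counting within partitions and then summing the partial counts per key gives the same answer as counting globally. A standalone sketch of that composition (hypothetical class name, HashMaps standing in for Hyracks group-by state):

import java.util.HashMap;
import java.util.Map;

public class TwoStageCountDemo {
    // Add delta to map[key], treating a missing key as 0.
    static void add(Map<Integer, Integer> map, int key, int delta) {
        Integer old = map.get(key);
        map.put(key, old == null ? delta : old + delta);
    }

    public static void main(String[] args) {
        int[][] partitions = { { 1, 2, 1 }, { 2, 2, 3 } };

        Map<Integer, Integer> merged = new HashMap<Integer, Integer>();
        for (int[] part : partitions) {
            // Stage 1 (StatCountAggregateFactory's job): count within the partition.
            Map<Integer, Integer> partial = new HashMap<Integer, Integer>();
            for (int key : part) {
                add(partial, key, 1);
            }
            // Stage 2 (StatSumAggregateFactory's job): sum the partial counts per key.
            for (Map.Entry<Integer, Integer> e : partial.entrySet()) {
                add(merged, e.getKey(), e.getValue());
            }
        }
        System.out.println(merged); // {1=2, 2=3, 3=1}, iteration order may vary
    }
}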
diff --git a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/example/jobrun/JobRunTest.java b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/example/jobrun/JobRunTest.java
index 847272a..4a279d6 100644
--- a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/example/jobrun/JobRunTest.java
+++ b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/example/jobrun/JobRunTest.java
@@ -39,9 +39,9 @@
 import org.junit.Before;
 import org.junit.Test;
 
-import edu.uci.ics.genomix.driver.Driver;
-import edu.uci.ics.genomix.driver.Driver.Plan;
-import edu.uci.ics.genomix.job.GenomixJob;
+import edu.uci.ics.genomix.hyracks.driver.Driver;
+import edu.uci.ics.genomix.hyracks.driver.Driver.Plan;
+import edu.uci.ics.genomix.hyracks.job.GenomixJob;
 import edu.uci.ics.genomix.type.KmerBytesWritable;
 import edu.uci.ics.genomix.type.KmerCountValue;