finish design of NodeReference API
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/NodeReference.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/NodeReference.java
new file mode 100644
index 0000000..42638a9
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/NodeReference.java
@@ -0,0 +1,74 @@
+package edu.uci.ics.genomix.hyracks.data.primitive;
+
+
+public class NodeReference {
+    private PositionReference nodeID;
+    private int countOfKmer;
+    private PositionListReference incomingList;
+    private PositionListReference outgoingList;
+
+    public NodeReference() {
+        nodeID = new PositionReference();
+        countOfKmer = 0;
+        incomingList = new PositionListReference();
+        outgoingList = new PositionListReference();
+    }
+    
+    public int getCount() {
+        return countOfKmer;
+    }
+
+    public void setCount(int count) {
+        this.countOfKmer = count;
+    }
+
+    public void setNodeID(PositionReference ref) {
+        this.setNodeID(ref.getReadID(), ref.getPosInRead());
+    }
+
+    public void setNodeID(int readID, byte posInRead) {
+        nodeID.set(readID, posInRead);
+    }
+
+    public void setIncomingList(PositionListReference incoming) {
+        incomingList.set(incoming);
+    }
+
+    public void setOutgoingList(PositionListReference outgoing) {
+        outgoingList.set(outgoing);
+    }
+
+    public void reset() {
+        nodeID.set(0, (byte) 0);
+        incomingList.reset();
+        outgoingList.reset();
+        countOfKmer = 0;
+    }
+
+    public PositionListReference getIncomingList() {
+        return incomingList;
+    }
+
+    public PositionListReference getOutgoingList() {
+        return outgoingList;
+    }
+
+    public PositionReference getNodeID() {
+        return nodeID;
+    }
+
+    public void mergeNextWithinOneRead(NodeReference nextNodeEntry) {
+        this.countOfKmer += nextNodeEntry.countOfKmer;
+        for (PositionReference pos : nextNodeEntry.getOutgoingList()) {
+            this.outgoingList.append(pos);
+        }
+    }
+
+    public void set(NodeReference node) {
+        this.nodeID.set(node.getNodeID().getReadID(), node.getNodeID().getPosInRead());
+        this.countOfKmer = node.countOfKmer;
+        this.incomingList.set(node.getIncomingList());
+        this.outgoingList.set(node.getOutgoingList());
+    }
+
+}
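A minimal usage sketch of the new NodeReference API (read IDs and positions are made-up values, and the wrapper class name is invented; it relies only on the methods defined above):

    import edu.uci.ics.genomix.hyracks.data.primitive.NodeReference;

    class NodeReferenceSketch {
        public static void main(String[] args) {
            NodeReference cur = new NodeReference();
            cur.setNodeID(7, (byte) 2);                // node anchored at read 7, position 2
            cur.setCount(1);
            cur.getOutgoingList().append(8, (byte) 0); // one outgoing neighbour

            NodeReference next = new NodeReference();
            next.setNodeID(7, (byte) 3);
            next.setCount(1);

            // next carries no outgoing branch of its own, so it can be folded
            // into cur within the same read; k-mer counts are summed.
            cur.mergeNextWithinOneRead(next);
            System.out.println(cur.getCount());        // prints 2
        }
    }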
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/PositionListReference.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/PositionListReference.java
index 0fa1489..895c644 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/PositionListReference.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/PositionListReference.java
@@ -3,24 +3,37 @@
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.util.Iterator;
 
 import org.apache.hadoop.io.Writable;
 
 import edu.uci.ics.hyracks.data.std.api.IValueReference;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
 
-public class PositionListReference implements Writable, IValueReference {
-    private byte[] values;
+public class PositionListReference implements Writable, Iterable<PositionReference>, IValueReference {
+    private byte[] storage;
+    private int offset;
     private int valueCount;
     private static final byte[] EMPTY = {};
 
     private PositionReference posIter = new PositionReference();
 
     public PositionListReference() {
-        this.values = EMPTY;
+        this.storage = EMPTY;
         this.valueCount = 0;
+        this.offset = 0;
+    }
+    
+    public PositionListReference(int count, byte[] data, int offset) {
+        setNewReference(count, data, offset);
     }
 
+    public void setNewReference(int count, byte[] data, int offset) {
+        this.valueCount = count;
+        this.storage = data;
+        this.offset = offset;
+    }
+    
     protected void setSize(int size) {
         if (size > getCapacity()) {
             setCapacity((size * 3 / 2));
@@ -28,16 +41,17 @@
     }
 
     protected int getCapacity() {
-        return values.length;
+        return storage.length - offset;
     }
 
     protected void setCapacity(int new_cap) {
         if (new_cap > getCapacity()) {
             byte[] new_data = new byte[new_cap];
-            if (values.length > 0) {
-                System.arraycopy(values, 0, new_data, 0, values.length);
+            if (storage.length - offset > 0) {
+                System.arraycopy(storage, offset, new_data, 0, storage.length-offset);
             }
-            values = new_data;
+            storage = new_data;
+            offset = 0;
         }
     }
 
@@ -45,32 +59,56 @@
     public void readFields(DataInput in) throws IOException {
         this.valueCount = in.readInt();
         setSize(valueCount * PositionReference.LENGTH);
-        in.readFully(values, 0, valueCount * PositionReference.LENGTH);
+        in.readFully(storage, offset, valueCount * PositionReference.LENGTH);
     }
 
     @Override
     public void write(DataOutput out) throws IOException {
         out.writeInt(valueCount);
-        out.write(values, 0, valueCount * PositionReference.LENGTH);
+        out.write(storage, offset, valueCount * PositionReference.LENGTH);
     }
 
     public PositionReference getPosition(int i) {
         if (i >= valueCount) {
-            throw new ArrayIndexOutOfBoundsException("Not so much positions");
+            throw new ArrayIndexOutOfBoundsException("No such position");
         }
-        posIter.setNewSpace(values, i * PositionReference.LENGTH);
+        posIter.setNewReference(storage, offset + i * PositionReference.LENGTH);
         return posIter;
     }
+    
+    @Override
+    public Iterator<PositionReference> iterator() {
+        Iterator<PositionReference> it = new Iterator<PositionReference>() {
+
+            private int currentIndex = 0;
+
+            @Override
+            public boolean hasNext() {
+                return currentIndex < valueCount;
+            }
+
+            @Override
+            public PositionReference next() {
+                // advance the cursor, otherwise hasNext() never becomes false
+                return getPosition(currentIndex++);
+            }
+
+            @Override
+            public void remove() {
+                throw new UnsupportedOperationException("remove() is not supported");
+            }
+        };
+        return it;
+    }
 
     public void set(PositionListReference list2) {
-        set(list2.valueCount, list2.values, 0);
+        set(list2.valueCount, list2.storage, list2.offset);
     }
 
     public void set(int valueCount, byte[] newData, int offset) {
         this.valueCount = valueCount;
         setSize(valueCount * PositionReference.LENGTH);
         if (valueCount > 0) {
-            System.arraycopy(newData, offset, values, 0, valueCount * PositionReference.LENGTH);
+            System.arraycopy(newData, offset, storage, this.offset, valueCount * PositionReference.LENGTH);
         }
     }
 
@@ -80,35 +118,35 @@
 
     public void append(PositionReference pos) {
         setSize((1 + valueCount) * PositionReference.LENGTH);
-        System.arraycopy(pos.getByteArray(), pos.getStartOffset(), values, valueCount * PositionReference.LENGTH,
+        System.arraycopy(pos.getByteArray(), pos.getStartOffset(), storage, offset + valueCount * PositionReference.LENGTH,
                 pos.getLength());
         valueCount += 1;
     }
 
     public void append(int readID, byte posInRead) {
         setSize((1 + valueCount) * PositionReference.LENGTH);
-        IntegerSerializerDeserializer.putInt(readID, values, valueCount * PositionReference.LENGTH);
-        values[valueCount * PositionReference.LENGTH + PositionReference.INTBYTES] = posInRead;
+        IntegerSerializerDeserializer.putInt(readID, storage, offset + valueCount * PositionReference.LENGTH);
+        storage[offset + valueCount * PositionReference.LENGTH + PositionReference.INTBYTES] = posInRead;
         valueCount += 1;
     }
 
-    public int getPositionCount() {
+    public int getCountOfPosition() {
         return valueCount;
     }
 
     @Override
     public byte[] getByteArray() {
-        return values;
+        return storage;
     }
 
     @Override
     public int getStartOffset() {
-        return 0;
+        return offset;
     }
 
     @Override
     public int getLength() {
-        return valueCount * PositionReference.LENGTH;
+        return valueCount * PositionReference.LENGTH;
     }
 
 }
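The renamed storage/offset pair turns the list into a flyweight: setNewReference points at caller-owned bytes without copying, while set copies into the list's own backing array. A sketch of the difference (class name invented; assumes PositionReference.LENGTH spans the 4-byte readID plus the 1-byte posInRead, as the append code above suggests):

    import edu.uci.ics.genomix.hyracks.data.primitive.PositionListReference;
    import edu.uci.ics.genomix.hyracks.data.primitive.PositionReference;

    class PositionListSketch {
        public static void main(String[] args) {
            byte[] frame = new byte[64];                 // buffer owned elsewhere, e.g. a Hyracks frame
            PositionListReference view = new PositionListReference();
            view.setNewReference(2, frame, 10);          // zero-copy: two entries starting at offset 10

            PositionListReference copy = new PositionListReference();
            copy.set(view);                              // deep copy into copy's own storage
            for (PositionReference pos : copy) {         // Iterable support added in this patch
                System.out.println(pos.getReadID() + "@" + pos.getPosInRead());
            }
        }
    }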
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/PositionReference.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/PositionReference.java
index b3feaa5..100b74d 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/PositionReference.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/PositionReference.java
@@ -21,13 +21,12 @@
     }
 
     public PositionReference(byte[] storage, int offset) {
-        setNewSpace(storage, offset);
+        setNewReference(storage, offset);
     }
 
     public PositionReference(int readID, byte posInRead) {
         this();
-        IntegerSerializerDeserializer.putInt(readID, storage, offset);
-        storage[offset + INTBYTES] = posInRead;
+        set(readID, posInRead);
     }
 
     public void set(int readID, byte posInRead) {
@@ -35,7 +34,7 @@
         storage[offset + INTBYTES] = posInRead;
     }
 
-    public void setNewSpace(byte[] storage, int offset) {
+    public void setNewReference(byte[] storage, int offset) {
         this.storage = storage;
         this.offset = offset;
     }
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/MapKmerPositionToReadOperator.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/MapKmerPositionToReadOperator.java
index 924f455..fe72a81 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/MapKmerPositionToReadOperator.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/MapKmerPositionToReadOperator.java
@@ -25,9 +25,16 @@
     }
 
     private static final long serialVersionUID = 1L;
+    public static final int InputKmerField = 0;
+    public static final int InputPosListField = 1;
+
+    public static final int OutputReadIDField = 0;
+    public static final int OutputPosInReadField = 1;
+    public static final int OutputOtherReadIDListField = 2;
+    public static final int OutputKmerField = 3; // may not be needed
 
     /**
-     * Map (Kmer, {(ReadID,PosInRead),...}) into (ReadID,PosInRead,*Kmer*,{OtherReadID,...})
+     * Map (Kmer, {(ReadID,PosInRead),...}) into (ReadID,PosInRead,{OtherReadID,...},*Kmer*)
      * OtherReadID appears only when otherReadID.otherPos==0
      */
     public class MapKmerPositionToReadNodePushable extends AbstractUnaryInputUnaryOutputOperatorNodePushable {
@@ -40,14 +47,6 @@
         private ArrayTupleBuilder builder;
         private FrameTupleAppender appender;
 
-        public static final int InputKmerField = 0;
-        public static final int InputPosListField = 1;
-
-        public static final int OutputReadIDField = 0;
-        public static final int OutputPosInReadField = 1;
-        public static final int OutputOtherReadIDListField = 2;
-        public static final int OutputKmerField = 3; // may not needed
-
         private PositionReference positionEntry;
         private ArrayBackedValueStorage posListEntry;
         private ArrayBackedValueStorage zeroPositionCollection;
@@ -81,7 +80,7 @@
             int tupleCount = accessor.getTupleCount();
             for (int i = 0; i < tupleCount; i++) {
                 scanPosition(i, zeroPositionCollection, noneZeroPositionCollection);
-                writeTuple(i, zeroPositionCollection, noneZeroPositionCollection, builder);
+                scanAgainToOutputTuple(i, zeroPositionCollection, noneZeroPositionCollection, builder);
             }
         }
 
@@ -90,12 +89,10 @@
             zeroPositionCollection2.reset();
             noneZeroPositionCollection2.reset();
             byte[] data = accessor.getBuffer().array();
-            //Kmer, {(ReadID,PosInRead),...}
-            //  to ReadID, PosInRead, Kmer, {OtherReadID}
             int offsetPoslist = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength()
                     + accessor.getFieldStartOffset(tIndex, InputPosListField);
             for (int i = 0; i < accessor.getFieldLength(tIndex, InputPosListField); i += PositionReference.LENGTH) {
-                positionEntry.setNewSpace(data, offsetPoslist + i);
+                positionEntry.setNewReference(data, offsetPoslist + i);
                 if (positionEntry.getPosInRead() == 0) {
                     zeroPositionCollection2.append(positionEntry);
                 } else {
@@ -105,13 +102,13 @@
 
         }
 
-        private void writeTuple(int tIndex, ArrayBackedValueStorage zeroPositionCollection,
+        private void scanAgainToOutputTuple(int tIndex, ArrayBackedValueStorage zeroPositionCollection,
                 ArrayBackedValueStorage noneZeroPositionCollection, ArrayTupleBuilder builder2) {
             byte[] data = accessor.getBuffer().array();
             int offsetPoslist = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength()
                     + accessor.getFieldStartOffset(tIndex, InputPosListField);
             for (int i = 0; i < accessor.getFieldLength(tIndex, InputPosListField); i += PositionReference.LENGTH) {
-                positionEntry.setNewSpace(data, offsetPoslist + i);
+                positionEntry.setNewReference(data, offsetPoslist + i);
                 if (positionEntry.getPosInRead() != 0) {
                     appendNodeToBuilder(tIndex, positionEntry, zeroPositionCollection, builder2);
                 } else {
@@ -136,7 +133,7 @@
                 int offsetKmer = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength()
                         + accessor.getFieldStartOffset(tIndex, InputKmerField);
                 builder2.addField(data, offsetKmer, accessor.getFieldLength(tIndex, InputKmerField));
-                
+
                 if (!appender.append(builder2.getFieldEndOffsets(), builder2.getByteArray(), 0, builder2.getSize())) {
                     FrameUtils.flushFrame(writeBuffer, writer);
                     appender.reset(writeBuffer, true);
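A worked illustration of the remapping in the javadoc above (tuples are made up): given the input tuple

    (AATCG, {(1,0), (2,3), (3,0)})

reads 1 and 3 contain this k-mer at position 0, so the second scan emits

    (ReadID=2, PosInRead=3, {1, 3}, AATCG)

for the non-zero entry, matching the rule that OtherReadID is attached only when its position in the other read is 0; the position-0 entries are handled by the complementary branch, which falls outside this hunk.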
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/MapReadToNodeOperator.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/MapReadToNodeOperator.java
new file mode 100644
index 0000000..4921da1
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/MapReadToNodeOperator.java
@@ -0,0 +1,142 @@
+package edu.uci.ics.genomix.hyracks.dataflow;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.genomix.hyracks.data.primitive.NodeReference;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
+
+public class MapReadToNodeOperator extends AbstractSingleActivityOperatorDescriptor {
+
+    public MapReadToNodeOperator(IOperatorDescriptorRegistry spec, RecordDescriptor outRecDesc) {
+        super(spec, 1, 1);
+        recordDescriptors[0] = outRecDesc;
+    }
+
+    /**
+     * 
+     */
+    private static final long serialVersionUID = 1L;
+
+    public static final int InputReadIDField = 0;
+    public static final int InputInfoFieldStart = 1;
+
+    public static final int OutputNodeIDField = 0;
+    public static final int OutputCountOfKmerField = 1;
+    public static final int OutputIncomingField = 2;
+    public static final int OutputOutgoingField = 3;
+    public static final int OutputKmerBytesField = 4;
+
+    /**
+     * (ReadID, Storage[posInRead]={PositionList,Kmer})
+     * to (NodeID, CountOfKmer, IncomingPosList, OutgoingPosList, Kmer)
+     */
+    public class MapReadToNodePushable extends AbstractUnaryInputUnaryOutputOperatorNodePushable {
+        private final IHyracksTaskContext ctx;
+        private final RecordDescriptor inputRecDesc;
+        private final RecordDescriptor outputRecDesc;
+
+        private FrameTupleAccessor accessor;
+        private ByteBuffer writeBuffer;
+        private ArrayTupleBuilder builder;
+        private FrameTupleAppender appender;
+
+        private NodeReference curNodeEntry;
+        private NodeReference nextNodeEntry;
+
+        public MapReadToNodePushable(IHyracksTaskContext ctx, RecordDescriptor inputRecDesc,
+                RecordDescriptor outputRecDesc) {
+            this.ctx = ctx;
+            this.inputRecDesc = inputRecDesc;
+            this.outputRecDesc = outputRecDesc;
+            curNodeEntry = new NodeReference();
+            nextNodeEntry = new NodeReference();
+        }
+
+        @Override
+        public void open() throws HyracksDataException {
+            accessor = new FrameTupleAccessor(ctx.getFrameSize(), inputRecDesc);
+            writeBuffer = ctx.allocateFrame();
+            builder = new ArrayTupleBuilder(outputRecDesc.getFieldCount());
+            appender = new FrameTupleAppender(ctx.getFrameSize());
+            appender.reset(writeBuffer, true);
+            writer.open();
+            curNodeEntry.reset();
+        }
+
+        @Override
+        public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+            accessor.reset(buffer);
+            int tupleCount = accessor.getTupleCount();
+            for (int i = 0; i < tupleCount; i++) {
+                generateNodeFromRead(i);
+            }
+        }
+
+        private void generateNodeFromRead(int tIndex) {
+            int offsetPoslist = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength();
+            resetNode(curNodeEntry, offsetPoslist + accessor.getFieldStartOffset(tIndex, InputInfoFieldStart));
+
+            for (int i = InputInfoFieldStart + 1; i < accessor.getFieldCount(); i++) {
+                setNodeRef(nextNodeEntry, offsetPoslist + accessor.getFieldStartOffset(tIndex, i));
+                if (nextNodeEntry.getOutgoingList().getCountOfPosition() == 0) {
+                    curNodeEntry.mergeNextWithinOneRead(nextNodeEntry);
+                } else {
+                    curNodeEntry.setOutgoingList(nextNodeEntry.getOutgoingList());
+                    curNodeEntry.getOutgoingList().append(nextNodeEntry.getNodeID());
+                    outputNode(curNodeEntry);
+                    nextNodeEntry.getIncomingList().append(curNodeEntry.getNodeID());
+                    curNodeEntry.set(nextNodeEntry);
+                }
+            }
+            outputNode(curNodeEntry);
+        }
+
+        private void outputNode(NodeReference node) {
+            // TODO: serialize the node into builder/appender and flush full frames
+        }
+
+        private void setNodeRef(NodeReference node, int offset) {
+            // TODO: point the node at the field bytes starting at offset
+        }
+
+        private void resetNode(NodeReference node, int offset) {
+            // TODO: reset the node, then read its fields from offset
+        }
+
+        @Override
+        public void fail() throws HyracksDataException {
+            writer.fail();
+        }
+
+        @Override
+        public void close() throws HyracksDataException {
+            if (appender.getTupleCount() > 0) {
+                FrameUtils.flushFrame(writeBuffer, writer);
+            }
+            writer.close();
+        }
+
+    }
+
+    @Override
+    public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+            IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
+        // TODO Auto-generated method stub
+        return new MapReadToNodePushable(ctx, recordDescProvider.getInputRecordDescriptor(getActivityId(), 0),
+                recordDescriptors[0]);
+    }
+
+}
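An illustrative trace of generateNodeFromRead above (made-up read; assumes resetNode/setNodeRef will deserialize the per-position fields once implemented). For read 5 with position fields 0..3, where only the field at position 2 carries a non-empty outgoing list:

    pos 0,1 : folded into one node via mergeNextWithinOneRead; k-mer counts accumulate
    pos 2   : the accumulated node is closed and emitted with nodeID(5,2) appended to
              its outgoing list, and the closed node is recorded in position 2's
              incoming list
    pos 3   : folded into the node started at position 2, which outputNode emits at
              the end of the read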
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/AggregateReadIDAggregateFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/AggregateReadIDAggregateFactory.java
index cb9db16..c7552ca 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/AggregateReadIDAggregateFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/AggregateReadIDAggregateFactory.java
@@ -1,76 +1,136 @@
 package edu.uci.ics.genomix.hyracks.dataflow.aggregators;
 
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.genomix.hyracks.data.accessors.ByteSerializerDeserializer;
+import edu.uci.ics.genomix.hyracks.dataflow.MapKmerPositionToReadOperator;
 import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
 import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
 
-public class AggregateReadIDAggregateFactory implements IAggregatorDescriptorFactory{
+public class AggregateReadIDAggregateFactory implements IAggregatorDescriptorFactory {
 
     /**
      * 
      */
     private static final long serialVersionUID = 1L;
+    public static final int InputReadIDField = MapKmerPositionToReadOperator.OutputReadIDField;
+    public static final int InputPosInReadField = MapKmerPositionToReadOperator.OutputPosInReadField;
+    public static final int InputPositionListField = MapKmerPositionToReadOperator.OutputOtherReadIDListField;
+    public static final int InputKmerField = MapKmerPositionToReadOperator.OutputKmerField;
 
+    public static final int OutputReadIDField = 0;
+    public static final int OutputPositionListField = 1;
+
+    public AggregateReadIDAggregateFactory() {
+    }
+
+    /**
+     * (ReadID,PosInRead,{OtherPosition,...},Kmer) to
+     * (ReadID, {(PosInRead,{OtherPosition,...},Kmer), ...})
+     */
     @Override
     public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
             RecordDescriptor outRecordDescriptor, int[] keyFields, int[] keyFieldsInPartialResults)
             throws HyracksDataException {
-        // TODO Auto-generated method stub
-        return new IAggregatorDescriptor(){
+        return new IAggregatorDescriptor() {
+
+            protected int getOffSet(IFrameTupleAccessor accessor, int tIndex, int fieldId) {
+                int tupleOffset = accessor.getTupleStartOffset(tIndex);
+                int fieldStart = accessor.getFieldStartOffset(tIndex, fieldId);
+                int offset = tupleOffset + fieldStart + accessor.getFieldSlotsLength();
+                return offset;
+            }
+
+            protected byte readByteField(IFrameTupleAccessor accessor, int tIndex, int fieldId) {
+                return ByteSerializerDeserializer.getByte(accessor.getBuffer().array(),
+                        getOffSet(accessor, tIndex, fieldId));
+            }
 
             @Override
             public AggregateState createAggregateStates() {
-                // TODO Auto-generated method stub
-                return null;
+                return new AggregateState(new ArrayBackedValueStorage());
             }
 
             @Override
             public void init(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
                     AggregateState state) throws HyracksDataException {
-                // TODO Auto-generated method stub
-                
+                ArrayBackedValueStorage storage = (ArrayBackedValueStorage) state.state;
+                storage.reset();
+                DataOutput out = storage.getDataOutput();
+                byte posInRead = readByteField(accessor, tIndex, InputPosInReadField);
+
+                try {
+                    out.writeByte(posInRead);
+                    writeBytesToStorage(out, accessor, tIndex, InputPositionListField);
+                    writeBytesToStorage(out, accessor, tIndex, InputKmerField);
+                } catch (IOException e) {
+                    throw new HyracksDataException("Failed to write into temporary storage");
+                }
+
+            }
+
+            private void writeBytesToStorage(DataOutput out, IFrameTupleAccessor accessor, int tIndex, int idField)
+                    throws IOException {
+                int len = accessor.getFieldLength(tIndex, idField);
+                out.writeInt(len);
+                out.write(accessor.getBuffer().array(), getOffSet(accessor, tIndex, idField), len);
             }
 
             @Override
             public void reset() {
                 // TODO Auto-generated method stub
-                
+
             }
 
             @Override
             public void aggregate(IFrameTupleAccessor accessor, int tIndex, IFrameTupleAccessor stateAccessor,
                     int stateTupleIndex, AggregateState state) throws HyracksDataException {
-                // TODO Auto-generated method stub
-                
+                ArrayBackedValueStorage storage = (ArrayBackedValueStorage) state.state;
+                DataOutput out = storage.getDataOutput();
+                byte posInRead = readByteField(accessor, tIndex, InputPosInReadField);
+
+                try {
+                    out.writeByte(posInRead);
+                    writeBytesToStorage(out, accessor, tIndex, InputPositionListField);
+                    writeBytesToStorage(out, accessor, tIndex, InputKmerField);
+                } catch (IOException e) {
+                    throw new HyracksDataException("Failed to write into temporary storage");
+                }
             }
 
             @Override
             public void outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
                     AggregateState state) throws HyracksDataException {
-                // TODO Auto-generated method stub
-                
+                throw new IllegalStateException("partial result method should not be called");
             }
 
             @Override
             public void outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
                     AggregateState state) throws HyracksDataException {
-                // TODO Auto-generated method stub
-                
+                DataOutput fieldOutput = tupleBuilder.getDataOutput();
+                ArrayBackedValueStorage inputVal = (ArrayBackedValueStorage) state.state;
+                try {
+                    fieldOutput.write(inputVal.getByteArray(), inputVal.getStartOffset(), inputVal.getLength());
+                    tupleBuilder.addFieldEndOffset();
+                } catch (IOException e) {
+                    throw new HyracksDataException("I/O exception when writing aggregation to the output buffer.");
+                }
             }
 
             @Override
             public void close() {
                 // TODO Auto-generated method stub
-                
+
             }
-            
+
         };
     }
-    
-
 }
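The state written by init/aggregate above is a plain concatenation: one posInRead byte, then a length-prefixed position list, then a length-prefixed k-mer, repeated per tuple. A decoding sketch of that layout (helper class name is invented):

    import java.io.ByteArrayInputStream;
    import java.io.DataInputStream;
    import java.io.IOException;

    import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;

    class StateLayoutSketch {
        // Walks the (posInRead, posList, kmer) records the aggregator above
        // concatenates into its ArrayBackedValueStorage state.
        static void dump(ArrayBackedValueStorage storage) throws IOException {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(
                    storage.getByteArray(), storage.getStartOffset(), storage.getLength()));
            while (in.available() > 0) {
                byte posInRead = in.readByte();          // 1-byte position in read
                byte[] posList = new byte[in.readInt()]; // length-prefixed position list
                in.readFully(posList);
                byte[] kmer = new byte[in.readInt()];    // length-prefixed k-mer bytes
                in.readFully(kmer);
                System.out.println("pos " + posInRead + ": " + posList.length
                        + " position bytes, " + kmer.length + " kmer bytes");
            }
        }
    }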
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/MergeKmerAggregateFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/MergeKmerAggregateFactory.java
index ec2b47c..b98db56 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/MergeKmerAggregateFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/MergeKmerAggregateFactory.java
@@ -38,7 +38,7 @@
             throws HyracksDataException {
 
        return new IAggregatorDescriptor (){
 
-            private PositionReference position = new PositionReference();
+            private PositionReference positionReEntry = new PositionReference();
 
             @Override
             public AggregateState createAggregateStates() {
@@ -52,8 +52,8 @@
                 inputVal.reset();
                 int leadOffset = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength();
                 for( int offset = accessor.getFieldStartOffset(tIndex, 1); offset < accessor.getFieldEndOffset(tIndex, 1); offset += PositionReference.LENGTH){
-                    position.setNewSpace(accessor.getBuffer().array(), leadOffset + offset);
-                    inputVal.append(position);
+                    positionReEntry.setNewReference(accessor.getBuffer().array(), leadOffset + offset);
+                    inputVal.append(positionReEntry);
                 }
             }
 
@@ -69,8 +69,8 @@
                 ArrayBackedValueStorage inputVal = (ArrayBackedValueStorage)state.state;
                 int leadOffset = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength();
                 for( int offset = accessor.getFieldStartOffset(tIndex, 1); offset < accessor.getFieldEndOffset(tIndex, 1); offset += PositionReference.LENGTH){
-                    position.setNewSpace(accessor.getBuffer().array(), leadOffset + offset);
-                    inputVal.append(position);
+                    positionReEntry.setNewReference(accessor.getBuffer().array(), leadOffset + offset);
+                    inputVal.append(positionReEntry);
                 }
             }
 
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/MergeReadIDAggregateFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/MergeReadIDAggregateFactory.java
index a20b60d..2877ee6 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/MergeReadIDAggregateFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/MergeReadIDAggregateFactory.java
@@ -1,75 +1,173 @@
 package edu.uci.ics.genomix.hyracks.dataflow.aggregators;
 
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.commons.lang3.tuple.MutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+
 import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
 import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
 
-public class MergeReadIDAggregateFactory implements IAggregatorDescriptorFactory{
+public class MergeReadIDAggregateFactory implements IAggregatorDescriptorFactory {
 
     /**
      * 
      */
     private static final long serialVersionUID = 1L;
 
+    private final int ValidPosCount;
+
+    public MergeReadIDAggregateFactory(int readLength, int kmerLength) {
+        ValidPosCount = getPositionCount(readLength, kmerLength);
+    }
+
+    public static int getPositionCount(int readLength, int kmerLength) {
+        return readLength - kmerLength + 1;
+    }
+
+    public static final int InputReadIDField = AggregateReadIDAggregateFactory.OutputReadIDField;
+    public static final int InputPositionListField = AggregateReadIDAggregateFactory.OutputPositionListField;
+
+    public static final int BYTE_SIZE = 1;
+    public static final int INTEGER_SIZE = 4;
+
+    /**
+     * Aggregate (ReadID, {(PosInRead,{OtherPosition,...},Kmer), ...})
+     * as
+     * (ReadID, Storage[posInRead]={PositionList,Kmer})
+     * 
+     */
     @Override
     public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
             RecordDescriptor outRecordDescriptor, int[] keyFields, int[] keyFieldsInPartialResults)
             throws HyracksDataException {
-        // TODO Auto-generated method stub
-        return new IAggregatorDescriptor(){
+        return new IAggregatorDescriptor() {
 
             @Override
             public AggregateState createAggregateStates() {
-                // TODO Auto-generated method stub
-                return null;
+                ArrayBackedValueStorage[] storages = new ArrayBackedValueStorage[ValidPosCount];
+                for (int i = 0; i < storages.length; i++) {
+                    storages[i] = new ArrayBackedValueStorage();
+                }
+                // MutablePair: Pair.of(...) yields an ImmutablePair, whose setValue throws
+                return new AggregateState(MutablePair.of(storages, 0));
             }
 
             @Override
             public void init(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
                     AggregateState state) throws HyracksDataException {
-                // TODO Auto-generated method stub
-                
+                @SuppressWarnings("unchecked")
+                Pair<ArrayBackedValueStorage[], Integer> pair = (Pair<ArrayBackedValueStorage[], Integer>) state.state;
+                ArrayBackedValueStorage[] storages = pair.getLeft();
+                for (ArrayBackedValueStorage each : storages) {
+                    each.reset();
+                }
+                int count = 0;
+
+                int fieldOffset = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength()
+                        + accessor.getFieldStartOffset(tIndex, InputPositionListField);
+                ByteBuffer fieldBuffer = accessor.getBuffer();
+
+                while (fieldOffset < accessor.getFieldEndOffset(tIndex, InputPositionListField)) {
+                    byte posInRead = fieldBuffer.get(fieldOffset);
+                    if (storages[posInRead].getLength() > 0) {
+                        throw new IllegalArgumentException("Re-entering an existing storage");
+                    }
+                    fieldOffset += BYTE_SIZE;
+                    // read poslist
+                    fieldOffset += writeBytesToStorage(storages[posInRead], fieldBuffer, fieldOffset);
+                    // read Kmer
+                    fieldOffset += writeBytesToStorage(storages[posInRead], fieldBuffer, fieldOffset);
+                    count++;
+                }
+                pair.setValue(count);
+            }
+
+            private int writeBytesToStorage(ArrayBackedValueStorage storage, ByteBuffer fieldBuffer, int fieldOffset)
+                    throws HyracksDataException {
+                int lengthPosList = fieldBuffer.getInt(fieldOffset);
+                try {
+                    storage.getDataOutput().writeInt(lengthPosList);
+                    fieldOffset += INTEGER_SIZE;
+                    storage.getDataOutput().write(fieldBuffer.array(), fieldOffset, lengthPosList);
+                } catch (IOException e) {
+                    throw new HyracksDataException("Failed to write into temporary storage");
+                }
+                return lengthPosList + INTEGER_SIZE;
             }
 
             @Override
             public void reset() {
                 // TODO Auto-generated method stub
-                
+
             }
 
             @Override
             public void aggregate(IFrameTupleAccessor accessor, int tIndex, IFrameTupleAccessor stateAccessor,
                     int stateTupleIndex, AggregateState state) throws HyracksDataException {
-                // TODO Auto-generated method stub
-                
+                @SuppressWarnings("unchecked")
+                Pair<ArrayBackedValueStorage[], Integer> pair = (Pair<ArrayBackedValueStorage[], Integer>) state.state;
+                ArrayBackedValueStorage[] storages = pair.getLeft();
+                int count = pair.getRight();
+
+                int fieldOffset = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength()
+                        + accessor.getFieldStartOffset(tIndex, InputPositionListField);
+                ByteBuffer fieldBuffer = accessor.getBuffer();
+
+                while (fieldOffset < accessor.getFieldEndOffset(tIndex, InputPositionListField)) {
+                    byte posInRead = fieldBuffer.get(fieldOffset);
+                    if (storages[posInRead].getLength() > 0) {
+                        throw new IllegalArgumentException("Re-entering an existing storage");
+                    }
+                    fieldOffset += BYTE_SIZE;
+                    // read poslist
+                    fieldOffset += writeBytesToStorage(storages[posInRead], fieldBuffer, fieldOffset);
+                    // read Kmer
+                    fieldOffset += writeBytesToStorage(storages[posInRead], fieldBuffer, fieldOffset);
+                    count++;
+                }
+                pair.setValue(count);
             }
 
             @Override
             public void outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
                     AggregateState state) throws HyracksDataException {
-                // TODO Auto-generated method stub
-                
+                throw new IllegalStateException("partial result method should not be called");
             }
 
             @Override
             public void outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
                     AggregateState state) throws HyracksDataException {
-                // TODO Auto-generated method stub
-                
+                @SuppressWarnings("unchecked")
+                Pair<ArrayBackedValueStorage[], Integer> pair = (Pair<ArrayBackedValueStorage[], Integer>) state.state;
+                ArrayBackedValueStorage[] storages = pair.getLeft();
+                int count = pair.getRight();
+                if (count != storages.length) {
+                    throw new IllegalStateException("Final aggregate position number is invalid");
+                }
+                DataOutput fieldOutput = tupleBuilder.getDataOutput();
+                try {
+                    for (int i = 0; i < storages.length; i++) {
+                        fieldOutput.write(storages[i].getByteArray(), storages[i].getStartOffset(), storages[i].getLength());
+                        tupleBuilder.addFieldEndOffset();
+                    }
+                } catch (IOException e) {
+                    throw new HyracksDataException("I/O exception when writing aggregation to the output buffer.");
+                }
             }
 
             @Override
             public void close() {
                 // TODO Auto-generated method stub
-                
+
             }
-            
+
         };
     }
-
 }
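With the defaults added below in GenomixJob (DEFAULT_READLEN = 124, DEFAULT_KMERLEN = 21), ValidPosCount = getPositionCount(124, 21) = 124 - 21 + 1 = 104: the merger allocates 104 ArrayBackedValueStorage slots per read, and outputFinalResult requires that all 104 were filled.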
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/KMerSequenceWriterFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/KMerSequenceWriterFactory.java
similarity index 97%
rename from genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/KMerSequenceWriterFactory.java
rename to genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/KMerSequenceWriterFactory.java
index 1bd427c..455b5c5 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/KMerSequenceWriterFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/KMerSequenceWriterFactory.java
@@ -13,7 +13,7 @@
  * limitations under the License.
  */
 
-package edu.uci.ics.genomix.hyracks.dataflow;
+package edu.uci.ics.genomix.hyracks.dataflow.io;
 
 import java.io.DataOutput;
 import java.io.IOException;
@@ -43,7 +43,7 @@
 
     public KMerSequenceWriterFactory(JobConf conf) throws HyracksDataException {
         this.confFactory = new ConfFactory(conf);
-        this.kmerlength = conf.getInt(GenomixJob.KMER_LENGTH, GenomixJob.DEFAULT_KMER);
+        this.kmerlength = conf.getInt(GenomixJob.KMER_LENGTH, GenomixJob.DEFAULT_KMERLEN);
     }
 
     public class TupleWriter implements ITupleWriter {
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/KMerTextWriterFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/KMerTextWriterFactory.java
similarity index 97%
rename from genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/KMerTextWriterFactory.java
rename to genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/KMerTextWriterFactory.java
index 54e84f7..b8f99ef 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/KMerTextWriterFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/KMerTextWriterFactory.java
@@ -13,7 +13,7 @@
  * limitations under the License.
  */
 
-package edu.uci.ics.genomix.hyracks.dataflow;
+package edu.uci.ics.genomix.hyracks.dataflow.io;
 
 import java.io.DataOutput;
 import java.io.IOException;
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/NodeSequenceWriterFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/NodeSequenceWriterFactory.java
similarity index 93%
rename from genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/NodeSequenceWriterFactory.java
rename to genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/NodeSequenceWriterFactory.java
index 44b8f55..ee17228 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/NodeSequenceWriterFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/NodeSequenceWriterFactory.java
@@ -1,4 +1,4 @@
-package edu.uci.ics.genomix.hyracks.dataflow;
+package edu.uci.ics.genomix.hyracks.dataflow.io;
 
 import org.apache.hadoop.mapred.JobConf;
 
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/NodeTextWriterFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/NodeTextWriterFactory.java
similarity index 91%
rename from genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/NodeTextWriterFactory.java
rename to genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/NodeTextWriterFactory.java
index 21098a9..7f64f28 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/NodeTextWriterFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/NodeTextWriterFactory.java
@@ -1,4 +1,4 @@
-package edu.uci.ics.genomix.hyracks.dataflow;
+package edu.uci.ics.genomix.hyracks.dataflow.io;
 
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/GenomixJob.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/GenomixJob.java
index 66f0d1e..4b0c984 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/GenomixJob.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/GenomixJob.java
@@ -25,7 +25,9 @@
     public static final String JOB_NAME = "genomix";
 
     /** Kmers length */
-    public static final String KMER_LENGTH = "genomix.kmer";
+    public static final String KMER_LENGTH = "genomix.kmerlen";
+    /** Read length */
+    public static final String READ_LENGTH = "genomix.readlen";
     /** Frame Size */
     public static final String FRAME_SIZE = "genomix.framesize";
     /** Frame Limit, hyracks need */
@@ -46,7 +48,8 @@
     public static final String GROUPBY_HYBRID_RECORDSIZE_CROSS = "genomix.graph.groupby.hybrid.recordsize.cross";
     public static final String GROUPBY_HYBRID_HASHLEVEL = "genomix.graph.groupby.hybrid.hashlevel";
 
-    public static final int DEFAULT_KMER = 21;
+    public static final int DEFAULT_KMERLEN = 21;
+    public static final int DEFAULT_READLEN = 124;
     public static final int DEFAULT_FRAME_SIZE = 32768;
     public static final int DEFAULT_FRAME_LIMIT = 4096;
     public static final int DEFAULT_TABLE_SIZE = 10485767;
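A short sketch of overriding the renamed keys on a Hadoop JobConf before job submission (class name invented, values are arbitrary examples):

    import org.apache.hadoop.mapred.JobConf;

    import edu.uci.ics.genomix.hyracks.job.GenomixJob;

    class GenomixConfSketch {
        public static void main(String[] args) {
            JobConf conf = new JobConf();
            conf.setInt(GenomixJob.KMER_LENGTH, 55);   // "genomix.kmerlen"
            conf.setInt(GenomixJob.READ_LENGTH, 100);  // "genomix.readlen"
        }
    }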
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenBrujinGraph.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenBrujinGraph.java
index 4357d7a..ea5b58f 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenBrujinGraph.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenBrujinGraph.java
@@ -28,16 +28,17 @@
 import edu.uci.ics.genomix.hyracks.data.accessors.ReadIDPartitionComputerFactory;
 import edu.uci.ics.genomix.hyracks.data.primitive.KmerPointable;
 import edu.uci.ics.genomix.hyracks.dataflow.ConnectorPolicyAssignmentPolicy;
-import edu.uci.ics.genomix.hyracks.dataflow.KMerSequenceWriterFactory;
-import edu.uci.ics.genomix.hyracks.dataflow.KMerTextWriterFactory;
 import edu.uci.ics.genomix.hyracks.dataflow.MapKmerPositionToReadOperator;
-import edu.uci.ics.genomix.hyracks.dataflow.NodeSequenceWriterFactory;
-import edu.uci.ics.genomix.hyracks.dataflow.NodeTextWriterFactory;
+import edu.uci.ics.genomix.hyracks.dataflow.MapReadToNodeOperator;
 import edu.uci.ics.genomix.hyracks.dataflow.ReadsKeyValueParserFactory;
 import edu.uci.ics.genomix.hyracks.dataflow.aggregators.AggregateKmerAggregateFactory;
 import edu.uci.ics.genomix.hyracks.dataflow.aggregators.AggregateReadIDAggregateFactory;
 import edu.uci.ics.genomix.hyracks.dataflow.aggregators.MergeKmerAggregateFactory;
 import edu.uci.ics.genomix.hyracks.dataflow.aggregators.MergeReadIDAggregateFactory;
+import edu.uci.ics.genomix.hyracks.dataflow.io.KMerSequenceWriterFactory;
+import edu.uci.ics.genomix.hyracks.dataflow.io.KMerTextWriterFactory;
+import edu.uci.ics.genomix.hyracks.dataflow.io.NodeSequenceWriterFactory;
+import edu.uci.ics.genomix.hyracks.dataflow.io.NodeTextWriterFactory;
 import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
 import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
 import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
@@ -86,6 +87,7 @@
     private Scheduler scheduler;
     private String[] ncNodeNames;
 
+    private int readLength;
     private int kmerSize;
     private int frameLimits;
     private int frameSize;
@@ -94,15 +96,6 @@
     private OutputFormat outputFormat;
     private boolean bGenerateReversedKmer;
 
-    /** works for hybrid hashing */
-    private long inputSizeInRawRecords;
-    private long inputSizeInUniqueKeys;
-    private int recordSizeInBytes;
-    private int hashfuncStartLevel;
-    private ExternalGroupOperatorDescriptor readLocalAggregator;
-    private MToNPartitioningConnectorDescriptor readConnPartition;
-    private ExternalGroupOperatorDescriptor readCrossAggregator;
-
     private void logDebug(String status) {
         LOG.debug(status + " nc nodes:" + ncNodeNames.length);
     }
@@ -135,27 +128,28 @@
     private Object[] generateAggeragateDescriptorbyType(JobSpecification jobSpec,
             IAggregatorDescriptorFactory aggregator, IAggregatorDescriptorFactory merger,
             ITuplePartitionComputerFactory partition, INormalizedKeyComputerFactory normalizer,
-            IPointableFactory pointable, RecordDescriptor outRed) throws HyracksDataException {
+            IPointableFactory pointable, RecordDescriptor combineRed, RecordDescriptor finalRec)
+            throws HyracksDataException {
         int[] keyFields = new int[] { 0 }; // the id of grouped key
         Object[] obj = new Object[3];
 
         switch (groupbyType) {
             case EXTERNAL:
                 obj[0] = newExternalGroupby(jobSpec, keyFields, aggregator, merger, partition, normalizer, pointable,
-                        outRed);
+                        combineRed);
                 obj[1] = new MToNPartitioningConnectorDescriptor(jobSpec, partition);
                 obj[2] = newExternalGroupby(jobSpec, keyFields, merger, merger, partition, normalizer, pointable,
-                        outRed);
+                        finalRec);
                 break;
             case PRECLUSTER:
             default:
                 obj[0] = newExternalGroupby(jobSpec, keyFields, aggregator, merger, partition, normalizer, pointable,
-                        outRed);
+                        combineRed);
                 obj[1] = new MToNPartitioningMergingConnectorDescriptor(jobSpec, partition, keyFields,
                         new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(pointable) });
                 obj[2] = new PreclusteredGroupOperatorDescriptor(jobSpec, keyFields,
                         new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(pointable) }, merger,
-                        outRed);
+                        finalRec);
                 break;
         }
         return obj;
@@ -197,7 +191,7 @@
 
         Object[] objs = generateAggeragateDescriptorbyType(jobSpec, new AggregateKmerAggregateFactory(),
                 new MergeKmerAggregateFactory(), new KmerHashPartitioncomputerFactory(),
-                new KmerNormarlizedComputerFactory(), KmerPointable.FACTORY, combineKmerOutputRec);
+                new KmerNormarlizedComputerFactory(), KmerPointable.FACTORY, combineKmerOutputRec, combineKmerOutputRec);
         AbstractOperatorDescriptor kmerLocalAggregator = (AbstractOperatorDescriptor) objs[0];
         logDebug("LocalKmerGroupby Operator");
         connectOperators(jobSpec, readOperator, ncNodeNames, kmerLocalAggregator, ncNodeNames,
@@ -218,18 +212,28 @@
 
         logDebug("Group by Read Operator");
         // (ReadID, {(PosInRead,{OtherPositoin..},Kmer) ...} 
-        RecordDescriptor nodeCombineRec = new RecordDescriptor(new ISerializerDeserializer[] { null, null });
+        RecordDescriptor readIDCombineRec = new RecordDescriptor(new ISerializerDeserializer[] { null, null });
+        RecordDescriptor readIDFinalRec = new RecordDescriptor(
+                new ISerializerDeserializer[MergeReadIDAggregateFactory.getPositionCount(readLength, kmerSize)]);
         objs = generateAggeragateDescriptorbyType(jobSpec, new AggregateReadIDAggregateFactory(),
-                new MergeReadIDAggregateFactory(), new ReadIDPartitionComputerFactory(),
-                new ReadIDNormarlizedComputeFactory(), IntegerPointable.FACTORY, nodeCombineRec);
+                new MergeReadIDAggregateFactory(readLength, kmerSize), new ReadIDPartitionComputerFactory(),
+                new ReadIDNormarlizedComputeFactory(), IntegerPointable.FACTORY, readIDCombineRec, readIDFinalRec);
         AbstractOperatorDescriptor readLocalAggregator = (AbstractOperatorDescriptor) objs[0];
         connectOperators(jobSpec, mapKmerToRead, ncNodeNames, readLocalAggregator, ncNodeNames,
                 new OneToOneConnectorDescriptor(jobSpec));
 
+        logDebug("Group by ReadID merger");
         IConnectorDescriptor readconn = (IConnectorDescriptor) objs[1];
         AbstractOperatorDescriptor readCrossAggregator = (AbstractOperatorDescriptor) objs[2];
         connectOperators(jobSpec, readLocalAggregator, ncNodeNames, readCrossAggregator, ncNodeNames, readconn);
 
+        logDebug("Map ReadInfo to Node");
+        // Map (ReadID, [(PosList,Kmer) ... ]) to (Node, IncomingList, OutgoingList, Kmer)
+        RecordDescriptor nodeRec = new RecordDescriptor(new ISerializerDeserializer[] { null, null, null, null });
+        AbstractOperatorDescriptor mapEachReadToNode = new MapReadToNodeOperator(jobSpec, nodeRec);
+        connectOperators(jobSpec, readCrossAggregator, ncNodeNames, mapEachReadToNode, ncNodeNames,
+                new OneToOneConnectorDescriptor(jobSpec));
+
         // Output Kmer
         ITupleWriterFactory kmerWriter = null;
         ITupleWriterFactory nodeWriter = null;
@@ -252,10 +256,10 @@
 
         // Output Node
         HDFSWriteOperatorDescriptor writeNodeOperator = new HDFSWriteOperatorDescriptor(jobSpec, job, nodeWriter);
-        connectOperators(jobSpec, readCrossAggregator, ncNodeNames, writeNodeOperator, ncNodeNames,
+        connectOperators(jobSpec, mapEachReadToNode, ncNodeNames, writeNodeOperator, ncNodeNames,
                 new OneToOneConnectorDescriptor(jobSpec));
         jobSpec.addRoot(writeNodeOperator);
-        
+
         if (groupbyType == GroupbyType.PRECLUSTER) {
             jobSpec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
         }
@@ -264,8 +268,8 @@
 
     @Override
     protected void initJobConfiguration() {
-
-        kmerSize = conf.getInt(GenomixJob.KMER_LENGTH, GenomixJob.DEFAULT_KMER);
+        readLength = conf.getInt(GenomixJob.READ_LENGTH, GenomixJob.DEFAULT_READLEN);
+        kmerSize = conf.getInt(GenomixJob.KMER_LENGTH, GenomixJob.DEFAULT_KMERLEN);
         if (kmerSize % 2 == 0) {
             kmerSize--;
             conf.setInt(GenomixJob.KMER_LENGTH, kmerSize);
@@ -273,17 +277,6 @@
         frameLimits = conf.getInt(GenomixJob.FRAME_LIMIT, GenomixJob.DEFAULT_FRAME_LIMIT);
         tableSize = conf.getInt(GenomixJob.TABLE_SIZE, GenomixJob.DEFAULT_TABLE_SIZE);
         frameSize = conf.getInt(GenomixJob.FRAME_SIZE, GenomixJob.DEFAULT_FRAME_SIZE);
-        inputSizeInRawRecords = conf.getLong(GenomixJob.GROUPBY_HYBRID_INPUTSIZE,
-                GenomixJob.DEFAULT_GROUPBY_HYBRID_INPUTSIZE);
-        inputSizeInUniqueKeys = conf.getLong(GenomixJob.GROUPBY_HYBRID_INPUTKEYS,
-                GenomixJob.DEFAULT_GROUPBY_HYBRID_INPUTKEYS);
-        recordSizeInBytes = conf.getInt(GenomixJob.GROUPBY_HYBRID_RECORDSIZE_SINGLE,
-                GenomixJob.DEFAULT_GROUPBY_HYBRID_RECORDSIZE_SINGLE);
-        hashfuncStartLevel = conf.getInt(GenomixJob.GROUPBY_HYBRID_HASHLEVEL,
-                GenomixJob.DEFAULT_GROUPBY_HYBRID_HASHLEVEL);
-        /** here read the different recordSize why ? */
-        recordSizeInBytes = conf.getInt(GenomixJob.GROUPBY_HYBRID_RECORDSIZE_CROSS,
-                GenomixJob.DEFAULT_GROUPBY_HYBRID_RECORDSIZE_CROSS);
 
         bGenerateReversedKmer = conf.getBoolean(GenomixJob.REVERSED_KMER, GenomixJob.DEFAULT_REVERSED);
 
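Note the normalization retained in initJobConfiguration above: an even genomix.kmerlen is decremented and written back to the configuration (e.g. 22 becomes 21), presumably so that no k-mer can equal its own reverse complement; code reading the key afterwards sees the adjusted value.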
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenStatistic.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenStatistic.java
index 540bdaf..240d2ee 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenStatistic.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenStatistic.java
@@ -65,7 +65,7 @@
     @Override
     protected void initJobConfiguration() {
         // TODO Auto-generated method stub
-        kmers = conf.getInt(GenomixJob.KMER_LENGTH, GenomixJob.DEFAULT_KMER);
+        kmers = conf.getInt(GenomixJob.KMER_LENGTH, GenomixJob.DEFAULT_KMERLEN);
         hadoopjob = new JobConf(conf);
         hadoopjob.setInputFormat(SequenceFileInputFormat.class);
     }
diff --git a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/example/jobrun/JobRunTest.java b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/example/jobrun/JobRunTest.java
index 4a279d6..ba9aea2 100644
--- a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/example/jobrun/JobRunTest.java
+++ b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/example/jobrun/JobRunTest.java
@@ -212,7 +212,7 @@
 
                 //                KmerBytesWritable key = (KmerBytesWritable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
                 KmerBytesWritable key = new KmerBytesWritable(conf.getInt(GenomixJob.KMER_LENGTH,
-                        GenomixJob.DEFAULT_KMER));
+                        GenomixJob.DEFAULT_KMERLEN));
                 KmerCountValue value = (KmerCountValue) ReflectionUtils.newInstance(reader.getValueClass(), conf);
 
                 while (reader.next(key, value)) {