Reformat JobGen to avoid NotSerializableException
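
JobGen implements Serializable, but JobGenBrujinGraph kept a reference
to the HDFS Scheduler, which is not serializable. Compute the read
schedule (a plain String[]) once in initJobConfiguration() and drop the
Scheduler field. Related cleanups in the same pass:

- add a serialVersionUID to the Writable-based reference types;
- hoist the readIDOutputRec descriptor into MapKmerPositionToReadOperator;
- replace the Pair<ArrayBackedValueStorage[], Integer> aggregate state
  with a serializable PositionArray holder (Pair.of() builds an
  ImmutablePair, so the old setValue() calls could never work);
- rename GenomixJob to GenomixJobConf;
- add the missing break statements to the plan switch in Driver;
- move jobSpec.addRoot() out of the writer generators so each JobGen
  adds the root once in generateJob().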
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/NodeReference.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/NodeReference.java
index 5cf44ba..0b0dcf2 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/NodeReference.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/NodeReference.java
@@ -4,7 +4,12 @@
 
 public class NodeReference extends NodeWritable {
 
-    public NodeReference(int kmerSize) {
+	/**
+	 * Default serial version UID.
+	 */
+	private static final long serialVersionUID = 1L;
+
+	public NodeReference(int kmerSize) {
         super(kmerSize);
         // TODO Auto-generated constructor stub
     }
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/PositionListReference.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/PositionListReference.java
index 76b10d0..98b6ca1 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/PositionListReference.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/PositionListReference.java
@@ -5,4 +5,9 @@
 
 public class PositionListReference extends PositionListWritable implements  IValueReference {
 
+	/**
+	 * Default serial version UID.
+	 */
+	private static final long serialVersionUID = 1L;
+
 }
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/PositionReference.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/PositionReference.java
index 026758a..1e22529 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/PositionReference.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/PositionReference.java
@@ -5,4 +5,9 @@
 
 public class PositionReference extends PositionWritable implements IValueReference {
 
+	/**
+	 * Default serial version UID.
+	 */
+	private static final long serialVersionUID = 1L;
+
 }
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/MapKmerPositionToReadOperator.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/MapKmerPositionToReadOperator.java
index fe72a81..09c0d2e 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/MapKmerPositionToReadOperator.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/MapKmerPositionToReadOperator.java
@@ -5,6 +5,7 @@
 import edu.uci.ics.genomix.hyracks.data.primitive.PositionReference;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
@@ -17,157 +18,186 @@
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
 
-public class MapKmerPositionToReadOperator extends AbstractSingleActivityOperatorDescriptor {
+public class MapKmerPositionToReadOperator extends
+		AbstractSingleActivityOperatorDescriptor {
 
-    public MapKmerPositionToReadOperator(IOperatorDescriptorRegistry spec, RecordDescriptor recDesc) {
-        super(spec, 1, 1);
-        recordDescriptors[0] = recDesc;
-    }
+	public MapKmerPositionToReadOperator(IOperatorDescriptorRegistry spec,
+			RecordDescriptor recDesc) {
+		super(spec, 1, 1);
+		recordDescriptors[0] = recDesc;
+	}
 
-    private static final long serialVersionUID = 1L;
-    public static final int InputKmerField = 0;
-    public static final int InputPosListField = 1;
+	private static final long serialVersionUID = 1L;
+	public static final int InputKmerField = 0;
+	public static final int InputPosListField = 1;
 
-    public static final int OutputReadIDField = 0;
-    public static final int OutputPosInReadField = 1;
-    public static final int OutputOtherReadIDListField = 2;
-    public static final int OutputKmerField = 3; // may not needed
+	public static final int OutputReadIDField = 0;
+	public static final int OutputPosInReadField = 1;
+	public static final int OutputOtherReadIDListField = 2;
+	public static final int OutputKmerField = 3; // may not be needed
 
-    /**
-     * Map (Kmer, {(ReadID,PosInRead),...}) into (ReadID,PosInRead,{OtherReadID,...},*Kmer*)
-     * OtherReadID appears only when otherReadID.otherPos==0
-     */
-    public class MapKmerPositionToReadNodePushable extends AbstractUnaryInputUnaryOutputOperatorNodePushable {
-        private final IHyracksTaskContext ctx;
-        private final RecordDescriptor inputRecDesc;
-        private final RecordDescriptor outputRecDesc;
+	public static final RecordDescriptor readIDOutputRec = new RecordDescriptor(
+			new ISerializerDeserializer[] { null, null, null, null });
 
-        private FrameTupleAccessor accessor;
-        private ByteBuffer writeBuffer;
-        private ArrayTupleBuilder builder;
-        private FrameTupleAppender appender;
+	/**
+	 * Map (Kmer, {(ReadID,PosInRead),...}) into
+	 * (ReadID,PosInRead,{OtherReadID,...},*Kmer*). OtherReadID appears only
+	 * when otherReadID.otherPos == 0.
+	 */
+	public class MapKmerPositionToReadNodePushable extends
+			AbstractUnaryInputUnaryOutputOperatorNodePushable {
+		private final IHyracksTaskContext ctx;
+		private final RecordDescriptor inputRecDesc;
+		private final RecordDescriptor outputRecDesc;
 
-        private PositionReference positionEntry;
-        private ArrayBackedValueStorage posListEntry;
-        private ArrayBackedValueStorage zeroPositionCollection;
-        private ArrayBackedValueStorage noneZeroPositionCollection;
+		private FrameTupleAccessor accessor;
+		private ByteBuffer writeBuffer;
+		private ArrayTupleBuilder builder;
+		private FrameTupleAppender appender;
 
-        public MapKmerPositionToReadNodePushable(IHyracksTaskContext ctx, RecordDescriptor inputRecDesc,
-                RecordDescriptor outputRecDesc) {
-            this.ctx = ctx;
-            this.inputRecDesc = inputRecDesc;
-            this.outputRecDesc = outputRecDesc;
-            this.positionEntry = new PositionReference();
-            this.posListEntry = new ArrayBackedValueStorage();
-            this.zeroPositionCollection = new ArrayBackedValueStorage();
-            this.noneZeroPositionCollection = new ArrayBackedValueStorage();
-        }
+		private PositionReference positionEntry;
+		private ArrayBackedValueStorage posListEntry;
+		private ArrayBackedValueStorage zeroPositionCollection;
+		private ArrayBackedValueStorage noneZeroPositionCollection;
 
-        @Override
-        public void open() throws HyracksDataException {
-            accessor = new FrameTupleAccessor(ctx.getFrameSize(), inputRecDesc);
-            writeBuffer = ctx.allocateFrame();
-            builder = new ArrayTupleBuilder(outputRecDesc.getFieldCount());
-            appender = new FrameTupleAppender(ctx.getFrameSize());
-            appender.reset(writeBuffer, true);
-            writer.open();
-            posListEntry.reset();
-        }
+		public MapKmerPositionToReadNodePushable(IHyracksTaskContext ctx,
+				RecordDescriptor inputRecDesc, RecordDescriptor outputRecDesc) {
+			this.ctx = ctx;
+			this.inputRecDesc = inputRecDesc;
+			this.outputRecDesc = outputRecDesc;
+			this.positionEntry = new PositionReference();
+			this.posListEntry = new ArrayBackedValueStorage();
+			this.zeroPositionCollection = new ArrayBackedValueStorage();
+			this.noneZeroPositionCollection = new ArrayBackedValueStorage();
+		}
 
-        @Override
-        public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-            accessor.reset(buffer);
-            int tupleCount = accessor.getTupleCount();
-            for (int i = 0; i < tupleCount; i++) {
-                scanPosition(i, zeroPositionCollection, noneZeroPositionCollection);
-                scanAgainToOutputTuple(i, zeroPositionCollection, noneZeroPositionCollection, builder);
-            }
-        }
+		@Override
+		public void open() throws HyracksDataException {
+			accessor = new FrameTupleAccessor(ctx.getFrameSize(), inputRecDesc);
+			writeBuffer = ctx.allocateFrame();
+			builder = new ArrayTupleBuilder(outputRecDesc.getFieldCount());
+			appender = new FrameTupleAppender(ctx.getFrameSize());
+			appender.reset(writeBuffer, true);
+			writer.open();
+			posListEntry.reset();
+		}
 
-        private void scanPosition(int tIndex, ArrayBackedValueStorage zeroPositionCollection2,
-                ArrayBackedValueStorage noneZeroPositionCollection2) {
-            zeroPositionCollection2.reset();
-            noneZeroPositionCollection2.reset();
-            byte[] data = accessor.getBuffer().array();
-            int offsetPoslist = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength()
-                    + accessor.getFieldStartOffset(tIndex, InputPosListField);
-            for (int i = 0; i < accessor.getFieldLength(tIndex, InputPosListField); i += PositionReference.LENGTH) {
-                positionEntry.setNewReference(data, offsetPoslist + i);
-                if (positionEntry.getPosInRead() == 0) {
-                    zeroPositionCollection2.append(positionEntry);
-                } else {
-                    noneZeroPositionCollection2.append(positionEntry);
-                }
-            }
+		@Override
+		public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+			accessor.reset(buffer);
+			int tupleCount = accessor.getTupleCount();
+			for (int i = 0; i < tupleCount; i++) {
+				scanPosition(i, zeroPositionCollection,
+						noneZeroPositionCollection);
+				scanAgainToOutputTuple(i, zeroPositionCollection,
+						noneZeroPositionCollection, builder);
+			}
+		}
 
-        }
+		private void scanPosition(int tIndex,
+				ArrayBackedValueStorage zeroPositionCollection2,
+				ArrayBackedValueStorage noneZeroPositionCollection2) {
+			zeroPositionCollection2.reset();
+			noneZeroPositionCollection2.reset();
+			byte[] data = accessor.getBuffer().array();
+			int offsetPoslist = accessor.getTupleStartOffset(tIndex)
+					+ accessor.getFieldSlotsLength()
+					+ accessor.getFieldStartOffset(tIndex, InputPosListField);
+			for (int i = 0; i < accessor.getFieldLength(tIndex,
+					InputPosListField); i += PositionReference.LENGTH) {
+				positionEntry.setNewReference(data, offsetPoslist + i);
+				if (positionEntry.getPosInRead() == 0) {
+					zeroPositionCollection2.append(positionEntry);
+				} else {
+					noneZeroPositionCollection2.append(positionEntry);
+				}
+			}
 
-        private void scanAgainToOutputTuple(int tIndex, ArrayBackedValueStorage zeroPositionCollection,
-                ArrayBackedValueStorage noneZeroPositionCollection, ArrayTupleBuilder builder2) {
-            byte[] data = accessor.getBuffer().array();
-            int offsetPoslist = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength()
-                    + accessor.getFieldStartOffset(tIndex, InputPosListField);
-            for (int i = 0; i < accessor.getFieldLength(tIndex, InputPosListField); i += PositionReference.LENGTH) {
-                positionEntry.setNewReference(data, offsetPoslist + i);
-                if (positionEntry.getPosInRead() != 0) {
-                    appendNodeToBuilder(tIndex, positionEntry, zeroPositionCollection, builder2);
-                } else {
-                    appendNodeToBuilder(tIndex, positionEntry, noneZeroPositionCollection, builder2);
-                }
-            }
-        }
+		}
 
-        private void appendNodeToBuilder(int tIndex, PositionReference pos, ArrayBackedValueStorage posList2,
-                ArrayTupleBuilder builder2) {
-            try {
-                builder2.addField(pos.getByteArray(), 0, PositionReference.INTBYTES);
-                builder2.addField(pos.getByteArray(), PositionReference.INTBYTES, 1);
-                //? ask Yingyi, if support empty bytes[]
-                if (posList2 == null) {
-                    builder2.addFieldEndOffset();
-                } else {
-                    builder2.addField(posList2.getByteArray(), posList2.getStartOffset(), posList2.getLength());
-                }
-                // set kmer, may not useful
-                byte[] data = accessor.getBuffer().array();
-                int offsetKmer = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength()
-                        + accessor.getFieldStartOffset(tIndex, InputKmerField);
-                builder2.addField(data, offsetKmer, accessor.getFieldLength(tIndex, InputKmerField));
+		private void scanAgainToOutputTuple(int tIndex,
+				ArrayBackedValueStorage zeroPositionCollection,
+				ArrayBackedValueStorage noneZeroPositionCollection,
+				ArrayTupleBuilder builder2) {
+			byte[] data = accessor.getBuffer().array();
+			int offsetPoslist = accessor.getTupleStartOffset(tIndex)
+					+ accessor.getFieldSlotsLength()
+					+ accessor.getFieldStartOffset(tIndex, InputPosListField);
+			for (int i = 0; i < accessor.getFieldLength(tIndex,
+					InputPosListField); i += PositionReference.LENGTH) {
+				positionEntry.setNewReference(data, offsetPoslist + i);
+				if (positionEntry.getPosInRead() != 0) {
+					appendNodeToBuilder(tIndex, positionEntry,
+							zeroPositionCollection, builder2);
+				} else {
+					appendNodeToBuilder(tIndex, positionEntry,
+							noneZeroPositionCollection, builder2);
+				}
+			}
+		}
 
-                if (!appender.append(builder2.getFieldEndOffsets(), builder2.getByteArray(), 0, builder2.getSize())) {
-                    FrameUtils.flushFrame(writeBuffer, writer);
-                    appender.reset(writeBuffer, true);
-                    if (!appender.append(builder2.getFieldEndOffsets(), builder2.getByteArray(), 0, builder2.getSize())) {
-                        throw new IllegalStateException();
-                    }
-                }
-                builder2.reset();
-            } catch (HyracksDataException e) {
-                throw new IllegalStateException(
-                        "Failed to Add a field to the tuple by copying the data bytes from a byte array.");
-            }
-        }
+		private void appendNodeToBuilder(int tIndex, PositionReference pos,
+				ArrayBackedValueStorage posList2, ArrayTupleBuilder builder2) {
+			try {
+				builder2.addField(pos.getByteArray(), 0,
+						PositionReference.INTBYTES);
+				builder2.addField(pos.getByteArray(),
+						PositionReference.INTBYTES, 1);
+				// TODO: ask Yingyi whether an empty byte[] field is supported
+				if (posList2 == null) {
+					builder2.addFieldEndOffset();
+				} else {
+					builder2.addField(posList2.getByteArray(),
+							posList2.getStartOffset(), posList2.getLength());
+				}
+				// set kmer; may not be useful
+				byte[] data = accessor.getBuffer().array();
+				int offsetKmer = accessor.getTupleStartOffset(tIndex)
+						+ accessor.getFieldSlotsLength()
+						+ accessor.getFieldStartOffset(tIndex, InputKmerField);
+				builder2.addField(data, offsetKmer,
+						accessor.getFieldLength(tIndex, InputKmerField));
 
-        @Override
-        public void fail() throws HyracksDataException {
-            writer.fail();
-        }
+				if (!appender.append(builder2.getFieldEndOffsets(),
+						builder2.getByteArray(), 0, builder2.getSize())) {
+					FrameUtils.flushFrame(writeBuffer, writer);
+					appender.reset(writeBuffer, true);
+					if (!appender.append(builder2.getFieldEndOffsets(),
+							builder2.getByteArray(), 0, builder2.getSize())) {
+						throw new IllegalStateException();
+					}
+				}
+				builder2.reset();
+			} catch (HyracksDataException e) {
+				throw new IllegalStateException(
+						"Failed to add a field to the tuple by copying the data bytes from a byte array.", e);
+			}
+		}
 
-        @Override
-        public void close() throws HyracksDataException {
-            if (appender.getTupleCount() > 0) {
-                FrameUtils.flushFrame(writeBuffer, writer);
-            }
-            writer.close();
-        }
+		@Override
+		public void fail() throws HyracksDataException {
+			writer.fail();
+		}
 
-    }
+		@Override
+		public void close() throws HyracksDataException {
+			if (appender.getTupleCount() > 0) {
+				FrameUtils.flushFrame(writeBuffer, writer);
+			}
+			writer.close();
+		}
 
-    @Override
-    public AbstractOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
-            IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
-        return new MapKmerPositionToReadNodePushable(ctx, recordDescProvider.getInputRecordDescriptor(getActivityId(),
-                0), recordDescriptors[0]);
-    }
+	}
+
+	@Override
+	public AbstractOperatorNodePushable createPushRuntime(
+			IHyracksTaskContext ctx,
+			IRecordDescriptorProvider recordDescProvider, int partition,
+			int nPartitions) {
+		return new MapKmerPositionToReadNodePushable(
+				ctx,
+				recordDescProvider.getInputRecordDescriptor(getActivityId(), 0),
+				recordDescriptors[0]);
+	}
 
 }
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/AggregateKmerAggregateFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/AggregateKmerAggregateFactory.java
index 7dc4947..bfc5ae5 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/AggregateKmerAggregateFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/AggregateKmerAggregateFactory.java
@@ -3,7 +3,6 @@
 import java.io.DataOutput;
 import java.io.IOException;
 
-import edu.uci.ics.genomix.hyracks.data.accessors.ByteSerializerDeserializer;
 import edu.uci.ics.genomix.hyracks.data.primitive.PositionReference;
 import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
@@ -11,7 +10,6 @@
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
 import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
 import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
@@ -37,16 +35,6 @@
                 return offset;
             }
 
-            protected byte readByteField(IFrameTupleAccessor accessor, int tIndex, int fieldId) {
-                return ByteSerializerDeserializer.getByte(accessor.getBuffer().array(),
-                        getOffSet(accessor, tIndex, fieldId));
-            }
-
-            protected int readIntField(IFrameTupleAccessor accessor, int tIndex, int fieldId) {
-                return IntegerSerializerDeserializer.getInt(accessor.getBuffer().array(),
-                        getOffSet(accessor, tIndex, fieldId));
-            }
-
             @Override
             public void reset() {
             }
@@ -67,7 +55,7 @@
                     AggregateState state) throws HyracksDataException {
                 ArrayBackedValueStorage inputVal = (ArrayBackedValueStorage) state.state;
                 inputVal.reset();
-                position.set(readIntField(accessor, tIndex, 1), readByteField(accessor, tIndex, 2));
+                position.setNewReference(accessor.getBuffer().array(), getOffSet(accessor, tIndex, 1));
                 inputVal.append(position);
             }
 
@@ -75,7 +63,7 @@
             public void aggregate(IFrameTupleAccessor accessor, int tIndex, IFrameTupleAccessor stateAccessor,
                     int stateTupleIndex, AggregateState state) throws HyracksDataException {
                 ArrayBackedValueStorage inputVal = (ArrayBackedValueStorage) state.state;
-                position.set(readIntField(accessor, tIndex, 1), readByteField(accessor, tIndex, 2));
+                position.setNewReference(accessor.getBuffer().array(), getOffSet(accessor, tIndex, 1));
                 inputVal.append(position);
             }
 
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/MergeReadIDAggregateFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/MergeReadIDAggregateFactory.java
index 2877ee6..d3971fb 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/MergeReadIDAggregateFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/MergeReadIDAggregateFactory.java
@@ -4,8 +4,6 @@
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
-import org.apache.commons.lang3.tuple.Pair;
-
 import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
@@ -16,39 +14,57 @@
 import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
 
-public class MergeReadIDAggregateFactory implements IAggregatorDescriptorFactory {
+public class MergeReadIDAggregateFactory implements
+		IAggregatorDescriptorFactory {
 
-    /**
+	/**
      * 
      */
-    private static final long serialVersionUID = 1L;
+	private static final long serialVersionUID = 1L;
 
-    private final int ValidPosCount;
+	private final int ValidPosCount;
 
-    public MergeReadIDAggregateFactory(int readLength, int kmerLength) {
-        ValidPosCount = getPositionCount(readLength, kmerLength);
-    }
+	public MergeReadIDAggregateFactory(int readLength, int kmerLength) {
+		ValidPosCount = getPositionCount(readLength, kmerLength);
+	}
 
-    public static int getPositionCount(int readLength, int kmerLength){
-        return readLength - kmerLength + 1;
-    }
-    public static final int InputReadIDField = AggregateReadIDAggregateFactory.OutputReadIDField;
-    public static final int InputPositionListField = AggregateReadIDAggregateFactory.OutputPositionListField;
+	public static int getPositionCount(int readLength, int kmerLength) {
+		return readLength - kmerLength + 1;
+	}
 
-    public static final int BYTE_SIZE = 1;
-    public static final int INTEGER_SIZE = 4;
+	public static final int InputReadIDField = AggregateReadIDAggregateFactory.OutputReadIDField;
+	public static final int InputPositionListField = AggregateReadIDAggregateFactory.OutputPositionListField;
 
-    /**
-     * (ReadID, {(PosInRead,{OtherPositoin..},Kmer) ...} to
-     * Aggregate as 
-     * (ReadID, Storage[posInRead]={PositionList,Kmer})
-     * 
-     */
-    @Override
+	public static final int BYTE_SIZE = 1;
+	public static final int INTEGER_SIZE = 4;
+
+	/**
+	 * (ReadID, {(PosInRead,{OtherPosition..},Kmer) ...} to aggregate as
+	 * (ReadID, Storage[posInRead]={PositionList,Kmer})
+	 * 
+	 */
+	@Override
     public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
             RecordDescriptor outRecordDescriptor, int[] keyFields, int[] keyFieldsInPartialResults)
             throws HyracksDataException {
         return new IAggregatorDescriptor() {
+        	
+        	class PositionArray {
+        		public ArrayBackedValueStorage[] storages;
+        		public int count;
+
+        		public PositionArray(ArrayBackedValueStorage[] storages, int count) {
+        			this.storages = storages;
+        			this.count = count;
+        		}
+
+        		public void reset() {
+        			for (ArrayBackedValueStorage each : storages) {
+        				each.reset();
+        			}
+        			count = 0;
+        		}
+        	}
 
             @Override
             public AggregateState createAggregateStates() {
@@ -56,19 +72,15 @@
                 for (int i = 0; i < storages.length; i++) {
                     storages[i] = new ArrayBackedValueStorage();
                 }
-                return new AggregateState(Pair.of(storages, 0));
+                return new AggregateState(new PositionArray(storages, 0));
             }
 
             @Override
             public void init(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
                     AggregateState state) throws HyracksDataException {
-                @SuppressWarnings("unchecked")
-                Pair<ArrayBackedValueStorage[], Integer> pair = (Pair<ArrayBackedValueStorage[], Integer>) state.state;
-                ArrayBackedValueStorage[] storages = pair.getLeft();
-                for (ArrayBackedValueStorage each : storages) {
-                    each.reset();
-                }
-                int count = 0;
+                PositionArray positionArray = (PositionArray) state.state;
+                positionArray.reset();
+                ArrayBackedValueStorage[] storages = positionArray.storages;
 
                 int fieldOffset = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength()
                         + accessor.getFieldStartOffset(tIndex, InputPositionListField);
@@ -84,9 +96,8 @@
                     fieldOffset += writeBytesToStorage(storages[posInRead], fieldBuffer, fieldOffset);
                     // read Kmer
                     fieldOffset += writeBytesToStorage(storages[posInRead], fieldBuffer, fieldOffset);
-                    count++;
+                    positionArray.count++;
                 }
-                pair.setValue(count);
             }
 
             private int writeBytesToStorage(ArrayBackedValueStorage storage, ByteBuffer fieldBuffer, int fieldOffset)
@@ -111,10 +122,8 @@
             @Override
             public void aggregate(IFrameTupleAccessor accessor, int tIndex, IFrameTupleAccessor stateAccessor,
                     int stateTupleIndex, AggregateState state) throws HyracksDataException {
-                @SuppressWarnings("unchecked")
-                Pair<ArrayBackedValueStorage[], Integer> pair = (Pair<ArrayBackedValueStorage[], Integer>) state.state;
-                ArrayBackedValueStorage[] storages = pair.getLeft();
-                int count = pair.getRight();
+                PositionArray positionArray = (PositionArray) state.state;
+                ArrayBackedValueStorage[] storages = positionArray.storages;
 
                 int fieldOffset = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength()
                         + accessor.getFieldStartOffset(tIndex, InputPositionListField);
@@ -130,9 +139,8 @@
                     fieldOffset += writeBytesToStorage(storages[posInRead], fieldBuffer, fieldOffset);
                     // read Kmer
                     fieldOffset += writeBytesToStorage(storages[posInRead], fieldBuffer, fieldOffset);
-                    count++;
+                    positionArray.count++;
                 }
-                pair.setValue(count);
             }
 
             @Override
@@ -144,11 +152,9 @@
             @Override
             public void outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
                     AggregateState state) throws HyracksDataException {
-                @SuppressWarnings("unchecked")
-                Pair<ArrayBackedValueStorage[], Integer> pair = (Pair<ArrayBackedValueStorage[], Integer>) state.state;
-                ArrayBackedValueStorage[] storages = pair.getLeft();
-                int count = pair.getRight();
-                if (count != storages.length) {
+                PositionArray positionArray = (PositionArray) state.state;
+                ArrayBackedValueStorage[] storages = positionArray.storages;
+                if (positionArray.count != storages.length) {
                     throw new IllegalStateException("Final aggregate position number is invalid");
                 }
                 DataOutput fieldOutput = tupleBuilder.getDataOutput();
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/KMerSequenceWriterFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/KMerSequenceWriterFactory.java
index 0508ba7..981b2de 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/KMerSequenceWriterFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/KMerSequenceWriterFactory.java
@@ -24,7 +24,7 @@
 import org.apache.hadoop.io.SequenceFile.Writer;
 import org.apache.hadoop.mapred.JobConf;
 
-import edu.uci.ics.genomix.hyracks.job.GenomixJob;
+import edu.uci.ics.genomix.hyracks.job.GenomixJobConf;
 import edu.uci.ics.genomix.type.KmerBytesWritable;
 import edu.uci.ics.genomix.type.PositionListWritable;
 import edu.uci.ics.genomix.type.PositionWritable;
@@ -47,7 +47,7 @@
 
     public KMerSequenceWriterFactory(JobConf conf) throws HyracksDataException {
         this.confFactory = new ConfFactory(conf);
-        this.kmerlength = conf.getInt(GenomixJob.KMER_LENGTH, GenomixJob.DEFAULT_KMERLEN);
+        this.kmerlength = conf.getInt(GenomixJobConf.KMER_LENGTH, GenomixJobConf.DEFAULT_KMERLEN);
     }
 
     public class TupleWriter implements ITupleWriter {
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/NodeSequenceWriterFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/NodeSequenceWriterFactory.java
index 7de3b54..684d01e 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/NodeSequenceWriterFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/NodeSequenceWriterFactory.java
@@ -11,7 +11,7 @@
 
 import edu.uci.ics.genomix.data.Marshal;
 import edu.uci.ics.genomix.hyracks.dataflow.MapReadToNodeOperator;
-import edu.uci.ics.genomix.hyracks.job.GenomixJob;
+import edu.uci.ics.genomix.hyracks.job.GenomixJobConf;
 import edu.uci.ics.genomix.type.NodeWritable;
 import edu.uci.ics.genomix.type.PositionWritable;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
@@ -40,7 +40,7 @@
 
     public NodeSequenceWriterFactory(JobConf conf) throws HyracksDataException {
         this.confFactory = new ConfFactory(conf);
-        this.kmerlength = conf.getInt(GenomixJob.KMER_LENGTH, GenomixJob.DEFAULT_KMERLEN);
+        this.kmerlength = conf.getInt(GenomixJobConf.KMER_LENGTH, GenomixJobConf.DEFAULT_KMERLEN);
     }
 
     public class TupleWriter implements ITupleWriter {
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/driver/Driver.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/driver/Driver.java
index 68a83ac..5014a8a 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/driver/Driver.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/driver/Driver.java
@@ -24,7 +24,7 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.util.GenericOptionsParser;
 
-import edu.uci.ics.genomix.hyracks.job.GenomixJob;
+import edu.uci.ics.genomix.hyracks.job.GenomixJobConf;
 import edu.uci.ics.genomix.hyracks.job.JobGen;
 import edu.uci.ics.genomix.hyracks.job.JobGenBrujinGraph;
 import edu.uci.ics.genomix.hyracks.job.JobGenCheckReader;
@@ -70,11 +70,11 @@
         this.numPartitionPerMachine = numPartitionPerMachine;
     }
 
-    public void runJob(GenomixJob job) throws HyracksException {
+    public void runJob(GenomixJobConf job) throws HyracksException {
         runJob(job, Plan.BUILD_DEBRUJIN_GRAPH, false);
     }
 
-    public void runJob(GenomixJob job, Plan planChoice, boolean profiling) throws HyracksException {
+    public void runJob(GenomixJobConf job, Plan planChoice, boolean profiling) throws HyracksException {
         /** add hadoop configurations */
         URL hadoopCore = job.getClass().getClassLoader().getResource("core-site.xml");
         job.addResource(hadoopCore);
@@ -99,12 +99,16 @@
                     break;
                 case OUTPUT_KMERHASHTABLE:
                     jobGen = new JobGenCreateKmerInfo(job, scheduler, ncMap, numPartitionPerMachine);
+                    break;
                 case OUTPUT_MAP_KMER_TO_READ:
                     jobGen = new JobGenMapKmerToRead(job,scheduler, ncMap, numPartitionPerMachine);
+                    break;
                 case OUTPUT_GROUPBY_READID:
                     jobGen = new JobGenGroupbyReadID(job, scheduler, ncMap, numPartitionPerMachine);
+                    break;
                 case CHECK_KMERREADER:
                     jobGen = new JobGenCheckReader(job, scheduler, ncMap, numPartitionPerMachine);
+                    break;
             }
 
             start = System.currentTimeMillis();
@@ -136,7 +140,7 @@
     }
 
     public static void main(String[] args) throws Exception {
-        GenomixJob jobConf = new GenomixJob();
+        GenomixJobConf jobConf = new GenomixJobConf();
         String[] otherArgs = new GenericOptionsParser(jobConf, args).getRemainingArgs();
         if (otherArgs.length < 4) {
             System.err.println("Need <serverIP> <port> <input> <output>");
@@ -152,6 +156,7 @@
             Path path = new Path(jobConf.getWorkingDirectory(), otherArgs[2]);
             jobConf.set("mapred.input.dir", path.toString());
 
+            @SuppressWarnings("deprecation")
             Path outputDir = new Path(jobConf.getWorkingDirectory(), otherArgs[3]);
             jobConf.set("mapred.output.dir", outputDir.toString());
         }
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/GenomixJob.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/GenomixJobConf.java
similarity index 84%
rename from genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/GenomixJob.java
rename to genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/GenomixJobConf.java
index 4b0c984..fe9afdf 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/GenomixJob.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/GenomixJobConf.java
@@ -20,7 +20,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.JobConf;
 
-public class GenomixJob extends JobConf {
+public class GenomixJobConf extends JobConf {
 
     public static final String JOB_NAME = "genomix";
 
@@ -61,14 +61,21 @@
 
     public static final boolean DEFAULT_REVERSED = false;
 
-    public static final String DEFAULT_GROUPBY_TYPE = "hybrid";
-    public static final String DEFAULT_OUTPUT_FORMAT = "binary";
+    public static final String JOB_PLAN_GRAPHBUILD = "graphbuild";
+    public static final String JOB_PLAN_GRAPHSTAT  = "graphstat";
+    
+    public static final String GROUPBY_TYPE_HYBRID = "hybrid";
+    public static final String GROUPBY_TYPE_EXTERNAL = "external";
+    public static final String GROUPBY_TYPE_PRECLUSTER = "precluster";
+    public static final String OUTPUT_FORMAT_BINARY = "binary";
+    public static final String OUTPUT_FORMAT_TEXT = "text";
 
-    public GenomixJob() throws IOException {
+
+    public GenomixJobConf() throws IOException {
         super(new Configuration());
     }
 
-    public GenomixJob(Configuration conf) throws IOException {
+    public GenomixJobConf(Configuration conf) throws IOException {
         super(conf);
     }
 
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGen.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGen.java
index d17d9ef..a9dfc9b 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGen.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGen.java
@@ -18,14 +18,14 @@
 import java.io.Serializable;
 import java.util.UUID;
 
-import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobConf;
 
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.hdfs.dataflow.ConfFactory;
 
-public abstract class JobGen implements Serializable{
+public abstract class JobGen implements Serializable {
 
     /**
      * 
@@ -34,13 +34,9 @@
     protected final ConfFactory confFactory;
     protected String jobId = new UUID(System.currentTimeMillis(), System.nanoTime()).toString();
 
-    public JobGen(GenomixJob job) throws HyracksDataException {
+    public JobGen(GenomixJobConf job) throws HyracksDataException {
         this.confFactory = new ConfFactory(job);
-        this.initJobConfiguration();
     }
 
-    protected abstract void initJobConfiguration()throws HyracksDataException ;
-
     public abstract JobSpecification generateJob() throws HyracksException;
-
 }
\ No newline at end of file
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenBrujinGraph.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenBrujinGraph.java
index 4adddb6..2f40337 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenBrujinGraph.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenBrujinGraph.java
@@ -15,6 +15,7 @@
 
 package edu.uci.ics.genomix.hyracks.job;
 
+import java.io.IOException;
 import java.util.Map;
 
 import org.apache.commons.logging.Log;
@@ -91,8 +92,8 @@
 
     protected ConfFactory hadoopJobConfFactory;
     protected static final Log LOG = LogFactory.getLog(JobGenBrujinGraph.class);
-    protected Scheduler scheduler;
     protected String[] ncNodeNames;
+    protected String[] readSchedule;
 
     protected int readLength;
     protected int kmerSize;
@@ -107,17 +108,16 @@
         LOG.debug(status + " nc nodes:" + ncNodeNames.length);
     }
 
-    public JobGenBrujinGraph(GenomixJob job, Scheduler scheduler, final Map<String, NodeControllerInfo> ncMap,
+    public JobGenBrujinGraph(GenomixJobConf job, Scheduler scheduler, final Map<String, NodeControllerInfo> ncMap,
             int numPartitionPerMachine) throws HyracksDataException {
         super(job);
-        this.scheduler = scheduler;
         String[] nodes = new String[ncMap.size()];
         ncMap.keySet().toArray(nodes);
         ncNodeNames = new String[nodes.length * numPartitionPerMachine];
         for (int i = 0; i < numPartitionPerMachine; i++) {
             System.arraycopy(nodes, 0, ncNodeNames, i * nodes.length, nodes.length);
         }
-        logDebug("initialize");
+        initJobConfiguration(scheduler);
     }
 
     private ExternalGroupOperatorDescriptor newExternalGroupby(JobSpecification jobSpec, int[] keyFields,
@@ -165,12 +165,9 @@
 
     public HDFSReadOperatorDescriptor createHDFSReader(JobSpecification jobSpec) throws HyracksDataException {
         try {
-
             InputSplit[] splits = hadoopJobConfFactory.getConf().getInputFormat()
                     .getSplits(hadoopJobConfFactory.getConf(), ncNodeNames.length);
 
-            LOG.info("HDFS read into " + splits.length + " splits");
-            String[] readSchedule = scheduler.getLocationConstraints(splits);
             return new HDFSReadOperatorDescriptor(jobSpec, ReadsKeyValueParserFactory.readKmerOutputRec,
                     hadoopJobConfFactory.getConf(), splits, readSchedule, new ReadsKeyValueParserFactory(kmerSize,
                             bGenerateReversedKmer));
@@ -209,10 +206,11 @@
 
     public AbstractOperatorDescriptor generateMapperFromKmerToRead(JobSpecification jobSpec,
             AbstractOperatorDescriptor kmerCrossAggregator) {
-        //Map (Kmer, {(ReadID,PosInRead),...}) into (ReadID,PosInRead,{OtherPosition,...},Kmer) 
-        RecordDescriptor readIDOutputRec = new RecordDescriptor(
-                new ISerializerDeserializer[] { null, null, null, null });
-        AbstractOperatorDescriptor mapKmerToRead = new MapKmerPositionToReadOperator(jobSpec, readIDOutputRec);
+        // Map (Kmer, {(ReadID,PosInRead),...}) into
+        // (ReadID,PosInRead,{OtherPosition,...},Kmer)
+
+        AbstractOperatorDescriptor mapKmerToRead = new MapKmerPositionToReadOperator(jobSpec,
+                MapKmerPositionToReadOperator.readIDOutputRec);
         connectOperators(jobSpec, kmerCrossAggregator, ncNodeNames, mapKmerToRead, ncNodeNames,
                 new OneToOneConnectorDescriptor(jobSpec));
         return mapKmerToRead;
@@ -220,7 +218,7 @@
 
     public AbstractOperatorDescriptor generateGroupbyReadJob(JobSpecification jobSpec,
             AbstractOperatorDescriptor mapKmerToRead) throws HyracksDataException {
-        // (ReadID, {(PosInRead,{OtherPositoin..},Kmer) ...} 
+        // (ReadID, {(PosInRead,{OtherPosition..},Kmer) ...}
         RecordDescriptor readIDCombineRec = new RecordDescriptor(new ISerializerDeserializer[] { null, null });
         RecordDescriptor readIDFinalRec = new RecordDescriptor(
                 new ISerializerDeserializer[MergeReadIDAggregateFactory.getPositionCount(readLength, kmerSize)]);
@@ -240,7 +238,8 @@
 
     public AbstractOperatorDescriptor generateMapperFromReadToNode(JobSpecification jobSpec,
             AbstractOperatorDescriptor readCrossAggregator) {
-        //Map (ReadID, [(Poslist,Kmer) ... ]) to (Node, IncomingList, OutgoingList, Kmer)
+        // Map (ReadID, [(Poslist,Kmer) ... ]) to (Node, IncomingList,
+        // OutgoingList, Kmer)
         RecordDescriptor nodeRec = new RecordDescriptor(new ISerializerDeserializer[] { null, null, null, null });
         AbstractOperatorDescriptor mapEachReadToNode = new MapReadToNodeOperator(jobSpec, nodeRec, kmerSize);
         connectOperators(jobSpec, readCrossAggregator, ncNodeNames, mapEachReadToNode, ncNodeNames,
@@ -248,7 +247,7 @@
         return mapEachReadToNode;
     }
 
-    public AbstractOperatorDescriptor generateRootByWriteKmerGroupbyResult(JobSpecification jobSpec,
+    public AbstractOperatorDescriptor generateKmerWriterOperator(JobSpecification jobSpec,
             AbstractOperatorDescriptor kmerCrossAggregator) throws HyracksException {
         // Output Kmer
         ITupleWriterFactory kmerWriter = null;
@@ -266,11 +265,11 @@
                 hadoopJobConfFactory.getConf(), kmerWriter);
         connectOperators(jobSpec, kmerCrossAggregator, ncNodeNames, writeKmerOperator, ncNodeNames,
                 new OneToOneConnectorDescriptor(jobSpec));
-        jobSpec.addRoot(writeKmerOperator);
+        // jobSpec.addRoot(writeKmerOperator);
         return writeKmerOperator;
     }
 
-    public AbstractOperatorDescriptor generateRootByWriteNodeResult(JobSpecification jobSpec,
+    public AbstractOperatorDescriptor generateNodeWriterOperator(JobSpecification jobSpec,
             AbstractOperatorDescriptor mapEachReadToNode) throws HyracksException {
         ITupleWriterFactory nodeWriter = null;
         switch (outputFormat) {
@@ -288,7 +287,6 @@
                 hadoopJobConfFactory.getConf(), nodeWriter);
         connectOperators(jobSpec, mapEachReadToNode, ncNodeNames, writeNodeOperator, ncNodeNames,
                 new OneToOneConnectorDescriptor(jobSpec));
-        jobSpec.addRoot(writeNodeOperator);
         return writeNodeOperator;
     }
 
@@ -303,8 +301,8 @@
         logDebug("Group by Kmer");
         AbstractOperatorDescriptor lastOperator = generateGroupbyKmerJob(jobSpec, readOperator);
 
-        logDebug("Write kmer to result");
-        generateRootByWriteKmerGroupbyResult(jobSpec, lastOperator);
+        // logDebug("Write kmer to result");
+        // generateRootByWriteKmerGroupbyResult(jobSpec, lastOperator);
 
         logDebug("Map Kmer to Read Operator");
         lastOperator = generateMapperFromKmerToRead(jobSpec, lastOperator);
@@ -316,28 +314,27 @@
         lastOperator = generateMapperFromReadToNode(jobSpec, lastOperator);
 
         logDebug("Write node to result");
-        generateRootByWriteNodeResult(jobSpec, lastOperator);
+        lastOperator = generateNodeWriterOperator(jobSpec, lastOperator);
 
+        jobSpec.addRoot(lastOperator);
         return jobSpec;
     }
 
-    @SuppressWarnings("deprecation")
-    @Override
-    protected void initJobConfiguration() throws HyracksDataException {
+    protected void initJobConfiguration(Scheduler scheduler) throws HyracksDataException {
         Configuration conf = confFactory.getConf();
-        readLength = conf.getInt(GenomixJob.READ_LENGTH, GenomixJob.DEFAULT_READLEN);
-        kmerSize = conf.getInt(GenomixJob.KMER_LENGTH, GenomixJob.DEFAULT_KMERLEN);
+        readLength = conf.getInt(GenomixJobConf.READ_LENGTH, GenomixJobConf.DEFAULT_READLEN);
+        kmerSize = conf.getInt(GenomixJobConf.KMER_LENGTH, GenomixJobConf.DEFAULT_KMERLEN);
         if (kmerSize % 2 == 0) {
             kmerSize--;
-            conf.setInt(GenomixJob.KMER_LENGTH, kmerSize);
+            conf.setInt(GenomixJobConf.KMER_LENGTH, kmerSize);
         }
-        frameLimits = conf.getInt(GenomixJob.FRAME_LIMIT, GenomixJob.DEFAULT_FRAME_LIMIT);
-        tableSize = conf.getInt(GenomixJob.TABLE_SIZE, GenomixJob.DEFAULT_TABLE_SIZE);
-        frameSize = conf.getInt(GenomixJob.FRAME_SIZE, GenomixJob.DEFAULT_FRAME_SIZE);
+        frameLimits = conf.getInt(GenomixJobConf.FRAME_LIMIT, GenomixJobConf.DEFAULT_FRAME_LIMIT);
+        tableSize = conf.getInt(GenomixJobConf.TABLE_SIZE, GenomixJobConf.DEFAULT_TABLE_SIZE);
+        frameSize = conf.getInt(GenomixJobConf.FRAME_SIZE, GenomixJobConf.DEFAULT_FRAME_SIZE);
 
-        bGenerateReversedKmer = conf.getBoolean(GenomixJob.REVERSED_KMER, GenomixJob.DEFAULT_REVERSED);
+        bGenerateReversedKmer = conf.getBoolean(GenomixJobConf.REVERSED_KMER, GenomixJobConf.DEFAULT_REVERSED);
 
-        String type = conf.get(GenomixJob.GROUPBY_TYPE, GenomixJob.DEFAULT_GROUPBY_TYPE);
+        String type = conf.get(GenomixJobConf.GROUPBY_TYPE, GenomixJobConf.GROUPBY_TYPE_PRECLUSTER);
         if (type.equalsIgnoreCase("external")) {
             groupbyType = GroupbyType.EXTERNAL;
         } else if (type.equalsIgnoreCase("precluster")) {
@@ -346,13 +343,22 @@
             groupbyType = GroupbyType.HYBRIDHASH;
         }
 
-        String output = conf.get(GenomixJob.OUTPUT_FORMAT, GenomixJob.DEFAULT_OUTPUT_FORMAT);
+        String output = conf.get(GenomixJobConf.OUTPUT_FORMAT, GenomixJobConf.OUTPUT_FORMAT_BINARY);
         if (output.equalsIgnoreCase("text")) {
             outputFormat = OutputFormat.TEXT;
         } else {
             outputFormat = OutputFormat.BINARY;
         }
-        hadoopJobConfFactory = new ConfFactory(new JobConf(conf));
+        try {
+            hadoopJobConfFactory = new ConfFactory(new JobConf(conf));
+            @SuppressWarnings("deprecation")
+            InputSplit[] splits = hadoopJobConfFactory.getConf().getInputFormat()
+                    .getSplits(hadoopJobConfFactory.getConf(), ncNodeNames.length);
+            readSchedule = scheduler.getLocationConstraints(splits);
+        } catch (IOException ex) {
+            throw new HyracksDataException(ex);
+        }
+
         LOG.info("Genomix Graph Build Configuration");
         LOG.info("Kmer:" + kmerSize);
         LOG.info("Groupby type:" + type);
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenCheckReader.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenCheckReader.java
index a18c280..c6a69d4 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenCheckReader.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenCheckReader.java
@@ -28,7 +28,7 @@
      */
     private static final long serialVersionUID = 1L;
 
-    public JobGenCheckReader(GenomixJob job, Scheduler scheduler, Map<String, NodeControllerInfo> ncMap,
+    public JobGenCheckReader(GenomixJobConf job, Scheduler scheduler, Map<String, NodeControllerInfo> ncMap,
             int numPartitionPerMachine) throws HyracksDataException {
         super(job, scheduler, ncMap, numPartitionPerMachine);
         // TODO Auto-generated constructor stub
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenCreateKmerInfo.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenCreateKmerInfo.java
index 2c1a70a..4ee36a0 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenCreateKmerInfo.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenCreateKmerInfo.java
@@ -17,7 +17,7 @@
      */
     private static final long serialVersionUID = 1L;
 
-    public JobGenCreateKmerInfo(GenomixJob job, Scheduler scheduler, Map<String, NodeControllerInfo> ncMap,
+    public JobGenCreateKmerInfo(GenomixJobConf job, Scheduler scheduler, Map<String, NodeControllerInfo> ncMap,
             int numPartitionPerMachine) throws HyracksDataException {
         super(job, scheduler, ncMap, numPartitionPerMachine);
         // TODO Auto-generated constructor stub
@@ -34,7 +34,8 @@
         AbstractOperatorDescriptor lastOperator = generateGroupbyKmerJob(jobSpec, readOperator);
 
         logDebug("Write kmer to result");
-        generateRootByWriteKmerGroupbyResult(jobSpec, lastOperator);
+        lastOperator = generateKmerWriterOperator(jobSpec, lastOperator);
+        jobSpec.addRoot(lastOperator);
 
         return jobSpec;
     }
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenGroupbyReadID.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenGroupbyReadID.java
index 7649100..8a11379 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenGroupbyReadID.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenGroupbyReadID.java
@@ -21,7 +21,7 @@
      */
     private static final long serialVersionUID = 1L;
 
-    public JobGenGroupbyReadID(GenomixJob job, Scheduler scheduler, Map<String, NodeControllerInfo> ncMap,
+    public JobGenGroupbyReadID(GenomixJobConf job, Scheduler scheduler, Map<String, NodeControllerInfo> ncMap,
             int numPartitionPerMachine) throws HyracksDataException {
         super(job, scheduler, ncMap, numPartitionPerMachine);
         // TODO Auto-generated constructor stub
@@ -37,8 +37,8 @@
         logDebug("Group by Kmer");
         AbstractOperatorDescriptor lastOperator = generateGroupbyKmerJob(jobSpec, readOperator);
 
-        logDebug("Write kmer to result");
-        generateRootByWriteKmerGroupbyResult(jobSpec, lastOperator);
+        //logDebug("Write kmer to result");
+        //generateRootByWriteKmerGroupbyResult(jobSpec, lastOperator);
 
         logDebug("Map Kmer to Read Operator");
         lastOperator = generateMapperFromKmerToRead(jobSpec, lastOperator);
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenMapKmerToRead.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenMapKmerToRead.java
index 76499e6..4fa554f 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenMapKmerToRead.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenMapKmerToRead.java
@@ -21,7 +21,7 @@
      */
     private static final long serialVersionUID = 1L;
 
-    public JobGenMapKmerToRead(GenomixJob job, Scheduler scheduler, Map<String, NodeControllerInfo> ncMap,
+    public JobGenMapKmerToRead(GenomixJobConf job, Scheduler scheduler, Map<String, NodeControllerInfo> ncMap,
             int numPartitionPerMachine) throws HyracksDataException {
         super(job, scheduler, ncMap, numPartitionPerMachine);
         // TODO Auto-generated constructor stub
diff --git a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobKmerGroupbyTest.java b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobKmerGroupbyTest.java
index ae2838d..216f631 100644
--- a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobKmerGroupbyTest.java
+++ b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobKmerGroupbyTest.java
@@ -40,7 +40,7 @@
 
 import edu.uci.ics.genomix.hyracks.driver.Driver;
 import edu.uci.ics.genomix.hyracks.driver.Driver.Plan;
-import edu.uci.ics.genomix.hyracks.job.GenomixJob;
+import edu.uci.ics.genomix.hyracks.job.GenomixJobConf;
 import edu.uci.ics.genomix.type.KmerBytesWritable;
 
 public class JobKmerGroupbyTest {
@@ -76,7 +76,7 @@
         FileInputFormat.setInputPaths(conf, HDFS_INPUT_PATH);
         FileOutputFormat.setOutputPath(conf, new Path(HDFS_OUTPUT_PATH));
 
-        conf.setInt(GenomixJob.KMER_LENGTH, 5);
+        conf.setInt(GenomixJobConf.KMER_LENGTH, 5);
         driver = new Driver(edu.uci.ics.hyracks.hdfs.utils.HyracksUtils.CC_HOST,
                 edu.uci.ics.hyracks.hdfs.utils.HyracksUtils.TEST_HYRACKS_CC_CLIENT_PORT, numPartitionPerMachine);
     }
@@ -130,7 +130,7 @@
         cleanUpReEntry();
         TestHybridGroupby();
         cleanUpReEntry();
-        conf.setBoolean(GenomixJob.REVERSED_KMER, true);
+        conf.setBoolean(GenomixJobConf.REVERSED_KMER, true);
         TestExternalReversedGroupby();
         cleanUpReEntry();
         TestPreClusterReversedGroupby();
@@ -139,53 +139,53 @@
     }
 
     public void TestExternalGroupby() throws Exception {
-        conf.set(GenomixJob.GROUPBY_TYPE, "external");
+        conf.set(GenomixJobConf.GROUPBY_TYPE, "external");
         System.err.println("Testing ExternalGroupBy");
-        driver.runJob(new GenomixJob(conf), Plan.BUILD_DEBRUJIN_GRAPH, true);
+        driver.runJob(new GenomixJobConf(conf), Plan.BUILD_DEBRUJIN_GRAPH, true);
         Assert.assertEquals(true, checkResults(EXPECTED_PATH));
     }
 
     public void TestPreClusterGroupby() throws Exception {
-        conf.set(GenomixJob.GROUPBY_TYPE, "precluster");
+        conf.set(GenomixJobConf.GROUPBY_TYPE, "precluster");
         System.err.println("Testing PreClusterGroupBy");
-        driver.runJob(new GenomixJob(conf), Plan.BUILD_DEBRUJIN_GRAPH, true);
+        driver.runJob(new GenomixJobConf(conf), Plan.BUILD_DEBRUJIN_GRAPH, true);
         Assert.assertEquals(true, checkResults(EXPECTED_PATH));
     }
 
     public void TestHybridGroupby() throws Exception {
-        conf.set(GenomixJob.GROUPBY_TYPE, "hybrid");
+        conf.set(GenomixJobConf.GROUPBY_TYPE, "hybrid");
         System.err.println("Testing HybridGroupBy");
-        driver.runJob(new GenomixJob(conf), Plan.BUILD_DEBRUJIN_GRAPH, true);
+        driver.runJob(new GenomixJobConf(conf), Plan.BUILD_DEBRUJIN_GRAPH, true);
         Assert.assertEquals(true, checkResults(EXPECTED_PATH));
     }
 
     public void TestExternalReversedGroupby() throws Exception {
-        conf.set(GenomixJob.GROUPBY_TYPE, "external");
-        conf.setBoolean(GenomixJob.REVERSED_KMER, true);
+        conf.set(GenomixJobConf.GROUPBY_TYPE, "external");
+        conf.setBoolean(GenomixJobConf.REVERSED_KMER, true);
         System.err.println("Testing ExternalGroupBy + Reversed");
-        driver.runJob(new GenomixJob(conf), Plan.BUILD_DEBRUJIN_GRAPH, true);
+        driver.runJob(new GenomixJobConf(conf), Plan.BUILD_DEBRUJIN_GRAPH, true);
         Assert.assertEquals(true, checkResults(EXPECTED_REVERSE_PATH));
     }
 
     public void TestPreClusterReversedGroupby() throws Exception {
-        conf.set(GenomixJob.GROUPBY_TYPE, "precluster");
-        conf.setBoolean(GenomixJob.REVERSED_KMER, true);
+        conf.set(GenomixJobConf.GROUPBY_TYPE, "precluster");
+        conf.setBoolean(GenomixJobConf.REVERSED_KMER, true);
         System.err.println("Testing PreclusterGroupBy + Reversed");
-        driver.runJob(new GenomixJob(conf), Plan.BUILD_DEBRUJIN_GRAPH, true);
+        driver.runJob(new GenomixJobConf(conf), Plan.BUILD_DEBRUJIN_GRAPH, true);
         Assert.assertEquals(true, checkResults(EXPECTED_REVERSE_PATH));
     }
 
     public void TestHybridReversedGroupby() throws Exception {
-        conf.set(GenomixJob.GROUPBY_TYPE, "hybrid");
-        conf.setBoolean(GenomixJob.REVERSED_KMER, true);
+        conf.set(GenomixJobConf.GROUPBY_TYPE, "hybrid");
+        conf.setBoolean(GenomixJobConf.REVERSED_KMER, true);
         System.err.println("Testing HybridGroupBy + Reversed");
-        driver.runJob(new GenomixJob(conf), Plan.BUILD_DEBRUJIN_GRAPH, true);
+        driver.runJob(new GenomixJobConf(conf), Plan.BUILD_DEBRUJIN_GRAPH, true);
         Assert.assertEquals(true, checkResults(EXPECTED_REVERSE_PATH));
     }
 
     private boolean checkResults(String expectedPath) throws Exception {
         File dumped = null;
-        String format = conf.get(GenomixJob.OUTPUT_FORMAT);
+        String format = conf.get(GenomixJobConf.OUTPUT_FORMAT);
         if ("text".equalsIgnoreCase(format)) {
             FileUtil.copyMerge(FileSystem.get(conf), new Path(HDFS_OUTPUT_PATH),
                     FileSystem.getLocal(new Configuration()), new Path(DUMPED_RESULT), false, conf, null);
@@ -209,8 +209,8 @@
                 SequenceFile.Reader reader = new SequenceFile.Reader(dfs, path, conf);
 
                 //                KmerBytesWritable key = (KmerBytesWritable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
-                KmerBytesWritable key = new KmerBytesWritable(conf.getInt(GenomixJob.KMER_LENGTH,
-                        GenomixJob.DEFAULT_KMERLEN));
+                KmerBytesWritable key = new KmerBytesWritable(conf.getInt(GenomixJobConf.KMER_LENGTH,
+                        GenomixJobConf.DEFAULT_KMERLEN));
 //                KmerCountValue value = (KmerCountValue) ReflectionUtils.newInstance(reader.getValueClass(), conf);
                 KmerBytesWritable value = null;
                 while (reader.next(key, value)) {
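
[Illustrative note, not part of the patch: GenomixJobConf itself does not appear in this diff, so only the members its call sites reference can be reconstructed. The sketch below is a hypothetical reconstruction; every string and int constant value in it is an assumption, not taken from the source.]

    import org.apache.hadoop.mapred.JobConf;

    // Hypothetical reconstruction of GenomixJobConf; only the members referenced
    // by the tests in this diff are shown, and all constant values are guesses.
    public class GenomixJobConf extends JobConf {
        public static final String KMER_LENGTH = "genomix.kmerlen";         // assumed key
        public static final String GROUPBY_TYPE = "genomix.groupby.type";   // assumed key
        public static final String OUTPUT_FORMAT = "genomix.outputformat";  // assumed key
        public static final String REVERSED_KMER = "genomix.kmer.reversed"; // assumed key

        public static final int DEFAULT_KMERLEN = 21;                       // assumed default
        // The first test file still compares against the literal "text", so the
        // named constants presumably carry the same lowercase values.
        public static final String OUTPUT_FORMAT_TEXT = "text";
        public static final String GROUPBY_TYPE_EXTERNAL = "external";
        public static final String GROUPBY_TYPE_PRECLUSTER = "precluster";

        public GenomixJobConf(JobConf other) {
            super(other); // copy all settings from the incoming JobConf
        }
    }
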
diff --git a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunStepByStepTest.java b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunStepByStepTest.java
index fca0607..da74f27 100644
--- a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunStepByStepTest.java
+++ b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunStepByStepTest.java
@@ -25,155 +25,207 @@
 
 import edu.uci.ics.genomix.hyracks.driver.Driver;
 import edu.uci.ics.genomix.hyracks.driver.Driver.Plan;
-import edu.uci.ics.genomix.hyracks.job.GenomixJob;
+import edu.uci.ics.genomix.hyracks.job.GenomixJobConf;
 import edu.uci.ics.genomix.type.KmerBytesWritable;
-import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 
+@SuppressWarnings("deprecation")
 public class JobRunStepByStepTest {
-    private static final int KmerSize = 5;
-    private static final String ACTUAL_RESULT_DIR = "actual";
-    private static final String PATH_TO_HADOOP_CONF = "src/test/resources/hadoop/conf";
+	private static final int KmerSize = 5;
+	private static final String ACTUAL_RESULT_DIR = "actual";
+	private static final String PATH_TO_HADOOP_CONF = "src/test/resources/hadoop/conf";
 
-    private static final String DATA_INPUT_PATH = "src/test/resources/data/webmap/text.txt";
-    private static final String HDFS_INPUT_PATH = "/webmap";
-    private static final String HDFS_OUTPUT_PATH = "/webmap_result";
-    
-    private static final String EXPECTED_DIR = "src/test/resources/expected/";
-    private static final String EXPECTED_READER_RESULT = EXPECTED_DIR + "result_after_initial_read";
+	private static final String DATA_INPUT_PATH = "src/test/resources/data/webmap/text.txt";
+	private static final String HDFS_INPUT_PATH = "/webmap";
+	private static final String HDFS_OUTPUT_PATH = "/webmap_result";
 
-    private static final String DUMPED_RESULT = ACTUAL_RESULT_DIR + HDFS_OUTPUT_PATH + "/merged.txt";
-    private static final String CONVERT_RESULT = DUMPED_RESULT + ".txt";
-    private static final String HADOOP_CONF_PATH = ACTUAL_RESULT_DIR + File.separator + "conf.xml";
-    private MiniDFSCluster dfsCluster;
+	private static final String EXPECTED_DIR = "src/test/resources/expected/";
+	private static final String EXPECTED_READER_RESULT = EXPECTED_DIR
+			+ "result_after_initial_read";
+	private static final String EXPECTED_OUTPUT_KMER = EXPECTED_DIR
+			+ "result_after_kmerAggregate";
 
-    private JobConf conf = new JobConf();
-    private int numberOfNC = 2;
-    private int numPartitionPerMachine = 1;
+	private static final String DUMPED_RESULT = ACTUAL_RESULT_DIR
+			+ HDFS_OUTPUT_PATH + "/merged.txt";
+	private static final String CONVERT_RESULT = DUMPED_RESULT + ".txt";
+	private static final String HADOOP_CONF_PATH = ACTUAL_RESULT_DIR
+			+ File.separator + "conf.xml";
+	private MiniDFSCluster dfsCluster;
 
-    private Driver driver;
-    
-    @Test
-    public void TestAll() throws Exception {
-        TestReader();
-    }
-    
-    public void TestReader() throws Exception{
-        conf.set(GenomixJob.GROUPBY_TYPE, "external");
-        System.err.println("Testing ExternalGroupBy");
-        driver.runJob(new GenomixJob(conf), Plan.CHECK_KMERREADER, true);
-        Assert.assertEquals(true, checkResults(EXPECTED_READER_RESULT));
-    }
+	private JobConf conf = new JobConf();
+	private int numberOfNC = 2;
+	private int numPartitionPerMachine = 1;
 
-    @Before
-    public void setUp() throws Exception {
-        cleanupStores();
-        edu.uci.ics.hyracks.hdfs.utils.HyracksUtils.init();
-        FileUtils.forceMkdir(new File(ACTUAL_RESULT_DIR));
-        FileUtils.cleanDirectory(new File(ACTUAL_RESULT_DIR));
-        startHDFS();
+	private Driver driver;
 
-        FileInputFormat.setInputPaths(conf, HDFS_INPUT_PATH);
-        FileOutputFormat.setOutputPath(conf, new Path(HDFS_OUTPUT_PATH));
+	@Test
+	public void TestAll() throws Exception {
+		// TestReader();
+		TestGroupbyKmer();
+		// TestMapKmerToRead();
+		// TestGroupByReadID();
+		// TestEndToEnd();
+	}
 
-        conf.setInt(GenomixJob.KMER_LENGTH, KmerSize);
-        driver = new Driver(edu.uci.ics.hyracks.hdfs.utils.HyracksUtils.CC_HOST,
-                edu.uci.ics.hyracks.hdfs.utils.HyracksUtils.TEST_HYRACKS_CC_CLIENT_PORT, numPartitionPerMachine);
-    }
-    private void cleanupStores() throws IOException {
-        FileUtils.forceMkdir(new File("teststore"));
-        FileUtils.forceMkdir(new File("build"));
-        FileUtils.cleanDirectory(new File("teststore"));
-        FileUtils.cleanDirectory(new File("build"));
-    }
+	public void TestReader() throws Exception {
+		conf.set(GenomixJobConf.OUTPUT_FORMAT, GenomixJobConf.OUTPUT_FORMAT_TEXT);
+		driver.runJob(new GenomixJobConf(conf), Plan.CHECK_KMERREADER, true);
+		Assert.assertEquals(true, checkResults(EXPECTED_READER_RESULT));
+	}
 
-    private void startHDFS() throws IOException {
-        conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/core-site.xml"));
-        conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/mapred-site.xml"));
-        conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/hdfs-site.xml"));
+	public void TestGroupbyKmer() throws Exception {
+		conf.set(GenomixJobConf.OUTPUT_FORMAT, GenomixJobConf.OUTPUT_FORMAT_TEXT);
+		conf.set(GenomixJobConf.GROUPBY_TYPE, GenomixJobConf.GROUPBY_TYPE_EXTERNAL);
+		driver.runJob(new GenomixJobConf(conf), Plan.OUTPUT_KMERHASHTABLE, true);
+		Assert.assertEquals(true, checkResults(EXPECTED_OUTPUT_KMER));
+	}
 
-        FileSystem lfs = FileSystem.getLocal(new Configuration());
-        lfs.delete(new Path("build"), true);
-        System.setProperty("hadoop.log.dir", "logs");
-        dfsCluster = new MiniDFSCluster(conf, numberOfNC, true, null);
-        FileSystem dfs = FileSystem.get(conf);
-        Path src = new Path(DATA_INPUT_PATH);
-        Path dest = new Path(HDFS_INPUT_PATH);
-        dfs.mkdirs(dest);
-        //dfs.mkdirs(result);
-        dfs.copyFromLocalFile(src, dest);
+	public void TestMapKmerToRead() throws Exception {
+		conf.set(GenomixJobConf.OUTPUT_FORMAT, GenomixJobConf.OUTPUT_FORMAT_TEXT);
+		driver.runJob(new GenomixJobConf(conf), Plan.OUTPUT_MAP_KMER_TO_READ, true);
+		Assert.assertEquals(true, checkResults(EXPECTED_OUTPUT_KMER));
+	}
 
-        DataOutputStream confOutput = new DataOutputStream(new FileOutputStream(new File(HADOOP_CONF_PATH)));
-        conf.writeXml(confOutput);
-        confOutput.flush();
-        confOutput.close();
-    }
+	public void TestGroupByReadID() throws Exception {
+		conf.set(GenomixJobConf.OUTPUT_FORMAT, GenomixJobConf.OUTPUT_FORMAT_TEXT);
+		conf.set(GenomixJobConf.GROUPBY_TYPE, GenomixJobConf.GROUPBY_TYPE_EXTERNAL);
+		driver.runJob(new GenomixJobConf(conf), Plan.OUTPUT_GROUPBY_READID, true);
+		Assert.assertEquals(true, checkResults(EXPECTED_OUTPUT_KMER));
+	}
 
-    private void cleanUpReEntry() throws IOException {
-        FileSystem lfs = FileSystem.getLocal(new Configuration());
-        if (lfs.exists(new Path(DUMPED_RESULT))) {
-            lfs.delete(new Path(DUMPED_RESULT), true);
-        }
-        FileSystem dfs = FileSystem.get(conf);
-        if (dfs.exists(new Path(HDFS_OUTPUT_PATH))) {
-            dfs.delete(new Path(HDFS_OUTPUT_PATH), true);
-        }
-    }
-    
-    private boolean checkResults(String expectedPath) throws Exception {
-        File dumped = null;
-        String format = conf.get(GenomixJob.OUTPUT_FORMAT);
-        if ("text".equalsIgnoreCase(format)) {
-            FileUtil.copyMerge(FileSystem.get(conf), new Path(HDFS_OUTPUT_PATH),
-                    FileSystem.getLocal(new Configuration()), new Path(DUMPED_RESULT), false, conf, null);
-            dumped = new File(DUMPED_RESULT);
-        } else {
+	public void TestEndToEnd() throws Exception {
+		conf.set(GenomixJobConf.OUTPUT_FORMAT, GenomixJobConf.OUTPUT_FORMAT_TEXT);
+		cleanUpReEntry();
+		conf.set(GenomixJobConf.GROUPBY_TYPE, GenomixJobConf.GROUPBY_TYPE_EXTERNAL);
+		driver.runJob(new GenomixJobConf(conf), Plan.BUILD_DEBRUJIN_GRAPH, true);
+		Assert.assertEquals(true, checkResults(EXPECTED_OUTPUT_KMER));
+		cleanUpReEntry();
+		conf.set(GenomixJobConf.GROUPBY_TYPE, GenomixJobConf.GROUPBY_TYPE_PRECLUSTER);
+		driver.runJob(new GenomixJobConf(conf), Plan.BUILD_DEBRUJIN_GRAPH, true);
+		Assert.assertEquals(true, checkResults(EXPECTED_OUTPUT_KMER));
+	}
 
-            FileSystem.getLocal(new Configuration()).mkdirs(new Path(ACTUAL_RESULT_DIR + HDFS_OUTPUT_PATH));
-            File filePathTo = new File(CONVERT_RESULT);
-            BufferedWriter bw = new BufferedWriter(new FileWriter(filePathTo));
-            for (int i = 0; i < numPartitionPerMachine * numberOfNC; i++) {
-                String partname = "/part-" + i;
-                //              FileUtil.copy(FileSystem.get(conf), new Path(HDFS_OUTPUT_PATH
-                //                      + partname), FileSystem.getLocal(new Configuration()),
-                //                      new Path(ACTUAL_RESULT_DIR + HDFS_OUTPUT_PATH + partname), false, conf);
+	@Before
+	public void setUp() throws Exception {
+		cleanupStores();
+		edu.uci.ics.hyracks.hdfs.utils.HyracksUtils.init();
+		FileUtils.forceMkdir(new File(ACTUAL_RESULT_DIR));
+		FileUtils.cleanDirectory(new File(ACTUAL_RESULT_DIR));
+		startHDFS();
 
-                Path path = new Path(HDFS_OUTPUT_PATH + partname);
-                FileSystem dfs = FileSystem.get(conf);
-                if (dfs.getFileStatus(path).getLen() == 0) {
-                    continue;
-                }
-                SequenceFile.Reader reader = new SequenceFile.Reader(dfs, path, conf);
+		FileInputFormat.setInputPaths(conf, HDFS_INPUT_PATH);
+		FileOutputFormat.setOutputPath(conf, new Path(HDFS_OUTPUT_PATH));
 
-                //                KmerBytesWritable key = (KmerBytesWritable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
-                KmerBytesWritable key = new KmerBytesWritable(conf.getInt(GenomixJob.KMER_LENGTH,
-                        GenomixJob.DEFAULT_KMERLEN));
-//                KmerCountValue value = (KmerCountValue) ReflectionUtils.newInstance(reader.getValueClass(), conf);
-                KmerBytesWritable value = null;
-                while (reader.next(key, value)) {
-                    if (key == null || value == null) {
-                        break;
-                    }
-                    bw.write(key.toString() + "\t" + value.toString());
-                    System.out.println(key.toString() + "\t" + value.toString());
-                    bw.newLine();
-                }
-                reader.close();
-            }
-            bw.close();
-            dumped = new File(CONVERT_RESULT);
-        }
+		conf.setInt(GenomixJobConf.KMER_LENGTH, KmerSize);
+		driver = new Driver(
+				edu.uci.ics.hyracks.hdfs.utils.HyracksUtils.CC_HOST,
+				edu.uci.ics.hyracks.hdfs.utils.HyracksUtils.TEST_HYRACKS_CC_CLIENT_PORT,
+				numPartitionPerMachine);
+	}
 
-        TestUtils.compareWithSortedResult(new File(expectedPath), dumped);
-        return true;
-    }
+	private void cleanupStores() throws IOException {
+		FileUtils.forceMkdir(new File("teststore"));
+		FileUtils.forceMkdir(new File("build"));
+		FileUtils.cleanDirectory(new File("teststore"));
+		FileUtils.cleanDirectory(new File("build"));
+	}
 
-    @After
-    public void tearDown() throws Exception {
-        edu.uci.ics.hyracks.hdfs.utils.HyracksUtils.deinit();
-        cleanupHDFS();
-    }
+	private void startHDFS() throws IOException {
+		conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/core-site.xml"));
+		conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/mapred-site.xml"));
+		conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/hdfs-site.xml"));
 
-    private void cleanupHDFS() throws Exception {
-        dfsCluster.shutdown();
-    }
+		FileSystem lfs = FileSystem.getLocal(new Configuration());
+		lfs.delete(new Path("build"), true);
+		System.setProperty("hadoop.log.dir", "logs");
+		dfsCluster = new MiniDFSCluster(conf, numberOfNC, true, null);
+		FileSystem dfs = FileSystem.get(conf);
+		Path src = new Path(DATA_INPUT_PATH);
+		Path dest = new Path(HDFS_INPUT_PATH);
+		dfs.mkdirs(dest);
+		// dfs.mkdirs(result);
+		dfs.copyFromLocalFile(src, dest);
+
+		DataOutputStream confOutput = new DataOutputStream(
+				new FileOutputStream(new File(HADOOP_CONF_PATH)));
+		conf.writeXml(confOutput);
+		confOutput.flush();
+		confOutput.close();
+	}
+
+	private void cleanUpReEntry() throws IOException {
+		FileSystem lfs = FileSystem.getLocal(new Configuration());
+		if (lfs.exists(new Path(DUMPED_RESULT))) {
+			lfs.delete(new Path(DUMPED_RESULT), true);
+		}
+		FileSystem dfs = FileSystem.get(conf);
+		if (dfs.exists(new Path(HDFS_OUTPUT_PATH))) {
+			dfs.delete(new Path(HDFS_OUTPUT_PATH), true);
+		}
+	}
+
+	private boolean checkResults(String expectedPath) throws Exception {
+		File dumped = null;
+		String format = conf.get(GenomixJobConf.OUTPUT_FORMAT);
+		if (GenomixJobConf.OUTPUT_FORMAT_TEXT.equalsIgnoreCase(format)) {
+			FileUtil.copyMerge(FileSystem.get(conf), new Path(HDFS_OUTPUT_PATH),
+					FileSystem.getLocal(new Configuration()),
+					new Path(DUMPED_RESULT), false, conf, null);
+			dumped = new File(DUMPED_RESULT);
+		} else {
+
+			FileSystem.getLocal(new Configuration()).mkdirs(
+					new Path(ACTUAL_RESULT_DIR + HDFS_OUTPUT_PATH));
+			File filePathTo = new File(CONVERT_RESULT);
+			BufferedWriter bw = new BufferedWriter(new FileWriter(filePathTo));
+			for (int i = 0; i < numPartitionPerMachine * numberOfNC; i++) {
+				String partname = "/part-" + i;
+				// FileUtil.copy(FileSystem.get(conf), new Path(HDFS_OUTPUT_PATH
+				// + partname), FileSystem.getLocal(new Configuration()),
+				// new Path(ACTUAL_RESULT_DIR + HDFS_OUTPUT_PATH + partname),
+				// false, conf);
+
+				Path path = new Path(HDFS_OUTPUT_PATH + partname);
+				FileSystem dfs = FileSystem.get(conf);
+				if (dfs.getFileStatus(path).getLen() == 0) {
+					continue;
+				}
+				SequenceFile.Reader reader = new SequenceFile.Reader(dfs, path,
+						conf);
+
+				// KmerBytesWritable key = (KmerBytesWritable)
+				// ReflectionUtils.newInstance(reader.getKeyClass(), conf);
+				KmerBytesWritable key = new KmerBytesWritable(conf.getInt(
+						GenomixJobConf.KMER_LENGTH, GenomixJobConf.DEFAULT_KMERLEN));
+				// KmerCountValue value = (KmerCountValue)
+				// ReflectionUtils.newInstance(reader.getValueClass(), conf);
+				KmerBytesWritable value = null;
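+				// NOTE: value stays null here, so reader.next(key, value) would
+				// throw a NullPointerException on a non-empty part file; the tests
+				// force text output, so this branch is currently unreached (see the
+				// note after this diff for a ReflectionUtils-based fix).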
+				while (reader.next(key, value)) {
+					if (key == null || value == null) {
+						break;
+					}
+					bw.write(key.toString() + "\t" + value.toString());
+					System.out.println(key.toString() + "\t" + value.toString());
+					bw.newLine();
+				}
+				reader.close();
+			}
+			bw.close();
+			dumped = new File(CONVERT_RESULT);
+		}
+
+		TestUtils.compareWithSortedResult(new File(expectedPath), dumped);
+		return true;
+	}
+
+	@After
+	public void tearDown() throws Exception {
+		edu.uci.ics.hyracks.hdfs.utils.HyracksUtils.deinit();
+		cleanupHDFS();
+	}
+
+	private void cleanupHDFS() throws Exception {
+		dfsCluster.shutdown();
+	}
 }
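
[Note on checkResults() in JobRunStepByStepTest: value is initialized to null, and SequenceFile.Reader.next(key, value) dereferences its value argument, so the sequence-file branch would fail with a NullPointerException on any non-empty part file. The tests sidestep this by forcing OUTPUT_FORMAT_TEXT, which takes the copyMerge branch instead. If the binary branch is ever exercised, the commented-out ReflectionUtils hint in the original code points at the usual pattern; the sketch below shows it. It assumes the key/value classes have no-argument constructors, which they may not (the likely reason the original constructs KmerBytesWritable explicitly with a k-mer length).]

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.SequenceFile;
    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.util.ReflectionUtils;

    public class SequenceFileDumpSketch {
        // Print every key/value pair of one part file, tab-separated.
        public static void dump(FileSystem dfs, Path path, Configuration conf) throws Exception {
            SequenceFile.Reader reader = new SequenceFile.Reader(dfs, path, conf);
            try {
                // Instantiate key and value from the file's own metadata; never
                // pass null to next(key, value).
                Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
                Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
                while (reader.next(key, value)) {
                    System.out.println(key + "\t" + value);
                }
            } finally {
                reader.close();
            }
        }
    }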