new package for path merge
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/experiment/Position.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/experiment/Position.java
new file mode 100644
index 0000000..b740a7d
--- /dev/null
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/experiment/Position.java
@@ -0,0 +1,69 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.genomix.experiment;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+
+public class Position implements Writable {
+    private int readID;
+    private byte posInRead;
+
+    public Position(int readID, byte posInRead) {
+        set(readID, posInRead);
+    }
+
+    public Position() {
+        readID = -1;
+        posInRead = -1;
+    }
+
+    @Override
+    public void readFields(DataInput arg0) throws IOException {
+        readID = arg0.readInt();
+        posInRead = arg0.readByte();
+    }
+
+    @Override
+    public void write(DataOutput arg0) throws IOException {
+        arg0.writeInt(readID);
+        arg0.writeByte(posInRead);
+    }
+
+    @Override
+    public String toString() {
+        return String.valueOf(readID) + '\t' + String.valueOf(posInRead);
+    }
+
+    public void set(int readID, byte posInRead) {
+        this.readID = readID;
+        this.posInRead = posInRead;
+    }
+
+    public int getReadID() {
+        return readID;
+    }
+
+    public void setReadID(int readID) {
+        this.readID = readID;
+    }
+
+    public byte getPosInRead() {
+        return posInRead;
+    }
+}
\ No newline at end of file
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/experiment/PositionList.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/experiment/PositionList.java
new file mode 100644
index 0000000..651a69c
--- /dev/null
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/experiment/PositionList.java
@@ -0,0 +1,183 @@
+package edu.uci.ics.genomix.experiment;
+
/**
 * A growable list of (readID, posInRead) pairs stored as two parallel arrays:
 * readIDList[i] and posInReadList[i] together form the i-th position.
 *
 * usedSize is the number of valid entries; arraySize is the logical capacity
 * and is kept consistent with the backing arrays' length.
 */
public class PositionList {

    private int[] readIDList;
    private byte[] posInReadList;
    private static final int[] EMPTY_INTS = {};
    private static final byte[] EMPTY_BYTES = {};

    private int usedSize;
    private int arraySize;

    /** Creates an empty list backed by shared zero-length arrays. */
    public PositionList() {
        this(0, 0, EMPTY_INTS, EMPTY_BYTES);
    }

    /**
     * Wraps the given arrays without copying them.
     *
     * @param usedSize  number of valid entries (must be <= both array lengths)
     * @param arraySize declared capacity (must equal both array lengths)
     * @throws ArrayIndexOutOfBoundsException if the sizes don't match the arrays
     */
    public PositionList(int usedSize, int arraySize, int[] rList, byte[] pList) {
        this.usedSize = usedSize;
        this.arraySize = arraySize;
        if (arraySize > 0) {
            this.readIDList = rList;
            this.posInReadList = pList;
            // use short-circuit || for boolean conditions (was the bitwise |)
            if (this.readIDList.length != arraySize || this.posInReadList.length != arraySize) {
                throw new ArrayIndexOutOfBoundsException("the arraySize doesn't corespond to the array");
            }
            if (this.readIDList.length < usedSize || this.posInReadList.length < usedSize) {
                throw new ArrayIndexOutOfBoundsException("the usedSize doesn't corespond to the array");
            }
        } else {
            this.readIDList = rList;
            this.posInReadList = pList;
            this.arraySize = 0;
            this.usedSize = 0;
        }
    }

    /** Creates an empty list with a preallocated capacity of arraySize. */
    public PositionList(int arraySize) {
        this.arraySize = arraySize;
        this.usedSize = 0;
        if (arraySize > 0) {
            this.readIDList = new int[this.arraySize];
            this.posInReadList = new byte[this.arraySize];
        } else {
            this.readIDList = EMPTY_INTS;
            this.posInReadList = EMPTY_BYTES;
        }
    }

    /** Deep copy constructor; a null argument yields an empty list. */
    public PositionList(PositionList right) {
        if (right != null) {
            this.usedSize = right.usedSize;
            this.arraySize = right.arraySize;
            this.readIDList = new int[right.arraySize];
            this.posInReadList = new byte[right.arraySize];
            set(right);
        } else {
            this.arraySize = 0;
            this.usedSize = 0;
            this.readIDList = EMPTY_INTS;
            this.posInReadList = EMPTY_BYTES;
        }
    }

    /** Replaces this list's contents with a copy of newData's entries. */
    public void set(PositionList newData) {
        set(newData.readIDList, 0, newData.posInReadList, 0, newData.usedSize);
    }

    /** Replaces this list's contents with copySize entries copied from the given arrays. */
    public void set(int[] rIDList, int rOffset, byte[] pReadList, int pOffset, int copySize) {
        setArraySize(0);
        setArraySize(copySize);
        System.arraycopy(rIDList, rOffset, this.readIDList, 0, copySize);
        System.arraycopy(pReadList, pOffset, this.posInReadList, 0, copySize);
        this.usedSize = copySize;
    }

    /** Sets the logical size, growing the backing arrays (to 2x) if needed. */
    public void setArraySize(int arraySize) {
        if (arraySize > getCapacity()) {
            setCapacity((arraySize * 2));
        }
        this.arraySize = arraySize;
    }

    public int getCapacity() {
        return this.arraySize;
    }

    /**
     * Reallocates the backing arrays to new_cap, preserving existing entries.
     * Shrinking below usedSize drops the tail entries (and clamps usedSize)
     * instead of overflowing the copy.
     */
    public void setCapacity(int new_cap) {
        if (new_cap != getCapacity()) {
            int[] newRList = new int[new_cap];
            byte[] newPList = new byte[new_cap];
            if (new_cap < this.arraySize) {
                this.arraySize = new_cap;
            }
            if (this.usedSize > new_cap) {
                // clamp so the arraycopy below stays inside the new arrays
                this.usedSize = new_cap;
            }
            if (this.usedSize != 0) {
                System.arraycopy(this.readIDList, 0, newRList, 0, this.usedSize);
                System.arraycopy(this.posInReadList, 0, newPList, 0, this.usedSize);
            }
            this.readIDList = newRList;
            this.posInReadList = newPList;
        }
    }

    /** Returns the readID at position, which must be < usedSize. */
    public int getReadListElement(int position) {
        if (position < this.usedSize) {
            return this.readIDList[position];
        } else {
            throw new ArrayIndexOutOfBoundsException("position exceed for the usedSize");
        }
    }

    /** Returns the posInRead at position, which must be < usedSize. */
    public byte getPosinReadListElement(int position) {
        if (position < this.usedSize) {
            return this.posInReadList[position];
        } else {
            throw new ArrayIndexOutOfBoundsException("position exceed for the usedSize");
        }
    }

    /** Linear search for rContent among the readIDs; returns its index or -1. */
    public int findReadListElement(int rContent) {
        for (int i = 0; i < this.usedSize; i++) {
            if (this.readIDList[i] == rContent)
                return i;
        }
        return -1;
    }

    /** Linear search for pContent among the posInRead values; returns its index or -1. */
    public int findPosinReadListElement(int pContent) {
        for (int i = 0; i < this.usedSize; i++) {
            // was comparing usedSize == pContent, which never inspects the list
            if (this.posInReadList[i] == pContent)
                return i;
        }
        return -1;
    }

    /** Appends a (readID, posInRead) pair, growing the arrays when full. */
    public void addELementToList(int rContent, byte pContent) {
        if (this.usedSize >= this.arraySize) {
            // grow to at least 1 so an empty list can accept its first element
            // (arraySize * 2 is 0 when starting from the shared empty arrays)
            int newCap = Math.max(1, this.arraySize * 2);
            setCapacity(newCap);
            // setCapacity only ever lowers arraySize; record the growth here
            this.arraySize = newCap;
        }
        this.readIDList[this.usedSize] = rContent;
        this.posInReadList[this.usedSize] = pContent;
        this.usedSize++;
    }

    /** Removes the entry at position, shifting the tail left by one. */
    public void deleteElementFromTwoList(int position) {
        if (position < this.usedSize) {
            // stop at usedSize - 1 so the [i + 1] read stays in bounds
            for (int i = position; i < this.usedSize - 1; i++) {
                this.readIDList[i] = this.readIDList[i + 1];
                this.posInReadList[i] = this.posInReadList[i + 1];
            }
            this.readIDList[this.usedSize - 1] = -1;
            this.posInReadList[this.usedSize - 1] = (byte) -1;
            this.usedSize--;
        } else {
            throw new ArrayIndexOutOfBoundsException("position exceed for the usedSize");
        }
    }

    public int[] getReadIDList() {
        return this.readIDList;
    }

    public byte[] getPosInReadList() {
        return this.posInReadList;
    }

    public int getUsedSize() {
        return this.usedSize;
    }

    public int getArraySize() {
        return this.arraySize;
    }
}
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/MergePathMultiSeqOutputFormat.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/MergePathMultiSeqOutputFormat.java
new file mode 100644
index 0000000..479d664
--- /dev/null
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/MergePathMultiSeqOutputFormat.java
@@ -0,0 +1,27 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.genomix.hadoop.pmcommon;
+
+import java.io.File;
+import org.apache.hadoop.mapred.lib.MultipleSequenceFileOutputFormat;
+import edu.uci.ics.genomix.type.VKmerBytesWritable;
+public class MergePathMultiSeqOutputFormat extends MultipleSequenceFileOutputFormat<VKmerBytesWritable, MergePathValueWritable>{
+    @Override
+    protected String generateLeafFileName(String name) {
+        // TODO Auto-generated method stub System.out.println(name); 
+        String[] names = name.split("-");
+        return names[0] + File.separator + name;
+    }
+}
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/MergePathMultiTextOutputFormat.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/MergePathMultiTextOutputFormat.java
new file mode 100644
index 0000000..885d512
--- /dev/null
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/MergePathMultiTextOutputFormat.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.genomix.hadoop.pmcommon;
+
+import java.io.File;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat;
+
+public class MergePathMultiTextOutputFormat extends MultipleTextOutputFormat<Text, Text>{
+    @Override
+    protected String generateLeafFileName(String name) {
+        // TODO Auto-generated method stub System.out.println(name); 
+        String[] names = name.split("-");
+        return names[0] + File.separator + name;
+    }
+}
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/MergePathValueWritable.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/MergePathValueWritable.java
new file mode 100644
index 0000000..31dee7c
--- /dev/null
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/MergePathValueWritable.java
@@ -0,0 +1,118 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.genomix.hadoop.pmcommon;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.BinaryComparable;
+import org.apache.hadoop.io.WritableComparable;
+
+import edu.uci.ics.genomix.type.GeneCode;
+import edu.uci.ics.genomix.type.VKmerBytesWritable;
+
/**
 * Value type for the path-merge MapReduce jobs: an adjacency bitmap, a
 * one-byte role flag, and an optional k-mer payload. Comparison and
 * getBytes()/getLength() (via BinaryComparable) are over the k-mer bytes
 * only; adjBitMap and flag do not participate in ordering.
 */
public class MergePathValueWritable extends BinaryComparable implements WritableComparable<BinaryComparable> {

    private static final byte[] EMPTY_BYTES = {};
    private byte adjBitMap;        // packed predecessor/successor adjacency bits
    private byte flag;             // role marker (e.g. 0x01/0x80/0x02 set by SNodeInitialMapper)
    private VKmerBytesWritable kmer;  // optional payload; length 0 is treated as "absent"

    /** Creates an empty value: zero flags and a zero-length k-mer. */
    public MergePathValueWritable() {
        this((byte) 0, (byte) 0, 0, EMPTY_BYTES);
    }

    /** Creates an empty value whose k-mer buffer is sized for k symbols. */
    public MergePathValueWritable(int k) {
        this.adjBitMap = 0;
        this.flag = 0;
        this.kmer = new VKmerBytesWritable(k);
    }

    public MergePathValueWritable(byte adjBitMap, byte flag, int kmerSize, byte[] bytes) {
        this.adjBitMap = adjBitMap;
        this.flag = flag;
        this.kmer = new VKmerBytesWritable(kmerSize, bytes);
        // NOTE(review): this set() after the sizing constructor looks redundant —
        // confirm against VKmerBytesWritable(int, byte[]) semantics.
        kmer.set(bytes, 0, bytes.length);
    }

    /** Copies all three fields from right. */
    public void set(MergePathValueWritable right) {
        // NOTE(review): right.getKmer() returns null when its k-mer is empty;
        // verify VKmerBytesWritable.set(null) is tolerated by callers of this path.
        set(right.getAdjBitMap(), right.getFlag(), right.getKmer());
    }

    /** Sets the k-mer (deep copy) plus the adjacency bitmap and flag. */
    public void set(byte adjBitMap, byte flag, VKmerBytesWritable kmer) {
        this.kmer.set(kmer);
        this.adjBitMap = adjBitMap;
        this.flag = flag;
    }

    @Override
    public void readFields(DataInput arg0) throws IOException {
        // Must mirror write(): k-mer first, then adjBitMap, then flag.
        kmer.readFields(arg0);
        adjBitMap = arg0.readByte();
        flag = arg0.readByte();
    }

    @Override
    public void write(DataOutput arg0) throws IOException {
        // Serialization order: k-mer, adjBitMap, flag.
        kmer.write(arg0);
        arg0.writeByte(adjBitMap);
        arg0.writeByte(flag);
    }

    /** Returns the k-mer payload, or null when it is empty (length 0). */
    public VKmerBytesWritable getKmer() {
        if (kmer.getLength() != 0) {
            return kmer;
        }
        return null;
    }

    public byte getAdjBitMap() {
        return this.adjBitMap;
    }

    public byte getFlag() {
        return this.flag;
    }

    // NOTE(review): assumes GeneCode.getSymbolFromBitMap returns a String; if it
    // returned a char, '+' with '\t' would perform integer addition — confirm.
    public String toString() {
        return GeneCode.getSymbolFromBitMap(adjBitMap) + '\t' + String.valueOf(flag);
    }

    /** Adjacency symbols only, without the flag suffix. */
    public String pureToString() {
        return GeneCode.getSymbolFromBitMap(adjBitMap);
    }

    /** Raw k-mer bytes for BinaryComparable ordering; null when the k-mer is empty. */
    @Override
    public byte[] getBytes() {
        if (kmer.getLength() != 0) {
            return kmer.getBytes();
        } else
            return null;

    }

    public int getKmerLength() {
        return kmer.getKmerLength();
    }

    @Override
    public int getLength() {
        return kmer.getLength();
    }
}
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/SNodeInitialMapper.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/SNodeInitialMapper.java
new file mode 100644
index 0000000..3e3790a
--- /dev/null
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/SNodeInitialMapper.java
@@ -0,0 +1,163 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.genomix.hadoop.pmcommon;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.ByteWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+
+import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.GeneCode;
+import edu.uci.ics.genomix.type.VKmerBytesWritableFactory;
+@SuppressWarnings("deprecation")
+public class SNodeInitialMapper extends MapReduceBase implements
+        Mapper<KmerBytesWritable, ByteWritable, KmerBytesWritable, MergePathValueWritable> {
+
+    public int KMER_SIZE;
+    public KmerBytesWritable outputKmer;
+    public MergePathValueWritable outputAdjList;
+
+    public void configure(JobConf job) {
+        KMER_SIZE = Integer.parseInt(job.get("sizeKmer"));
+        outputKmer = new KmerBytesWritable(KMER_SIZE);
+        outputAdjList = new MergePathValueWritable();
+    }
+    
+    /**
+     * @param adjacent the high 4 bits are useless, we just use the lower 4 bits
+     * @return if the degree == 1 then return false, else return true
+     */
+    boolean measureDegree(byte adjacent) {
+        boolean result = true;
+        switch (adjacent) {
+            case 0:
+                result = true;
+                break;
+            case 1:
+                result = false;
+                break;
+            case 2:
+                result = false;
+                break;
+            case 3:
+                result = true;
+                break;
+            case 4:
+                result = false;
+                break;
+            case 5:
+                result = true;
+                break;
+            case 6:
+                result = true;
+                break;
+            case 7:
+                result = true;
+                break;
+            case 8:
+                result = false;
+                break;
+            case 9:
+                result = true;
+                break;
+            case 10:
+                result = true;
+                break;
+            case 11:
+                result = true;
+                break;
+            case 12:
+                result = true;
+                break;
+            case 13:
+                result = true;
+                break;
+            case 14:
+                result = true;
+                break;
+            case 15:
+                result = true;
+                break;
+        }
+        return result;
+    }
+
+    @Override
+    public void map(KmerBytesWritable key, ByteWritable value,
+            OutputCollector<KmerBytesWritable, MergePathValueWritable> output, Reporter reporter) throws IOException {
+        //TODO clean this code piece, use the genomix-data function
+        byte precursor = (byte) 0xF0;
+        byte succeed = (byte) 0x0F;
+        byte adjBitMap = value.get();
+        byte bitFlag = (byte) 0;
+        precursor = (byte) (precursor & adjBitMap);
+        precursor = (byte) ((precursor & 0xff) >> 4);
+        succeed = (byte) (succeed & adjBitMap);
+        boolean inDegree = measureDegree(precursor);
+        boolean outDegree = measureDegree(succeed);
+        //if indegree == 1 and outdegree == 1, then it assigns these records' flag to 2
+        if (inDegree == false && outDegree == false) {
+            outputKmer.set(key);
+            bitFlag = (byte) 0x02;
+            outputAdjList.set(adjBitMap, bitFlag, null);
+            output.collect(outputKmer, outputAdjList);
+        } else {
+            // other records maps its precursor neighbors
+            /**
+             * eg. ACT  CTA|CA, it maps CAC, TAC, AAC, all the 3 pairs marked  0x80
+             */
+            for (int i = 0; i < 4; i++) {
+                byte temp = (byte) 0x01;
+                byte shiftedCode = 0;
+                temp = (byte) (temp << i);
+                temp = (byte) (precursor & temp);   
+                if (temp != 0) {
+                    //TODO use the genomix-data factory function
+                    byte precurCode = GeneCode.getGeneCodeFromBitMap(temp);
+                    shiftedCode = key.shiftKmerWithPreCode(precurCode);
+                    outputKmer.set(key);
+                    bitFlag = (byte) 0x80;
+                    outputAdjList.set((byte) 0, bitFlag, null);
+                    output.collect(outputKmer, outputAdjList);
+                    key.shiftKmerWithNextCode(shiftedCode);
+                }
+            }
+            //and also maps its succeeding neighbors
+            /**
+             * eg. ACT  CTA|CA, it maps CTC, CTA, all the 2 pairs marked 0x01
+             */
+            for (int i = 0; i < 4; i++) {
+                byte temp = (byte) 0x01;
+                byte shiftedCode = 0;
+                temp = (byte) (temp << i);
+                temp = (byte) (succeed & temp);
+                if (temp != 0) {
+                    byte succeedCode = GeneCode.getGeneCodeFromBitMap(temp);
+                    shiftedCode = key.shiftKmerWithNextCode(succeedCode);
+                    outputKmer.set(key);
+                    bitFlag = (byte) 0x01;
+                    outputAdjList.set((byte) 0, bitFlag, null);
+                    output.collect(outputKmer, outputAdjList);
+                    key.shiftKmerWithPreCode(shiftedCode);
+                }
+            }
+        }
+    }
+}
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/SNodeInitialReducer.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/SNodeInitialReducer.java
new file mode 100644
index 0000000..69fa985
--- /dev/null
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/SNodeInitialReducer.java
@@ -0,0 +1,122 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.genomix.hadoop.pmcommon;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.lib.MultipleOutputs;
+
+import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.VKmerBytesWritable;
+
@SuppressWarnings("deprecation")
/**
 * First-round reducer for path merging. Groups the records emitted by
 * SNodeInitialMapper per k-mer and classifies the node using the flags the
 * mapper produced: 0x01 (emitted by a succeeding neighbor = start point),
 * 0x80 (emitted by a precursor neighbor = end point), 0x02 (the 1-in/1-out
 * node itself). Single-node paths go to the "comSinglePath0" side output;
 * other 1-in/1-out nodes are re-emitted with combined start/end flags.
 */
public class SNodeInitialReducer extends MapReduceBase implements
        Reducer<KmerBytesWritable, MergePathValueWritable, VKmerBytesWritable, MergePathValueWritable> {
    private VKmerBytesWritable outputKmer = new VKmerBytesWritable();
    private MergePathValueWritable outputValue = new MergePathValueWritable();
    MultipleOutputs mos = null;  // side-output channel for completed single-node paths

    public void configure(JobConf job) {
        mos = new MultipleOutputs(job);
    }

    @SuppressWarnings("unchecked")
    @Override
    public void reduce(KmerBytesWritable key, Iterator<MergePathValueWritable> values,
            OutputCollector<VKmerBytesWritable, MergePathValueWritable> output, Reporter reporter) throws IOException {
        outputKmer.set(key);
        // NOTE(review): this rebinds outputValue to the iterator's object rather
        // than copying into the preallocated instance; Hadoop may reuse that
        // object across next() calls — confirm each value is fully consumed
        // (flag read, adjBitMap copied) before the next call, as it appears here.
        outputValue = values.next();
        byte startPointFlag = 0x00;
        byte endPointFlag = 0x00;
        /**
         * the targetPoint means that we want find the record which 1 indegree and 1 outdegree in the group which has multi-records
         */
        byte targetPointFlag = 0x00;
        byte targetAdjList = 0x00;
        //if we find the start or end point, we will use outputFlag to mark them
        byte outputFlag = 0x00;
        
        if (values.hasNext() == true) {
            //find startPointFlag, endPointFlag, targetPointFlag
            // classify the first record (already consumed above)
            switch (outputValue.getFlag()) {
                case (byte) 0x01:
                    startPointFlag = (byte) 0x01;
                    break;
                case (byte) 0x80:
                    endPointFlag = (byte) 0x80;
                    break;
                case (byte) 0x02:
                    targetPointFlag = (byte) 0x02;
                    targetAdjList = outputValue.getAdjBitMap();
                    break;
            }
            // scan remaining records; stop early once all three markers are seen
            while (values.hasNext()) {
                outputValue = values.next();
                switch (outputValue.getFlag()) {
                    case (byte) 0x01:
                        startPointFlag = (byte) 0x01;
                        break;
                    case (byte) 0x80:
                        endPointFlag = (byte) 0x80;
                        break;
                    case (byte) 0x02:
                        targetPointFlag = (byte) 0x02;
                        targetAdjList = outputValue.getAdjBitMap();
                        break;
                }
                if (startPointFlag != (byte) 0x00 && endPointFlag != (byte) 0x00 && targetPointFlag != (byte) 0x00)
                    break;
            }
            //if we find the start-point or end-point
            if (targetPointFlag == (byte) 0x02) {
                //remove the single point path
                if (startPointFlag == (byte) 0x01 && endPointFlag == (byte) 0x80) {
                    // both neighbors present: this node is a complete one-node path
                    outputFlag = (byte) (outputFlag | startPointFlag);
                    outputFlag = (byte) (outputFlag | endPointFlag);
                    outputValue.set(targetAdjList, outputFlag, null);
                    mos.getCollector("comSinglePath0", reporter).collect(outputKmer, outputValue);
                } else {
                    if (startPointFlag == (byte) 0x01) {
                        outputFlag = (byte) (outputFlag | startPointFlag);
                    }
                    if (endPointFlag == (byte) 0x80) {
                        outputFlag = (byte) (outputFlag | endPointFlag);
                    }
                    outputValue.set(targetAdjList, outputFlag, null);
                    output.collect(outputKmer, outputValue);
                }
            }
        } else {
            //keep the non-start/end single point into the input files
            if (outputValue.getFlag() == (byte) 0x02) {
                byte bitFlag = 0;
                outputValue.set(outputValue.getAdjBitMap(), bitFlag, null);
                output.collect(outputKmer, outputValue);
            }
        }
    }

    /** Flushes and closes the MultipleOutputs side-output channel. */
    public void close() throws IOException {
        mos.close();
    }
}
diff --git a/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/pathmergingh1/MergePathH1Test.java b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/pathmergingh1/MergePathH1Test.java
new file mode 100644
index 0000000..5f5b40a
--- /dev/null
+++ b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/pathmergingh1/MergePathH1Test.java
@@ -0,0 +1,119 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.genomix.hadoop.pathmergingh1;
+
+import java.io.BufferedWriter;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.FileWriter;
+import java.io.IOException;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MiniMRCluster;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.junit.Test;
+import edu.uci.ics.genomix.hadoop.pathmergingh1.MergePathH1Driver;
+import edu.uci.ics.genomix.hadoop.pmcommon.MergePathValueWritable;
+import edu.uci.ics.genomix.hadoop.utils.TestUtils;
+import edu.uci.ics.genomix.type.VKmerBytesWritable;
+
@SuppressWarnings("deprecation")
/**
 * Integration test for MergePathH1Driver: spins up MiniDFS/MiniMR clusters,
 * copies the output of the previous stage (actual2/result2) into HDFS, runs
 * the H1 path-merge job for MERGE_ROUND rounds, and dumps the merged
 * single-path sequence file to a local text file for inspection.
 *
 * NOTE(review): the dump is written but never compared against an expected
 * result (EXPECTED_PATH is commented out), so this test only checks that the
 * job runs to completion — confirm whether a TestUtils comparison was intended.
 */
public class MergePathH1Test {
    private static final String ACTUAL_RESULT_DIR = "actual3";
    private static final String COMPARE_DIR = "compare";
    private JobConf conf = new JobConf();
    private static final String HADOOP_CONF_PATH = ACTUAL_RESULT_DIR + File.separator + "conf.xml";
    // input: output of the previous pipeline stage, produced by an earlier test
    private static final String DATA_PATH = "actual2" + "/result2" + "/part-00000";
    private static final String HDFS_PATH = "/hdfsdata";
    private static final String HDFS_PATH_MERGED = "/pathmerged";
    
    private static final String RESULT_PATH = "/result3";
//    private static final String EXPECTED_PATH = "expected/result3";
    private static final String TEST_SOURCE_DIR = COMPARE_DIR + RESULT_PATH;
    
    private static final int COUNT_REDUCER = 1;
    private static final int SIZE_KMER = 3;
    private static final int MERGE_ROUND = 2;
    
    private MiniDFSCluster dfsCluster;
    private MiniMRCluster mrCluster;
    private FileSystem dfs;

    @SuppressWarnings("resource")
    @Test
    public void test() throws Exception {
        FileUtils.forceMkdir(new File(ACTUAL_RESULT_DIR));
        FileUtils.cleanDirectory(new File(ACTUAL_RESULT_DIR));
        startHadoop();

        MergePathH1Driver tldriver = new MergePathH1Driver();
        tldriver.run(HDFS_PATH, RESULT_PATH, HDFS_PATH_MERGED, COUNT_REDUCER, SIZE_KMER, MERGE_ROUND, HADOOP_CONF_PATH);
        
        // read back the final round's merged single-path output
        SequenceFile.Reader reader = null;
        Path path = new Path(HDFS_PATH_MERGED + "/comSinglePath2" + "/comSinglePath2-r-00000");
        reader = new SequenceFile.Reader(dfs, path, conf);
        VKmerBytesWritable key = (VKmerBytesWritable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
        MergePathValueWritable value = (MergePathValueWritable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
        File filePathTo = new File(TEST_SOURCE_DIR);
        FileUtils.forceMkdir(filePathTo);
        FileUtils.cleanDirectory(filePathTo);
        // dump key/adjBitMap/flag, tab-separated, one record per line
        BufferedWriter bw = new BufferedWriter(new FileWriter(new File(TEST_SOURCE_DIR + "/comparesource.txt")));
        while (reader.next(key, value)) {
            bw.write(key.toString() + "\t" + value.getAdjBitMap() + "\t" + value.getFlag());
            bw.newLine();
        }
        bw.close();
        
        cleanupHadoop();

    }

    /** Boots MiniDFS + MiniMR, stages the input data in HDFS, and writes conf.xml. */
    private void startHadoop() throws IOException {
        FileSystem lfs = FileSystem.getLocal(new Configuration());
        lfs.delete(new Path("build"), true);
        System.setProperty("hadoop.log.dir", "logs");
        dfsCluster = new MiniDFSCluster(conf, 2, true, null);
        dfs = dfsCluster.getFileSystem();
        mrCluster = new MiniMRCluster(4, dfs.getUri().toString(), 2);

        Path src = new Path(DATA_PATH);
        Path dest = new Path(HDFS_PATH + "/");
        dfs.mkdirs(dest);
        dfs.copyFromLocalFile(src, dest);
        Path data = new Path(HDFS_PATH_MERGED + "/");
        dfs.mkdirs(data);
   
        DataOutputStream confOutput = new DataOutputStream(new FileOutputStream(new File(HADOOP_CONF_PATH)));
        conf.writeXml(confOutput);
        confOutput.flush();
        confOutput.close();
    }

    private void cleanupHadoop() throws IOException {
        mrCluster.shutdown();
        dfsCluster.shutdown();
    }

    // NOTE(review): currently unused by test(); keep only if result dumping to
    // the local actual dir is still wanted — otherwise remove.
    private void dumpResult() throws IOException {
        Path src = new Path(RESULT_PATH);
        Path dest = new Path(ACTUAL_RESULT_DIR + "/");
        dfs.copyToLocalFile(src, dest);
    }
}