Merge commit '951870ef4642f2003c4240eeef702fc90600fe77' into nanzhang/genomix
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionListWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionListWritable.java
index 11d0998..34fb0f6 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionListWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionListWritable.java
@@ -132,6 +132,9 @@
     
     public static int getCountByDataLength(int length) {
         if (length % PositionWritable.LENGTH != 0) {
+            for (StackTraceElement ste : Thread.currentThread().getStackTrace()) {
+                System.out.println(ste);
+            }
             throw new IllegalArgumentException("Length of positionlist is invalid");
         }
         return length / PositionWritable.LENGTH;
diff --git a/genomix/genomix-hadoop/pom.xml b/genomix/genomix-hadoop/pom.xml
index fe83c3c..610092a 100755
--- a/genomix/genomix-hadoop/pom.xml
+++ b/genomix/genomix-hadoop/pom.xml
@@ -156,5 +156,23 @@
 			<type>jar</type>
 			<scope>compile</scope>
 		</dependency>
+		<dependency>
+			<groupId>edu.uci.ics.hyracks</groupId>
+			<artifactId>genomix-hyracks</artifactId>
+			<version>0.2.6-SNAPSHOT</version>
+		</dependency>
+		<dependency>
+			<groupId>edu.uci.ics.hyracks</groupId>
+			<artifactId>hyracks-hdfs-core</artifactId>
+			<version>0.2.6-SNAPSHOT</version>
+			<scope>compile</scope>
+		</dependency>
+		<dependency>
+			<groupId>edu.uci.ics.hyracks</groupId>
+			<artifactId>hyracks-hdfs-core</artifactId>
+			<version>0.2.6-SNAPSHOT</version>
+			<type>test-jar</type>
+			<scope>test</scope>
+		</dependency>
 	</dependencies>
 </project>
diff --git a/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h3/TestPathMergeH3.java b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h3/TestPathMergeH3.java
new file mode 100644
index 0000000..d8d4429
--- /dev/null
+++ b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h3/TestPathMergeH3.java
@@ -0,0 +1,63 @@
+package edu.uci.ics.genomix.hadoop.graphclean.mergepaths.h3;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.junit.Test;
+
+import edu.uci.ics.genomix.hadoop.pmcommon.GenomixMiniClusterTest;
+import edu.uci.ics.genomix.hyracks.driver.Driver.Plan;
+import edu.uci.ics.genomix.hyracks.job.GenomixJobConf;
+import edu.uci.ics.genomix.type.NodeWritable;
+
+@SuppressWarnings("deprecation")
+public class TestPathMergeH3 extends GenomixMiniClusterTest {
+    protected String LOCAL_SEQUENCE_FILE = "src/test/resources/data/webmap/text.txt";
+    protected String HDFS_SEQUENCE = "/00-sequence/";
+    protected String HDFS_GRAPHBUILD = "/01-graphbuild/";
+    protected String HDFS_MERGED = "/02-graphmerge/";
+    
+    protected String GRAPHBUILD_FILE = "result.graphbuild.txt";
+    protected String PATHMERGE_FILE = "result.mergepath.txt";
+    
+    {
+        KMER_LENGTH = 5;
+        READ_LENGTH = 8;
+        HDFS_PATHS = new ArrayList<String>(Arrays.asList(HDFS_SEQUENCE, HDFS_GRAPHBUILD, HDFS_MERGED));
+        // we have to specify what kind of keys and values this job has
+        key = new NodeWritable(KMER_LENGTH);
+        value = NullWritable.get();
+    }
+
+    @Test
+    public void TestBuildGraph() throws Exception {
+        cleanUpOutput();
+        copyLocalToDFS(LOCAL_SEQUENCE_FILE, HDFS_SEQUENCE);
+        buildGraph();
+    }
+
+//    @Test
+    public void TestMergeOneIteration() throws Exception {
+        cleanUpOutput();
+        copyLocalToDFS(LOCAL_SEQUENCE_FILE, HDFS_SEQUENCE);
+        buildGraph();
+        MergePathsH3Driver h3 = new MergePathsH3Driver();
+        h3.run(HDFS_GRAPHBUILD, HDFS_MERGED, 2, KMER_LENGTH, 1, ACTUAL_ROOT + "conf.xml", null);
+        copyResultsToLocal(HDFS_MERGED, ACTUAL_ROOT + PATHMERGE_FILE, conf);
+    }
+
+
+
+    public void buildGraph() throws Exception {
+        FileInputFormat.setInputPaths(conf, HDFS_SEQUENCE);
+        FileOutputFormat.setOutputPath(conf, new Path(HDFS_GRAPHBUILD));
+        conf.set(GenomixJobConf.OUTPUT_FORMAT, GenomixJobConf.OUTPUT_FORMAT_BINARY);
+        conf.set(GenomixJobConf.GROUPBY_TYPE, GenomixJobConf.GROUPBY_TYPE_PRECLUSTER);
+        driver.runJob(new GenomixJobConf(conf), Plan.BUILD_DEBRUJIN_GRAPH, true);
+        copyResultsToLocal(HDFS_GRAPHBUILD, ACTUAL_ROOT + GRAPHBUILD_FILE, conf);
+    }
+}
diff --git a/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/pmcommon/GenomixMiniClusterTest.java b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/pmcommon/GenomixMiniClusterTest.java
new file mode 100644
index 0000000..544f5fc
--- /dev/null
+++ b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/pmcommon/GenomixMiniClusterTest.java
@@ -0,0 +1,192 @@
+package edu.uci.ics.genomix.hadoop.pmcommon;
+
+import java.io.BufferedWriter;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MiniMRCluster;
+import org.junit.After;
+import org.junit.Before;
+
+import edu.uci.ics.genomix.hyracks.driver.Driver;
+import edu.uci.ics.genomix.hyracks.job.GenomixJobConf;
+import edu.uci.ics.genomix.hyracks.test.TestUtils;
+import edu.uci.ics.hyracks.hdfs.utils.HyracksUtils;
+
+/*
+ * A base class providing most of the boilerplate for Genomix-based tests
+ */
+@SuppressWarnings("deprecation")
+public class GenomixMiniClusterTest {
+    protected int KMER_LENGTH = 5;
+    protected int READ_LENGTH = 8;
+    
+    // Subclasses should set this to the HDFS directories that cleanUpOutput() must delete.
+    protected ArrayList<String> HDFS_PATHS = new ArrayList<String>();
+
+    protected String EXPECTED_ROOT = "src/test/resources/expected/";
+    protected String ACTUAL_ROOT = "src/test/resources/actual/";
+    
+    protected String HADOOP_CONF_ROOT = "src/test/resources/hadoop/conf/";
+    
+    protected MiniDFSCluster dfsCluster;
+    protected JobConf conf = new JobConf();
+    protected int numberOfNC = 1;
+    protected int numPartitionPerMachine = 1;
+    protected Driver driver;
+
+    protected Writable key;
+    protected Writable value;
+    protected MiniMRCluster mrCluster;
+
+
+    @Before
+    public void setUp() throws Exception {
+        cleanupStores();
+        HyracksUtils.init();
+        FileUtils.forceMkdir(new File(ACTUAL_ROOT));
+        FileUtils.cleanDirectory(new File(ACTUAL_ROOT));
+        startHDFS();
+
+        conf.setInt(GenomixJobConf.KMER_LENGTH, KMER_LENGTH);
+        conf.setInt(GenomixJobConf.READ_LENGTH, READ_LENGTH);
+        driver = new Driver(HyracksUtils.CC_HOST,
+                HyracksUtils.TEST_HYRACKS_CC_CLIENT_PORT, numPartitionPerMachine);
+    }
+    
+    /*
+     * Copy the job output in a DFS directory to a local file. Text output is merged as-is;
+     * binary output is converted to text and also stored locally as a ".bin" SequenceFile.
+     */
+    protected void copyResultsToLocal(String hdfsSrcDir, String localDestFile, Configuration conf) throws IOException {
+        String fileFormat = conf.get(GenomixJobConf.OUTPUT_FORMAT);
+        if (GenomixJobConf.OUTPUT_FORMAT_TEXT.equalsIgnoreCase(fileFormat)) {
+            // for text files, just concatenate them together
+            FileUtil.copyMerge(FileSystem.get(conf), new Path(hdfsSrcDir),
+                    FileSystem.getLocal(new Configuration()), new Path(localDestFile),
+                    false, conf, null);
+        } else {
+            // file is binary
+            // merge and store the binary format
+//            FileUtil.copyMerge(FileSystem.get(conf), new Path(hdfsSrcDir),
+//                    FileSystem.getLocal(new Configuration()), new Path(localDestFile + ".bin"),
+//                    false, conf, null);
+            // load the Nodes and write them out as text locally
+            FileSystem lfs = FileSystem.getLocal(new Configuration());
+            lfs.mkdirs(new Path(localDestFile).getParent());
+            File filePathTo = new File(localDestFile);
+            BufferedWriter bw = new BufferedWriter(new FileWriter(filePathTo));
+            FileSystem dfs = FileSystem.get(conf);
+            SequenceFile.Reader reader = new SequenceFile.Reader(dfs, new Path(hdfsSrcDir + "part-0"), conf);
+            SequenceFile.Writer writer = new SequenceFile.Writer(lfs, new JobConf(), new Path(localDestFile + ".bin"), reader.getKeyClass(), reader.getValueClass());
+            
+            for (int i=0; i < java.lang.Integer.MAX_VALUE; i++) {
+                Path path = new Path(hdfsSrcDir + "part-" + i);
+                if (!dfs.exists(path)) {
+                    break;
+                }
+                if (dfs.getFileStatus(path).getLen() == 0) {
+                    continue;
+                }
+                reader = new SequenceFile.Reader(dfs, path, conf);
+                while (reader.next(key, value)) {
+                    if (key == null || value == null) {
+                        break;
+                    }
+                    bw.write(key.toString() + "\t" + value.toString());
+                    System.out.println(key.toString() + "\t" + value.toString());
+                    bw.newLine();
+                    writer.append(key, value);
+                    
+                }
+                reader.close();
+            }
+            writer.close();
+            bw.close();
+        }
+
+    }
+    
+    protected boolean checkResults(String expectedPath, String actualPath, int[] poslistField) throws Exception {
+        File dumped = new File(actualPath); 
+        if (poslistField != null) {
+            TestUtils.compareWithUnSortedPosition(new File(expectedPath), dumped, poslistField);
+        } else {
+            TestUtils.compareWithSortedResult(new File(expectedPath), dumped);
+        }
+        return true;
+    }
+
+    protected void cleanupStores() throws IOException {
+        FileUtils.forceMkdir(new File("teststore"));
+        FileUtils.forceMkdir(new File("build"));
+        FileUtils.cleanDirectory(new File("teststore"));
+        FileUtils.cleanDirectory(new File("build"));
+    }
+
+    protected void startHDFS() throws IOException {
+        conf.addResource(new Path(HADOOP_CONF_ROOT + "core-site.xml"));
+        conf.addResource(new Path(HADOOP_CONF_ROOT + "mapred-site.xml"));
+        conf.addResource(new Path(HADOOP_CONF_ROOT + "hdfs-site.xml"));
+
+        FileSystem lfs = FileSystem.getLocal(new Configuration());
+        lfs.delete(new Path("build"), true);
+        System.setProperty("hadoop.log.dir", "logs");
+        dfsCluster = new MiniDFSCluster(conf, numberOfNC, true, null);
+        //mrCluster = new MiniMRCluster(4, dfsCluster.getFileSystem().getUri().toString(), 2);
+        
+        DataOutputStream confOutput = new DataOutputStream(new FileOutputStream(new File(ACTUAL_ROOT + "conf.xml")));
+        conf.writeXml(confOutput);
+        confOutput.close();
+    }
+    
+    protected void copyLocalToDFS(String localSrc, String hdfsDest) throws IOException {
+        FileSystem dfs = FileSystem.get(conf);
+        Path src = new Path(localSrc);
+        Path dest = new Path(hdfsDest);
+        dfs.mkdirs(dest);
+        dfs.copyFromLocalFile(src, dest);
+    }
+    
+    /*
+     * Remove the local "actual" folder and any HDFS folders in use by this test.
+     */
+    public void cleanUpOutput() throws IOException {
+        // local cleanup
+        FileSystem lfs = FileSystem.getLocal(new Configuration());
+        if (lfs.exists(new Path(ACTUAL_ROOT))) {
+            lfs.delete(new Path(ACTUAL_ROOT), true);
+        }
+        // dfs cleanup
+        FileSystem dfs = FileSystem.get(conf);
+        for (String path : HDFS_PATHS) {
+            if (dfs.exists(new Path(path))) {
+                dfs.delete(new Path(path), true);
+            }
+        }
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        HyracksUtils.deinit();
+        cleanupHDFS();
+    }
+
+    protected void cleanupHDFS() throws Exception {
+        dfsCluster.shutdown();
+        //mrCluster.shutdown();
+    }
+}
diff --git a/genomix/genomix-hadoop/src/test/resources/data/webmap/text.txt b/genomix/genomix-hadoop/src/test/resources/data/webmap/text.txt
new file mode 100755
index 0000000..13190dd
--- /dev/null
+++ b/genomix/genomix-hadoop/src/test/resources/data/webmap/text.txt
@@ -0,0 +1,6 @@
+1	AATAGAAG

+2	AATAGAAG

+3	AATAGAAG

+4	AATAGAAG

+5	AATAGAAG

+6	AGAAGAAG

diff --git a/genomix/genomix-hadoop/src/test/resources/hadoop/conf/core-site.xml b/genomix/genomix-hadoop/src/test/resources/hadoop/conf/core-site.xml
new file mode 100644
index 0000000..3e5bacb
--- /dev/null
+++ b/genomix/genomix-hadoop/src/test/resources/hadoop/conf/core-site.xml
@@ -0,0 +1,18 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+
+<!-- Put site-specific property overrides in this file. -->
+
+<configuration>
+
+	<property>
+		<name>fs.default.name</name>
+		<value>hdfs://127.0.0.1:31888</value>
+	</property>
+	<property>
+		<name>hadoop.tmp.dir</name>
+		<value>/tmp/hadoop</value>
+	</property>
+
+
+</configuration>
diff --git a/genomix/genomix-hadoop/src/test/resources/hadoop/conf/hdfs-site.xml b/genomix/genomix-hadoop/src/test/resources/hadoop/conf/hdfs-site.xml
new file mode 100644
index 0000000..b1b1902
--- /dev/null
+++ b/genomix/genomix-hadoop/src/test/resources/hadoop/conf/hdfs-site.xml
@@ -0,0 +1,18 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+
+<!-- Put site-specific property overrides in this file. -->
+
+<configuration>
+
+	<property>
+		<name>dfs.replication</name>
+		<value>1</value>
+	</property>
+
+	<property>
+		<name>dfs.block.size</name>
+		<value>65536</value>
+	</property>
+
+</configuration>
diff --git a/genomix/genomix-hadoop/src/test/resources/hadoop/conf/log4j.properties b/genomix/genomix-hadoop/src/test/resources/hadoop/conf/log4j.properties
new file mode 100755
index 0000000..d5e6004
--- /dev/null
+++ b/genomix/genomix-hadoop/src/test/resources/hadoop/conf/log4j.properties
@@ -0,0 +1,94 @@
+# Define some default values that can be overridden by system properties
+hadoop.root.logger=FATAL,console
+hadoop.log.dir=.
+hadoop.log.file=hadoop.log
+
+# Define the root logger to the system property "hadoop.root.logger".
+log4j.rootLogger=${hadoop.root.logger}, EventCounter
+
+# Logging Threshold
+log4j.threshhold=FATAL
+
+#
+# Daily Rolling File Appender
+#
+
+log4j.appender.DRFA=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.DRFA.File=${hadoop.log.dir}/${hadoop.log.file}
+
+# Rollover at midnight
+log4j.appender.DRFA.DatePattern=.yyyy-MM-dd
+
+# 30-day backup
+#log4j.appender.DRFA.MaxBackupIndex=30
+log4j.appender.DRFA.layout=org.apache.log4j.PatternLayout
+
+# Pattern format: Date LogLevel LoggerName LogMessage
+log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n
+# Debugging Pattern format
+#log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n
+
+
+#
+# console
+# Add "console" to rootlogger above if you want to use this 
+#
+
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n
+
+#
+# TaskLog Appender
+#
+
+#Default values
+hadoop.tasklog.taskid=null
+hadoop.tasklog.noKeepSplits=4
+hadoop.tasklog.totalLogFileSize=100
+hadoop.tasklog.purgeLogSplits=true
+hadoop.tasklog.logsRetainHours=12
+
+log4j.appender.TLA=org.apache.hadoop.mapred.TaskLogAppender
+log4j.appender.TLA.taskId=${hadoop.tasklog.taskid}
+log4j.appender.TLA.totalLogFileSize=${hadoop.tasklog.totalLogFileSize}
+
+log4j.appender.TLA.layout=org.apache.log4j.PatternLayout
+log4j.appender.TLA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n
+
+#
+# Rolling File Appender
+#
+
+#log4j.appender.RFA=org.apache.log4j.RollingFileAppender
+#log4j.appender.RFA.File=${hadoop.log.dir}/${hadoop.log.file}
+
+# Logfile size and 30-day backups
+#log4j.appender.RFA.MaxFileSize=1MB
+#log4j.appender.RFA.MaxBackupIndex=30
+
+#log4j.appender.RFA.layout=org.apache.log4j.PatternLayout
+#log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} - %m%n
+#log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n
+
+#
+# FSNamesystem Audit logging
+# All audit events are logged at INFO level
+#
+log4j.logger.org.apache.hadoop.fs.FSNamesystem.audit=WARN
+
+# Custom Logging levels
+
+#log4j.logger.org.apache.hadoop.mapred.JobTracker=DEBUG
+#log4j.logger.org.apache.hadoop.mapred.TaskTracker=DEBUG
+#log4j.logger.org.apache.hadoop.fs.FSNamesystem=DEBUG
+
+# Jets3t library
+log4j.logger.org.jets3t.service.impl.rest.httpclient.RestS3Service=ERROR
+
+#
+# Event Counter Appender
+# Sends counts of logging messages at different severity levels to Hadoop Metrics.
+#
+log4j.appender.EventCounter=org.apache.hadoop.metrics.jvm.EventCounter
diff --git a/genomix/genomix-hadoop/src/test/resources/hadoop/conf/mapred-site.xml b/genomix/genomix-hadoop/src/test/resources/hadoop/conf/mapred-site.xml
new file mode 100644
index 0000000..525e7d5
--- /dev/null
+++ b/genomix/genomix-hadoop/src/test/resources/hadoop/conf/mapred-site.xml
@@ -0,0 +1,25 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+
+<!-- Put site-specific property overrides in this file. -->
+
+<configuration>
+
+	<property>
+		<name>mapred.job.tracker</name>
+		<value>localhost:29007</value>
+	</property>
+	<property>
+		<name>mapred.tasktracker.map.tasks.maximum</name>
+		<value>20</value>
+	</property>
+	<property>
+		<name>mapred.tasktracker.reduce.tasks.maximum</name>
+		<value>20</value>
+	</property>
+	<property>
+		<name>mapred.max.split.size</name>
+		<value>2048</value>
+	</property>
+
+</configuration>
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/ReadsKeyValueParserFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/ReadsKeyValueParserFactory.java
index fe8224f..3667d43 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/ReadsKeyValueParserFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/ReadsKeyValueParserFactory.java
@@ -45,19 +45,17 @@
     public static final int OutputKmerField = 0;
     public static final int OutputPosition = 1;
 
-    private KmerBytesWritable kmer;
-    private PositionReference pos;
-    private boolean bReversed;
+    private final boolean bReversed;
     private final int readLength;
+    private final int kmerSize;
 
     public static final RecordDescriptor readKmerOutputRec = new RecordDescriptor(new ISerializerDeserializer[] { null,
             null });
 
     public ReadsKeyValueParserFactory(int readlength, int k, boolean bGenerateReversed) {
         bReversed = bGenerateReversed;
-        kmer = new KmerBytesWritable(k);
-        pos = new PositionReference();
         this.readLength = readlength;
+        this.kmerSize = k;
     }
 
     @Override
@@ -69,6 +67,9 @@
 
         return new IKeyValueParser<LongWritable, Text>() {
 
+            private KmerBytesWritable kmer = new KmerBytesWritable(kmerSize);
+            private PositionReference pos = new PositionReference();
+            
             @Override
             public void parse(LongWritable key, Text value, IFrameWriter writer) throws HyracksDataException {
                 String[] geneLine = value.toString().split("\\t"); // Read the Real Gene Line
@@ -97,17 +98,16 @@
 
             private void SplitReads(int readID, byte[] array, IFrameWriter writer) {
                 /** first kmer */
-                int k = kmer.getKmerLength();
-                if (k >= array.length) {
+                if (kmerSize >= array.length) {
                     return;
                 }
                 kmer.setByRead(array, 0);
                 InsertToFrame(kmer, readID, 1, writer);
 
                 /** middle kmer */
-                for (int i = k; i < array.length; i++) {
+                for (int i = kmerSize; i < array.length; i++) {
                     kmer.shiftKmerWithNextChar(array[i]);
-                    InsertToFrame(kmer, readID, i - k + 2, writer);
+                    InsertToFrame(kmer, readID, i - kmerSize + 2, writer);
                 }
 
                 if (bReversed) {
@@ -115,9 +115,9 @@
                     kmer.setByReadReverse(array, 0);
                     InsertToFrame(kmer, readID, -1, writer);
                     /** middle kmer */
-                    for (int i = k; i < array.length; i++) {
+                    for (int i = kmerSize; i < array.length; i++) {
                         kmer.shiftKmerWithPreCode(GeneCode.getPairedCodeFromSymbol(array[i]));
-                        InsertToFrame(kmer, readID, -(i - k + 2), writer);
+                        InsertToFrame(kmer, readID, -(i - kmerSize + 2), writer);
                     }
                 }
             }
@@ -149,8 +149,6 @@
 
             @Override
             public void open(IFrameWriter writer) throws HyracksDataException {
-                // TODO Auto-generated method stub
-
             }
 
             @Override
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/MergeKmerAggregateFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/MergeKmerAggregateFactory.java
index 0f791a1..eab197e 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/MergeKmerAggregateFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/MergeKmerAggregateFactory.java
@@ -18,6 +18,9 @@
 import java.io.DataOutput;

 import java.io.IOException;

 

+import org.apache.commons.logging.Log;

+import org.apache.commons.logging.LogFactory;

+

 import edu.uci.ics.genomix.hyracks.data.primitive.PositionReference;

 import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;

 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;

@@ -31,11 +34,12 @@
 

 public class MergeKmerAggregateFactory implements IAggregatorDescriptorFactory {

     private static final long serialVersionUID = 1L;

-

+    private static final Log LOG = LogFactory.getLog(MergeKmerAggregateFactory.class);

     @Override

     public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,

             RecordDescriptor outRecordDescriptor, int[] keyFields, int[] keyFieldsInPartialResults)

             throws HyracksDataException {

+        final int frameSize = ctx.getFrameSize();

         return new IAggregatorDescriptor() {

 

             private PositionReference position = new PositionReference();

@@ -90,6 +94,9 @@
                 DataOutput fieldOutput = tupleBuilder.getDataOutput();

                 ArrayBackedValueStorage inputVal = (ArrayBackedValueStorage) state.state;

                 try {

+                    if (inputVal.getLength() > frameSize/2){

+                        LOG.warn("MergeKmer: output data size is too big: " + inputVal.getLength());

+                    }

                     fieldOutput.write(inputVal.getByteArray(), inputVal.getStartOffset(), inputVal.getLength());

                     tupleBuilder.addFieldEndOffset();

 

diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/MergeReadIDAggregateFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/MergeReadIDAggregateFactory.java
index 0d51035..974dead 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/MergeReadIDAggregateFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/MergeReadIDAggregateFactory.java
@@ -4,6 +4,9 @@
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
 import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
@@ -22,6 +25,7 @@
     private static final long serialVersionUID = 1L;
 
     private final int ValidPosCount;
+    private static final Log LOG = LogFactory.getLog(MergeReadIDAggregateFactory.class);
 
     public MergeReadIDAggregateFactory(int readLength, int kmerLength) {
         ValidPosCount = getPositionCount(readLength, kmerLength);
@@ -45,6 +49,7 @@
     public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
             RecordDescriptor outRecordDescriptor, int[] keyFields, int[] keyFieldsInPartialResults)
             throws HyracksDataException {
+        final int frameSize = ctx.getFrameSize();
         return new IAggregatorDescriptor() {
 
             class PositionArray {
@@ -168,6 +173,7 @@
                 }
                 DataOutput fieldOutput = tupleBuilder.getDataOutput();
                 try {
+                    int totalSize = 0;
                     for (int i = 0; i < ValidPosCount; i++) {
                         fieldOutput.write(positionArray.forwardStorages[i].getByteArray(),
                                 positionArray.forwardStorages[i].getStartOffset(),
@@ -178,6 +184,22 @@
                                 positionArray.reverseStorages[i].getStartOffset(),
                                 positionArray.reverseStorages[i].getLength());
                         tupleBuilder.addFieldEndOffset();
+
+                        totalSize += positionArray.forwardStorages[i].getLength()
+                                + positionArray.reverseStorages[i].getLength();
+                    }
+                    if (totalSize > frameSize / 2) {
+                        int leadbyte = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength();
+                        int readID = accessor.getBuffer().getInt(
+                                leadbyte + accessor.getFieldStartOffset(tIndex, InputReadIDField));
+                        LOG.warn("MergeReadID on read:" + readID + " is of size: " + totalSize + ", current frameSize:"
+                                + frameSize + "\n Recommendate to enlarge the FrameSize");
+                    }
+                    if (totalSize > frameSize){
+                        for (StackTraceElement ste : Thread.currentThread().getStackTrace()) {
+                            System.out.println(ste);
+                        }
+                        throw new HyracksDataException("Data is too long");
                     }
                 } catch (IOException e) {
                     throw new HyracksDataException("I/O exception when writing aggregation to the output buffer.");
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/KMerTextWriterFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/KMerTextWriterFactory.java
index f614846..7ea065d 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/KMerTextWriterFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/KMerTextWriterFactory.java
@@ -33,15 +33,15 @@
 	 * 
 	 */
     private static final long serialVersionUID = 1L;
-    private KmerBytesWritable kmer;
-    private PositionListWritable plist;
-
+    
+    private final int kmerSize;
     public KMerTextWriterFactory(int k) {
-        kmer = new KmerBytesWritable(k);
-        plist = new PositionListWritable();
+        kmerSize =k;
     }
 
     public class TupleWriter implements ITupleWriter {
+        private KmerBytesWritable kmer = new KmerBytesWritable(kmerSize);
+        private PositionListWritable plist = new PositionListWritable();
         @Override
         public void write(DataOutput output, ITupleReference tuple) throws HyracksDataException {
             try {
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/GenomixJobConf.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/GenomixJobConf.java
index 2d420c3..1f12bb5 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/GenomixJobConf.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/GenomixJobConf.java
@@ -50,7 +50,7 @@
 
     public static final int DEFAULT_KMERLEN = 21;
     public static final int DEFAULT_READLEN = 124;
-    public static final int DEFAULT_FRAME_SIZE = 32768;
+    public static final int DEFAULT_FRAME_SIZE = 128*1024;
     public static final int DEFAULT_FRAME_LIMIT = 4096;
     public static final int DEFAULT_TABLE_SIZE = 10485767;
     public static final long DEFAULT_GROUPBY_HYBRID_INPUTSIZE = 154000000L;
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenCheckReader.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenCheckReader.java
index c6a69d4..28e4ff5 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenCheckReader.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenCheckReader.java
@@ -56,13 +56,15 @@
              * 
              */
             private static final long serialVersionUID = 1L;
-            private KmerBytesWritable kmer = new KmerBytesWritable(kmerSize);
-            private PositionWritable pos = new PositionWritable();
+            
 
             @Override
             public ITupleWriter getTupleWriter(IHyracksTaskContext ctx) throws HyracksDataException {
                 return new ITupleWriter(){
 
+                    private KmerBytesWritable kmer = new KmerBytesWritable(kmerSize);
+                    private PositionWritable pos = new PositionWritable();
+                    
                     @Override
                     public void open(DataOutput output) throws HyracksDataException {
                         // TODO Auto-generated method stub
diff --git a/genomix/genomix-hyracks/src/main/resources/scripts/startDebugNc.sh b/genomix/genomix-hyracks/src/main/resources/scripts/startDebugNc.sh
index c335475..5f89bcc 100644
--- a/genomix/genomix-hyracks/src/main/resources/scripts/startDebugNc.sh
+++ b/genomix/genomix-hyracks/src/main/resources/scripts/startDebugNc.sh
@@ -12,11 +12,11 @@
 #Clean up temp dir
 
 #rm -rf $NCTMP_DIR2
-mkdir $NCTMP_DIR2
+mkdir -p $NCTMP_DIR2
 
 #Clean up log dir
 #rm -rf $NCLOGS_DIR2
-mkdir $NCLOGS_DIR2
+mkdir -p $NCLOGS_DIR2
 
 
 #Clean up I/O working dir
@@ -35,16 +35,14 @@
 
 #Get node ID
 NODEID=`hostname | cut -d '.' -f 1`
-NODEID=${NODEID}2
 
 #Set JAVA_OPTS
 export JAVA_OPTS=$NCJAVA_OPTS2
 
-cd $HYRACKS_HOME
-HYRACKS_HOME=`pwd`
+GENOMIX_HOME=`pwd`
 
 #Enter the temp dir
 cd $NCTMP_DIR2
 
 #Launch hyracks nc
-$HYRACKS_HOME/hyracks-server/target/appassembler/bin/hyracksnc -cc-host $CCHOST -cc-port $CC_CLUSTERPORT -cluster-net-ip-address $IPADDR  -data-ip-address $IPADDR -node-id $NODEID -iodevices "${IO_DIRS2}" &> $NCLOGS_DIR2/$NODEID.log &
+${GENOMIX_HOME}/bin/genomixnc -cc-host $CCHOST -cc-port $CC_CLUSTERPORT -cluster-net-ip-address $IPADDR  -data-ip-address $IPADDR -result-ip-address $IPADDR -node-id $NODEID -iodevices "${IO_DIRS}" &> $NCLOGS_DIR/$NODEID.log &
diff --git a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunStepByStepTest.java b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunStepByStepTest.java
index 07cfab4..ec95aa6 100644
--- a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunStepByStepTest.java
+++ b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunStepByStepTest.java
@@ -54,7 +54,7 @@
 
     private JobConf conf = new JobConf();
     private int numberOfNC = 2;
-    private int numPartitionPerMachine = 1;
+    private int numPartitionPerMachine = 2;
 
     private Driver driver;
 
diff --git a/genomix/genomix-pregelix/data/input/.out.crc b/genomix/genomix-pregelix/data/input/.out.crc
new file mode 100644
index 0000000..422fddb
--- /dev/null
+++ b/genomix/genomix-pregelix/data/input/.out.crc
Binary files differ
diff --git a/genomix/genomix-pregelix/data/input/test/result.graphbuild.txt.bin b/genomix/genomix-pregelix/data/input/test/result.graphbuild.txt.bin
new file mode 100644
index 0000000..4865aeb
--- /dev/null
+++ b/genomix/genomix-pregelix/data/input/test/result.graphbuild.txt.bin
Binary files differ
diff --git a/genomix/genomix-pregelix/data/test/result.graphbuild.bidirectional.txt b/genomix/genomix-pregelix/data/test/result.graphbuild.bidirectional.txt
new file mode 100644
index 0000000..a41ce56
--- /dev/null
+++ b/genomix/genomix-pregelix/data/test/result.graphbuild.bidirectional.txt
@@ -0,0 +1,18 @@
+((2,1)	[(2,3)]	[]	[]	[]	AATAGA)	(null)
+((2,3)	[(6,1),(2,4)]	[]	[]	[(2,1)]	TAGAA)	(null)
+((2,4)	[(6,2)]	[]	[]	[(2,3)]	AGAAG)	(null)
+((4,1)	[(4,3)]	[]	[]	[]	AATAGA)	(null)
+((4,3)	[(6,1),(4,4)]	[]	[]	[(4,1)]	TAGAA)	(null)
+((4,4)	[(6,2)]	[]	[]	[(4,3)]	AGAAG)	(null)
+((6,1)	[(6,2)]	[]	[]	[(2,3),(1,3),(3,3),(4,3),(5,3)]	AGAAG)	(null)
+((6,2)	[(6,3)]	[]	[]	[(2,4),(3,4),(1,4),(4,4),(5,4),(6,1)]	GAAGA)	(null)
+((6,3)	[]	[]	[]	[(6,2)]	AAGAAG)	(null)
+((1,1)	[(1,3)]	[]	[]	[]	AATAGA)	(null)
+((1,3)	[(6,1),(1,4)]	[]	[]	[(1,1)]	TAGAA)	(null)
+((1,4)	[(6,2)]	[]	[]	[(1,3)]	AGAAG)	(null)
+((3,1)	[(3,3)]	[]	[]	[]	AATAGA)	(null)
+((3,3)	[(6,1),(3,4)]	[]	[]	[(3,1)]	TAGAA)	(null)
+((3,4)	[(6,2)]	[]	[]	[(3,3)]	AGAAG)	(null)
+((5,1)	[(5,3)]	[]	[]	[]	AATAGA)	(null)
+((5,3)	[(6,1),(5,4)]	[]	[]	[(5,1)]	TAGAA)	(null)
+((5,4)	[(6,2)]	[]	[]	[(5,3)]	AGAAG)	(null)
diff --git a/genomix/genomix-pregelix/data/test/result.graphbuild.txt.bin b/genomix/genomix-pregelix/data/test/result.graphbuild.txt.bin
new file mode 100644
index 0000000..4865aeb
--- /dev/null
+++ b/genomix/genomix-pregelix/data/test/result.graphbuild.txt.bin
Binary files differ
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/LogAlgorithmForPathMergeInputFormat.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/LogAlgorithmForPathMergeInputFormat.java
index 3fe04ff..0b0055b 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/LogAlgorithmForPathMergeInputFormat.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/LogAlgorithmForPathMergeInputFormat.java
@@ -66,8 +66,10 @@
                 /**
                  * set the vertex value
                  */
-                vertexValue.setIncomingList(node.getIncomingList());
-                vertexValue.setOutgoingList(node.getOutgoingList());
+                vertexValue.setFFList(node.getFFList());
+                vertexValue.setFRList(node.getFRList());
+                vertexValue.setRFList(node.getRFList());
+                vertexValue.setRRList(node.getRRList());
                 vertexValue.setMergeChain(node.getKmer());
                 vertexValue.setState(State.NON_VERTEX);
                 vertex.setVertexValue(vertexValue);
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/NaiveAlgorithmForPathMergeInputFormat.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/NaiveAlgorithmForPathMergeInputFormat.java
index 4011838..91634d8 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/NaiveAlgorithmForPathMergeInputFormat.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/NaiveAlgorithmForPathMergeInputFormat.java
@@ -69,8 +69,10 @@
             /**
              * set the vertex value
              */
-            vertexValue.setIncomingList(node.getIncomingList());
-            vertexValue.setOutgoingList(node.getOutgoingList());
+            vertexValue.setFFList(node.getFFList());
+            vertexValue.setFRList(node.getFRList());
+            vertexValue.setRFList(node.getRFList());
+            vertexValue.setRRList(node.getRRList());
             vertexValue.setMergeChain(node.getKmer());
             vertexValue.setState(State.NON_VERTEX);
             vertex.setVertexValue(vertexValue);
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/AdjacencyListWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/AdjacencyListWritable.java
new file mode 100644
index 0000000..fae1970
--- /dev/null
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/AdjacencyListWritable.java
@@ -0,0 +1,102 @@
+package edu.uci.ics.genomix.pregelix.io;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.WritableComparable;
+
+import edu.uci.ics.genomix.type.PositionListWritable;
+
+/**
+ * Holds two position lists, a "forward" one and a "reverse" one, and serializes
+ * them back to back (forward first).
+ */
+public class AdjacencyListWritable implements WritableComparable<AdjacencyListWritable>{
+    private PositionListWritable forwardList;
+    private PositionListWritable reverseList;
+
+    /** Creates an instance holding two newly constructed position lists. */
+    public AdjacencyListWritable(){
+        forwardList = new PositionListWritable();
+        reverseList = new PositionListWritable();
+    }
+
+    /**
+     * Overwrites both lists with the contents of the given adjacency list.
+     *
+     * @param adjacencyList source whose forward and reverse lists are copied
+     */
+    public void set(AdjacencyListWritable adjacencyList){
+        forwardList.set(adjacencyList.getForwardList());
+        reverseList.set(adjacencyList.getReverseList());
+    }
+
+    /** Calls {@code reset()} on both position lists. */
+    public void reset(){
+        forwardList.reset();
+        reverseList.reset();
+    }
+
+    /** @return the forward list itself, not a copy */
+    public PositionListWritable getForwardList() {
+        return forwardList;
+    }
+
+    /**
+     * Copies the given positions into the forward list. The argument is copied, not
+     * stored, so a caller that reuses its list object cannot change this one later.
+     *
+     * @param forwardList positions to copy; {@code null} resets the forward list
+     */
+    public void setForwardList(PositionListWritable forwardList) {
+        copyInto(this.forwardList, forwardList);
+    }
+
+    /** @return the reverse list itself, not a copy */
+    public PositionListWritable getReverseList() {
+        return reverseList;
+    }
+
+    /**
+     * Copies the given positions into the reverse list. The argument is copied, not
+     * stored, so a caller that reuses its list object cannot change this one later.
+     *
+     * @param reverseList positions to copy; {@code null} resets the reverse list
+     */
+    public void setReverseList(PositionListWritable reverseList) {
+        copyInto(this.reverseList, reverseList);
+    }
+
+    /** Reads the forward list, then the reverse list, from {@code in}. */
+    @Override
+    public void readFields(DataInput in) throws IOException {
+        forwardList.readFields(in);
+        reverseList.readFields(in);
+    }
+
+    /** Writes the forward list, then the reverse list, to {@code out}. */
+    @Override
+    public void write(DataOutput out) throws IOException {
+        forwardList.write(out);
+        reverseList.write(out);
+    }
+
+    /** Placeholder: always returns 0, so no ordering between instances is defined. */
+    @Override
+    public int compareTo(AdjacencyListWritable o) {
+        return 0;
+    }
+
+    /**
+     * Copies {@code source} into {@code target}. A {@code null} source resets the
+     * target; copying a list onto itself is skipped.
+     */
+    private static void copyInto(PositionListWritable target, PositionListWritable source) {
+        if (source == null) {
+            target.reset();
+        } else if (source != target) {
+            target.set(source);
+        }
+    }
+}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/IncomingListWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/IncomingListWritable.java
new file mode 100644
index 0000000..8dec857
--- /dev/null
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/IncomingListWritable.java
@@ -0,0 +1,69 @@
+package edu.uci.ics.genomix.pregelix.io;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.WritableComparable;
+
+import edu.uci.ics.genomix.type.PositionListWritable;
+
+/**
+ * Writable pairing a "reverse-forward" and a "reverse-reverse" position list; the
+ * reverse-forward list always comes first on the wire.
+ */
+public class IncomingListWritable implements WritableComparable<IncomingListWritable>{
+    private PositionListWritable reverseForwardList = new PositionListWritable();
+    private PositionListWritable reverseReverseList = new PositionListWritable();
+
+    /** Creates an instance whose two lists are newly constructed. */
+    public IncomingListWritable(){
+    }
+
+    /** @return the reverse-forward list held by this object */
+    public PositionListWritable getReverseForwardList() {
+        return this.reverseForwardList;
+    }
+
+    /** @param reverseForwardList list to hold from now on; the reference is stored as is */
+    public void setReverseForwardList(PositionListWritable reverseForwardList) {
+        this.reverseForwardList = reverseForwardList;
+    }
+
+    /** @return the reverse-reverse list held by this object */
+    public PositionListWritable getReverseReverseList() {
+        return this.reverseReverseList;
+    }
+
+    /** @param reverseReverseList list to hold from now on; the reference is stored as is */
+    public void setReverseReverseList(PositionListWritable reverseReverseList) {
+        this.reverseReverseList = reverseReverseList;
+    }
+
+    /** Reads the reverse-forward list and then the reverse-reverse list from {@code in}. */
+    @Override
+    public void readFields(DataInput in) throws IOException {
+        this.reverseForwardList.readFields(in);
+        this.reverseReverseList.readFields(in);
+    }
+
+    /** Writes the reverse-forward list and then the reverse-reverse list to {@code out}. */
+    @Override
+    public void write(DataOutput out) throws IOException {
+        this.reverseForwardList.write(out);
+        this.reverseReverseList.write(out);
+    }
+
+    /** Placeholder comparison: always returns 0 regardless of the argument. */
+    @Override
+    public int compareTo(IncomingListWritable other) {
+        return 0;
+    }
+
+    /** @return the sum of {@code getCountOfPosition()} over both lists */
+    public int inDegree(){
+        final int reverseReverseCount = this.reverseReverseList.getCountOfPosition();
+        final int reverseForwardCount = this.reverseForwardList.getCountOfPosition();
+        return reverseReverseCount + reverseForwardCount;
+    }
+}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MessageWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MessageWritable.java
index 31d313d..f0a6b58 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MessageWritable.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MessageWritable.java
@@ -9,7 +9,6 @@
 import edu.uci.ics.genomix.pregelix.type.CheckMessage;
 import edu.uci.ics.genomix.pregelix.type.Message;
 import edu.uci.ics.genomix.type.KmerBytesWritable;
-import edu.uci.ics.genomix.type.PositionListWritable;
 import edu.uci.ics.genomix.type.PositionWritable;
 
 public class MessageWritable implements WritableComparable<MessageWritable> {
@@ -20,7 +19,7 @@
      */
     private PositionWritable sourceVertexId;
     private KmerBytesWritable chainVertexId;
-    private PositionListWritable neighberNode; //incoming or outgoing
+    private AdjacencyListWritable neighberNode; //incoming or outgoing
     private byte message;
 
     private byte checkMessage;
@@ -28,12 +27,12 @@
     public MessageWritable() {
         sourceVertexId = new PositionWritable();
         chainVertexId = new KmerBytesWritable(0);
-        neighberNode = new PositionListWritable();
+        neighberNode = new AdjacencyListWritable();
         message = Message.NON;
         checkMessage = (byte) 0;
     }
 
-    public void set(PositionWritable sourceVertexId, KmerBytesWritable chainVertexId, PositionListWritable neighberNode, byte message) {
+    public void set(PositionWritable sourceVertexId, KmerBytesWritable chainVertexId, AdjacencyListWritable neighberNode, byte message) {
         checkMessage = 0;
         if (sourceVertexId != null) {
             checkMessage |= CheckMessage.SOURCE;
@@ -79,11 +78,11 @@
         }
     }
     
-    public PositionListWritable getNeighberNode() {
+    public AdjacencyListWritable getNeighberNode() {
         return neighberNode;
     }
 
-    public void setNeighberNode(PositionListWritable neighberNode) {
+    public void setNeighberNode(AdjacencyListWritable neighberNode) {
         if(neighberNode != null){
             checkMessage |= CheckMessage.NEIGHBER;
             this.neighberNode.set(neighberNode);
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/OutgoingListWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/OutgoingListWritable.java
new file mode 100644
index 0000000..275954d
--- /dev/null
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/OutgoingListWritable.java
@@ -0,0 +1,69 @@
+package edu.uci.ics.genomix.pregelix.io;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.WritableComparable;
+
+import edu.uci.ics.genomix.type.PositionListWritable;
+
+/**
+ * Writable pairing a "forward-forward" and a "forward-reverse" position list; the
+ * forward-forward list always comes first on the wire.
+ */
+public class OutgoingListWritable implements WritableComparable<OutgoingListWritable>{
+    private PositionListWritable forwardForwardList = new PositionListWritable();
+    private PositionListWritable forwardReverseList = new PositionListWritable();
+
+    /** Creates an instance whose two lists are newly constructed. */
+    public OutgoingListWritable(){
+    }
+
+    /** @return the forward-forward list held by this object */
+    public PositionListWritable getForwardForwardList() {
+        return this.forwardForwardList;
+    }
+
+    /** @param forwardForwardList list to hold from now on; the reference is stored as is */
+    public void setForwardForwardList(PositionListWritable forwardForwardList) {
+        this.forwardForwardList = forwardForwardList;
+    }
+
+    /** @return the forward-reverse list held by this object */
+    public PositionListWritable getForwardReverseList() {
+        return this.forwardReverseList;
+    }
+
+    /** @param forwardReverseList list to hold from now on; the reference is stored as is */
+    public void setForwardReverseList(PositionListWritable forwardReverseList) {
+        this.forwardReverseList = forwardReverseList;
+    }
+
+    /** Reads the forward-forward list and then the forward-reverse list from {@code in}. */
+    @Override
+    public void readFields(DataInput in) throws IOException {
+        this.forwardForwardList.readFields(in);
+        this.forwardReverseList.readFields(in);
+    }
+
+    /** Writes the forward-forward list and then the forward-reverse list to {@code out}. */
+    @Override
+    public void write(DataOutput out) throws IOException {
+        this.forwardForwardList.write(out);
+        this.forwardReverseList.write(out);
+    }
+
+    /** Placeholder comparison: always returns 0 regardless of the argument. */
+    @Override
+    public int compareTo(OutgoingListWritable other) {
+        return 0;
+    }
+
+    /** @return the sum of {@code getCountOfPosition()} over both lists */
+    public int outDegree(){
+        final int forwardForwardCount = this.forwardForwardList.getCountOfPosition();
+        final int forwardReverseCount = this.forwardReverseList.getCountOfPosition();
+        return forwardForwardCount + forwardReverseCount;
+    }
+}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/ValueStateWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/ValueStateWritable.java
index 2554c8e..985c2ac 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/ValueStateWritable.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/ValueStateWritable.java
@@ -10,44 +10,82 @@
 
 public class ValueStateWritable implements WritableComparable<ValueStateWritable> {
 
-    private PositionListWritable incomingList;
-    private PositionListWritable outgoingList;
+    private AdjacencyListWritable incomingList;
+    private AdjacencyListWritable outgoingList;
     private byte state;
     private KmerBytesWritable mergeChain;
 
     public ValueStateWritable() {
-        incomingList = new PositionListWritable();
-        outgoingList = new PositionListWritable();
+        incomingList = new AdjacencyListWritable();
+        outgoingList = new AdjacencyListWritable();
         state = State.NON_VERTEX;
         mergeChain = new KmerBytesWritable(0);
     }
 
-    public ValueStateWritable(PositionListWritable incomingList, PositionListWritable outgoingList, 
+    public ValueStateWritable(PositionListWritable forwardForwardList, PositionListWritable forwardReverseList,
+            PositionListWritable reverseForwardList, PositionListWritable reverseReverseList,
             byte state, KmerBytesWritable mergeChain) {
-        set(incomingList, outgoingList, state, mergeChain);
+        set(forwardForwardList, forwardReverseList, 
+                reverseForwardList, reverseReverseList,
+                state, mergeChain);
     }
 
-    public void set(PositionListWritable incomingList, PositionListWritable outgoingList, 
+    public void set(PositionListWritable forwardForwardList, PositionListWritable forwardReverseList,
+            PositionListWritable reverseForwardList, PositionListWritable reverseReverseList, 
             byte state, KmerBytesWritable mergeChain) {
-        this.incomingList.set(incomingList);
-        this.outgoingList.set(outgoingList);
+        this.incomingList.setForwardList(reverseForwardList);
+        this.incomingList.setReverseList(reverseReverseList);
+        this.outgoingList.setForwardList(forwardForwardList);
+        this.outgoingList.setReverseList(forwardReverseList);
         this.state = state;
         this.mergeChain.set(mergeChain);
     }
     
-    public PositionListWritable getIncomingList() {
+    public PositionListWritable getFFList() {
+        return outgoingList.getForwardList();
+    }
+
+    public PositionListWritable getFRList() {
+        return outgoingList.getReverseList();
+    }
+
+    public PositionListWritable getRFList() {
+        return incomingList.getForwardList();
+    }
+
+    public PositionListWritable getRRList() {
+        return incomingList.getReverseList();
+    }
+    
+    public void setFFList(PositionListWritable forwardForwardList){
+        outgoingList.setForwardList(forwardForwardList);
+    }
+    
+    public void setFRList(PositionListWritable forwardReverseList){
+        outgoingList.setReverseList(forwardReverseList);
+    }
+    
+    public void setRFList(PositionListWritable reverseForwardList){
+        incomingList.setForwardList(reverseForwardList);
+    }
+
+    public void setRRList(PositionListWritable reverseReverseList){
+        incomingList.setReverseList(reverseReverseList);
+    }
+    
+    public AdjacencyListWritable getIncomingList() {
         return incomingList;
     }
 
-    public void setIncomingList(PositionListWritable incomingList) {
+    public void setIncomingList(AdjacencyListWritable incomingList) {
         this.incomingList = incomingList;
     }
 
-    public PositionListWritable getOutgoingList() {
+    public AdjacencyListWritable getOutgoingList() {
         return outgoingList;
     }
 
-    public void setOutgoingList(PositionListWritable outgoingList) {
+    public void setOutgoingList(AdjacencyListWritable outgoingList) {
         this.outgoingList = outgoingList;
     }
 
@@ -94,14 +132,21 @@
 
     @Override
     public String toString() {
-        return state + "\t" + getLengthOfMergeChain() + "\t" + mergeChain.toString();
+        StringBuilder sbuilder = new StringBuilder();
+        sbuilder.append('(');
+        sbuilder.append(outgoingList.getForwardList().toString()).append('\t');
+        sbuilder.append(outgoingList.getReverseList().toString()).append('\t');
+        sbuilder.append(incomingList.getForwardList().toString()).append('\t');
+        sbuilder.append(incomingList.getReverseList().toString()).append('\t');
+        sbuilder.append(mergeChain.toString()).append(')');
+        return sbuilder.toString();
     }
     
-    public int inDegree() {
-        return incomingList.getCountOfPosition();
+    public int inDegree(){
+        return incomingList.getForwardList().getCountOfPosition() + incomingList.getReverseList().getCountOfPosition();
     }
-
-    public int outDegree() {
-        return outgoingList.getCountOfPosition();
+    
+    public int outDegree(){
+        return outgoingList.getForwardList().getCountOfPosition() + outgoingList.getReverseList().getCountOfPosition();
     }
 }
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/LogAlgorithmForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/LogAlgorithmForPathMergeVertex.java
index 6be0eee..c3bb663 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/LogAlgorithmForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/LogAlgorithmForPathMergeVertex.java
@@ -75,12 +75,18 @@
      * get destination vertex
      */
     public PositionWritable getNextDestVertexId(ValueStateWritable value) {
-        posIterator = value.getOutgoingList().iterator();
+        if(value.getFFList().getCountOfPosition() > 0) // #FFList() > 0
+            posIterator = value.getFFList().iterator();
+        else // #FRList() > 0
+            posIterator = value.getFRList().iterator();
         return posIterator.next();
     }
 
     public PositionWritable getPreDestVertexId(ValueStateWritable value) {
-        posIterator = value.getIncomingList().iterator();
+        if(value.getRFList().getCountOfPosition() > 0) // #RFList() > 0
+            posIterator = value.getRFList().iterator();
+        else // #RRList() > 0
+            posIterator = value.getRRList().iterator();
         return posIterator.next();
     }
 
@@ -88,7 +94,12 @@
      * head send message to all next nodes
      */
     public void sendMsgToAllNextNodes(ValueStateWritable value) {
-        posIterator = value.getOutgoingList().iterator();
+        posIterator = value.getFFList().iterator(); // FFList
+        while(posIterator.hasNext()){
+            destVertexId.set(posIterator.next());
+            sendMsg(destVertexId, outgoingMsg);
+        }
+        posIterator = value.getFRList().iterator(); // FRList
         while(posIterator.hasNext()){
             destVertexId.set(posIterator.next());
             sendMsg(destVertexId, outgoingMsg);
@@ -99,7 +110,12 @@
      * head send message to all previous nodes
      */
     public void sendMsgToAllPreviousNodes(ValueStateWritable value) {
-        posIterator = value.getIncomingList().iterator();
+        posIterator = value.getRFList().iterator(); // RFList
+        while(posIterator.hasNext()){
+            destVertexId.set(posIterator.next());
+            sendMsg(destVertexId, outgoingMsg);
+        }
+        posIterator = value.getRRList().iterator(); // RRList
         while(posIterator.hasNext()){
             destVertexId.set(posIterator.next());
             sendMsg(destVertexId, outgoingMsg);
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/NaiveAlgorithmForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/NaiveAlgorithmForPathMergeVertex.java
index af38072..4fcc09e 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/NaiveAlgorithmForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/NaiveAlgorithmForPathMergeVertex.java
@@ -80,12 +80,18 @@
      * get destination vertex
      */
     public PositionWritable getNextDestVertexId(ValueStateWritable value) {
-        posIterator = value.getOutgoingList().iterator();
+        if(value.getFFList().getCountOfPosition() > 0) // #FFList() > 0
+            posIterator = value.getFFList().iterator();
+        else // #FRList() > 0
+            posIterator = value.getFRList().iterator();
         return posIterator.next();
     }
 
     public PositionWritable getPreDestVertexId(ValueStateWritable value) {
-        posIterator = value.getIncomingList().iterator();
+        if(value.getRFList().getCountOfPosition() > 0) // #RFList() > 0
+            posIterator = value.getRFList().iterator();
+        else // #RRList() > 0
+            posIterator = value.getRRList().iterator();
         return posIterator.next();
     }
 
@@ -93,7 +99,12 @@
      * head send message to all next nodes
      */
     public void sendMsgToAllNextNodes(ValueStateWritable value) {
-        posIterator = value.getOutgoingList().iterator();
+        posIterator = value.getFFList().iterator(); // FFList
+        while(posIterator.hasNext()){
+            destVertexId.set(posIterator.next());
+            sendMsg(destVertexId, outgoingMsg);
+        }
+        posIterator = value.getFRList().iterator(); // FRList
         while(posIterator.hasNext()){
             destVertexId.set(posIterator.next());
             sendMsg(destVertexId, outgoingMsg);
@@ -104,7 +115,12 @@
      * head send message to all previous nodes
      */
     public void sendMsgToAllPreviousNodes(ValueStateWritable value) {
-        posIterator = value.getIncomingList().iterator();
+        posIterator = value.getRFList().iterator(); // RFList
+        while(posIterator.hasNext()){
+            destVertexId.set(posIterator.next());
+            sendMsg(destVertexId, outgoingMsg);
+        }
+        posIterator = value.getRRList().iterator(); // RRList
         while(posIterator.hasNext()){
             destVertexId.set(posIterator.next());
             sendMsg(destVertexId, outgoingMsg);
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/P3ForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/P3ForPathMergeVertex.java
index 6f4354a..8a03aa7 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/P3ForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/P3ForPathMergeVertex.java
@@ -87,12 +87,18 @@
      * get destination vertex
      */
     public PositionWritable getNextDestVertexId(ValueStateWritable value) {
-        posIterator = value.getOutgoingList().iterator();
+        if(value.getFFList().getCountOfPosition() > 0) // #FFList() > 0
+            posIterator = value.getFFList().iterator();
+        else // #FRList() > 0
+            posIterator = value.getFRList().iterator();
         return posIterator.next();
     }
 
     public PositionWritable getPreDestVertexId(ValueStateWritable value) {
-        posIterator = value.getIncomingList().iterator();
+        if(value.getRFList().getCountOfPosition() > 0) // #RFList() > 0
+            posIterator = value.getRFList().iterator();
+        else // #RRList() > 0
+            posIterator = value.getRRList().iterator();
         return posIterator.next();
     }
 
@@ -100,7 +106,12 @@
      * head send message to all next nodes
      */
     public void sendMsgToAllNextNodes(ValueStateWritable value) {
-        posIterator = value.getOutgoingList().iterator();
+        posIterator = value.getFFList().iterator(); // FFList
+        while(posIterator.hasNext()){
+            destVertexId.set(posIterator.next());
+            sendMsg(destVertexId, outgoingMsg);
+        }
+        posIterator = value.getFRList().iterator(); // FRList
         while(posIterator.hasNext()){
             destVertexId.set(posIterator.next());
             sendMsg(destVertexId, outgoingMsg);
@@ -111,7 +122,12 @@
      * head send message to all previous nodes
      */
     public void sendMsgToAllPreviousNodes(ValueStateWritable value) {
-        posIterator = value.getIncomingList().iterator();
+        posIterator = value.getRFList().iterator(); // RFList
+        while(posIterator.hasNext()){
+            destVertexId.set(posIterator.next());
+            sendMsg(destVertexId, outgoingMsg);
+        }
+        posIterator = value.getRRList().iterator(); // RRList
         while(posIterator.hasNext()){
             destVertexId.set(posIterator.next());
             sendMsg(destVertexId, outgoingMsg);
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/ConvertNodeToIdValue.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/ConvertNodeToIdValue.java
new file mode 100644
index 0000000..b81491b
--- /dev/null
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/ConvertNodeToIdValue.java
@@ -0,0 +1,85 @@
+package edu.uci.ics.genomix.pregelix.sequencefile;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+
+import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
+import edu.uci.ics.genomix.pregelix.type.State;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.NodeWritable;
+import edu.uci.ics.genomix.type.PositionWritable;
+
+/**
+ * Converts a sequence file of (NodeWritable, NullWritable) records into a sequence
+ * file of (PositionWritable, ValueStateWritable) records.
+ */
+public class ConvertNodeToIdValue {
+
+    /**
+     * Rewrites every record of {@code inFile} into {@code outFile}, keyed by node ID.
+     * Each output value carries the node's four position lists and its kmer (as the
+     * merge chain) with the state set to {@code State.NON_VERTEX}. Node IDs are echoed
+     * to standard output as they are processed.
+     *
+     * @param inFile sequence file to read
+     * @param outFile uncompressed sequence file to create
+     * @throws IOException if reading or writing fails; both files are closed regardless
+     */
+    public static void convert(Path inFile, Path outFile)
+            throws IOException {
+        Configuration conf = new Configuration();
+        FileSystem fileSys = FileSystem.get(conf);
+
+        SequenceFile.Reader reader = new SequenceFile.Reader(fileSys, inFile, conf);
+        try {
+            SequenceFile.Writer writer = SequenceFile.createWriter(fileSys, conf, outFile,
+                    PositionWritable.class, ValueStateWritable.class, CompressionType.NONE);
+            try {
+                NodeWritable node = new NodeWritable();
+                NullWritable value = NullWritable.get();
+                PositionWritable outputKey = new PositionWritable();
+                ValueStateWritable outputValue = new ValueStateWritable();
+
+                while (reader.next(node, value)) {
+                    System.out.println(node.getNodeID().toString());
+                    outputKey.set(node.getNodeID());
+                    outputValue.setFFList(node.getFFList());
+                    outputValue.setFRList(node.getFRList());
+                    outputValue.setRFList(node.getRFList());
+                    outputValue.setRRList(node.getRRList());
+                    outputValue.setMergeChain(node.getKmer());
+                    outputValue.setState(State.NON_VERTEX);
+                    writer.append(outputKey, outputValue);
+                }
+            } finally {
+                // Runs even if the loop throws, so the output handle is not leaked.
+                writer.close();
+            }
+        } finally {
+            reader.close();
+        }
+    }
+
+    /**
+     * Empties {@code data/input}, then converts
+     * {@code data/test/result.graphbuild.txt.bin} into {@code data/input/out}.
+     *
+     * @param args ignored
+     */
+    public static void main(String[] args) throws IOException {
+        Path dir = new Path("data/test");
+        Path outDir = new Path("data/input");
+        FileUtils.cleanDirectory(new File("data/input"));
+        Path inFile = new Path(dir, "result.graphbuild.txt.bin");
+        Path outFile = new Path(outDir, "out");
+        convert(inFile,outFile);
+    }
+}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/util/VertexUtil.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/util/VertexUtil.java
index f2a61be..1740744 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/util/VertexUtil.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/util/VertexUtil.java
@@ -10,9 +10,7 @@
      * @param vertexValue
      */
     public static boolean isPathVertex(ValueStateWritable value) {
-        if (value.inDegree() == 1 && value.outDegree() == 1)
-            return true;
-        return false;
+        return value.inDegree() == 1 && value.outDegree() == 1;
     }
 
     /**
@@ -21,9 +19,7 @@
      * @param vertexValue
      */
     public static boolean isHeadVertex(ValueStateWritable value) {
-        if (value.outDegree() > 0 && !isPathVertex(value))
-            return true;
-        return false;
+        return value.outDegree() > 0 && !isPathVertex(value);
     }
 
     /**
@@ -32,9 +28,7 @@
      * @param vertexValue
      */
     public static boolean isRearVertex(ValueStateWritable value) {
-        if (value.inDegree() > 0 && !isPathVertex(value))
-            return true;
-        return false;
+        return value.inDegree() > 0 && !isPathVertex(value);
     }
 
     /**
@@ -42,9 +36,7 @@
      */
     public static boolean isCycle(KmerBytesWritable kmer, KmerBytesWritable mergeChain, int kmerSize) {
         String chain = mergeChain.toString().substring(1);
-        if (chain.contains(kmer.toString()))
-            return true;
-        return false;
+        return chain.contains(kmer.toString());
 
         /*subKmer.set(vertexId);
         for(int istart = 1; istart < mergeChain.getKmerLength() - kmerSize + 1; istart++){
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
index f1fcdac..cdc97c1 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
@@ -73,8 +73,8 @@
 
     public static void main(String[] args) throws IOException {
         genNaiveAlgorithmForMergeGraph();
-        genLogAlgorithmForMergeGraph();
-        genP3ForMergeGraph();
+        //genLogAlgorithmForMergeGraph();
+        //genP3ForMergeGraph();
     }
 
 }
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/PathMergeSmallTestSuite.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/PathMergeSmallTestSuite.java
index c8aaa84..a8f5a81 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/PathMergeSmallTestSuite.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/PathMergeSmallTestSuite.java
@@ -43,9 +43,9 @@
 public class PathMergeSmallTestSuite extends TestSuite {
     private static final Logger LOGGER = Logger.getLogger(PathMergeSmallTestSuite.class.getName());
 
-    public static final String PreFix = "data/PathTestSet"; //"graphbuildresult";
+    public static final String PreFix = "data/input"; //"graphbuildresult";
     public static final String[] TestDir = { PreFix + File.separator
-    + "LongPath"};/*, PreFix + File.separator
+    + "test"};/*, PreFix + File.separator
             /*+ "CyclePath"};, PreFix + File.separator
             + "SimplePath", PreFix + File.separator
             + "SinglePath", PreFix + File.separator