Merge branch 'genomix/fullstack_bidirection' into jianfeng/genomix-reverse
diff --git a/genomix/genomix-hadoop/pom.xml b/genomix/genomix-hadoop/pom.xml
index fe83c3c..610092a 100755
--- a/genomix/genomix-hadoop/pom.xml
+++ b/genomix/genomix-hadoop/pom.xml
@@ -156,5 +156,23 @@
 			<type>jar</type>
 			<scope>compile</scope>
 		</dependency>
+		<dependency>
+			<groupId>edu.uci.ics.hyracks</groupId>
+			<artifactId>genomix-hyracks</artifactId>
+			<version>0.2.6-SNAPSHOT</version>
+		</dependency>
+		<dependency>
+			<groupId>edu.uci.ics.hyracks</groupId>
+			<artifactId>hyracks-hdfs-core</artifactId>
+			<version>0.2.6-SNAPSHOT</version>
+			<scope>compile</scope>
+		</dependency>
+		<dependency>
+			<groupId>edu.uci.ics.hyracks</groupId>
+			<artifactId>hyracks-hdfs-core</artifactId>
+			<version>0.2.6-SNAPSHOT</version>
+			<type>test-jar</type>
+			<scope>test</scope>
+		</dependency>
 	</dependencies>
 </project>
diff --git a/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h3/TestPathMergeH3.java b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h3/TestPathMergeH3.java
new file mode 100644
index 0000000..b405161
--- /dev/null
+++ b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h3/TestPathMergeH3.java
@@ -0,0 +1,215 @@
+package edu.uci.ics.genomix.hadoop.graphclean.mergepaths.h3;
+
+import java.io.BufferedWriter;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.FileWriter;
+import java.io.IOException;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import edu.uci.ics.genomix.hyracks.driver.Driver;
+import edu.uci.ics.genomix.hyracks.driver.Driver.Plan;
+import edu.uci.ics.genomix.hyracks.job.GenomixJobConf;
+import edu.uci.ics.genomix.hyracks.test.TestUtils;
+import edu.uci.ics.genomix.type.NodeWritable;
+import edu.uci.ics.hyracks.hdfs.utils.HyracksUtils;
+
+@SuppressWarnings("deprecation")
+public class TestPathMergeH3 {
+    private static final int KMER_LENGTH = 5;
+    private static final int READ_LENGTH = 8;
+    
+    private static final String LOCAL_SEQUENCE_FILE = "src/test/resources/data/webmap/text.txt";
+    private static final String HDFS_SEQUENCE = "/00-sequence/";
+    private static final String HDFS_GRAPHBUILD = "/01-graphbuild/";
+    private static final String HDFS_MERGED = "/02-graphmerge/";
+
+    private static final String EXPECTED_ROOT = "src/test/resources/expected/";
+    private static final String ACTUAL_ROOT = "src/test/resources/actual/";
+    private static final String GRAPHBUILD_FILE = "result.graphbuild.txt";
+    private static final String PATHMERGE_FILE = "result.mergepath.txt";
+    
+    private static final String HADOOP_CONF_ROOT = "src/test/resources/hadoop/conf/";
+    
+    private MiniDFSCluster dfsCluster;
+
+    private static JobConf conf = new JobConf();
+    private int numberOfNC = 2;
+    private int numPartitionPerMachine = 1;
+
+    private Driver driver;
+
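+    /** Stages the local read file on the MiniDFS cluster, then builds the de Bruijn graph with the Hyracks driver. */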
+    @Test
+    public void TestBuildGraph() throws Exception {
+        copySequenceToDFS();
+        buildGraph();
+    }
+    
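+    // NOTE: the @Test annotation below is commented out, so this merge test is currently disabled;
+    // when enabled, it builds the graph and then runs a single H3 path-merge pass over it.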
+//    @Test
+    public void TestMergeOneIteration() throws Exception {
+        copySequenceToDFS();
+        buildGraph();
+        MergePathsH3Driver h3 = new MergePathsH3Driver();
+        h3.run(HDFS_GRAPHBUILD, HDFS_MERGED, 2, KMER_LENGTH, 1, null, conf);
+        copyResultsToLocal(HDFS_MERGED, ACTUAL_ROOT + PATHMERGE_FILE, conf);
+    }
+
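+    /**
+     * Runs the Hyracks graph-build job (binary output, precluster group-by) over the staged
+     * sequence data, then copies the result back to ACTUAL_ROOT for inspection.
+     */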
+    public void buildGraph() throws Exception {
+        FileInputFormat.setInputPaths(conf, HDFS_SEQUENCE);
+        FileOutputFormat.setOutputPath(conf, new Path(HDFS_GRAPHBUILD));
+        conf.set(GenomixJobConf.OUTPUT_FORMAT, GenomixJobConf.OUTPUT_FORMAT_BINARY);
+        conf.set(GenomixJobConf.GROUPBY_TYPE, GenomixJobConf.GROUPBY_TYPE_PRECLUSTER);
+        driver.runJob(new GenomixJobConf(conf), Plan.BUILD_DEBRUJIN_GRAPH, true);
+        copyResultsToLocal(HDFS_GRAPHBUILD, ACTUAL_ROOT + GRAPHBUILD_FILE, conf);
+    }
+
+    @Before
+    public void setUp() throws Exception {
+        cleanupStores();
+        HyracksUtils.init();
+        FileUtils.forceMkdir(new File(ACTUAL_ROOT));
+        FileUtils.cleanDirectory(new File(ACTUAL_ROOT));
+        startHDFS();
+
+        conf.setInt(GenomixJobConf.KMER_LENGTH, KMER_LENGTH);
+        conf.setInt(GenomixJobConf.READ_LENGTH, READ_LENGTH);
+        driver = new Driver(HyracksUtils.CC_HOST,
+                HyracksUtils.TEST_HYRACKS_CC_CLIENT_PORT, numPartitionPerMachine);
+    }
+    
+    /**
+     * Merge-copy a DFS directory to a local destination, converting binary output to text when
+     * necessary; for binary sources, a merged copy of the raw output is also kept alongside.
+     */
+    private static void copyResultsToLocal(String hdfsSrcDir, String localDestFile, Configuration conf) throws IOException {
+        String fileFormat = conf.get(GenomixJobConf.OUTPUT_FORMAT);
+        if (GenomixJobConf.OUTPUT_FORMAT_TEXT.equalsIgnoreCase(fileFormat)) {
+            // for text files, just concatenate them together
+            FileUtil.copyMerge(FileSystem.get(conf), new Path(hdfsSrcDir),
+                    FileSystem.getLocal(new Configuration()), new Path(localDestFile),
+                    false, conf, null);
+        } else {
+            // file is binary
+            // merge and store the binary format
+            FileUtil.copyMerge(FileSystem.get(conf), new Path(hdfsSrcDir),
+                    FileSystem.getLocal(new Configuration()), new Path(localDestFile + ".bin"),
+                    false, conf, null);
+            // load the Node's and write them out as text locally
+            FileSystem.getLocal(new Configuration()).mkdirs(new Path(localDestFile).getParent());
+            File filePathTo = new File(localDestFile);
+            BufferedWriter bw = new BufferedWriter(new FileWriter(filePathTo));
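+            // walk part-0, part-1, ... and stop at the first part file that does not exist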
+            for (int i = 0; i < Integer.MAX_VALUE; i++) {
+                Path path = new Path(hdfsSrcDir + "part-" + i);
+                FileSystem dfs = FileSystem.get(conf);
+                if (!dfs.exists(path)) {
+                    break;
+                }
+                if (dfs.getFileStatus(path).getLen() == 0) {
+                    continue;
+                }
+                SequenceFile.Reader reader = new SequenceFile.Reader(dfs, path, conf);
+                NodeWritable key = new NodeWritable(conf.getInt(GenomixJobConf.KMER_LENGTH, KMER_LENGTH));
+                NullWritable value = NullWritable.get();
+                while (reader.next(key, value)) {
+                    bw.write(key.toString());
+                    System.out.println(key.toString());
+                    bw.newLine();
+                }
+                reader.close();
+            }
+            bw.close();
+        }
+
+    }
+    
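+    /**
+     * Compares actual output against expected: position-list-aware comparison when poslistField
+     * is given, sorted-line comparison otherwise. Not yet wired into the enabled tests.
+     */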
+    private boolean checkResults(String expectedPath, String actualPath, int[] poslistField) throws Exception {
+        File dumped = new File(actualPath); 
+        if (poslistField != null) {
+            TestUtils.compareWithUnSortedPosition(new File(expectedPath), dumped, poslistField);
+        } else {
+            TestUtils.compareWithSortedResult(new File(expectedPath), dumped);
+        }
+        return true;
+    }
+
+    private void cleanupStores() throws IOException {
+        FileUtils.forceMkdir(new File("teststore"));
+        FileUtils.forceMkdir(new File("build"));
+        FileUtils.cleanDirectory(new File("teststore"));
+        FileUtils.cleanDirectory(new File("build"));
+    }
+
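+    /** Boots a MiniDFSCluster from the bundled test configs and dumps the merged conf to conf.xml. */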
+    private void startHDFS() throws IOException {
+        conf.addResource(new Path(HADOOP_CONF_ROOT + "core-site.xml"));
+        conf.addResource(new Path(HADOOP_CONF_ROOT + "mapred-site.xml"));
+        conf.addResource(new Path(HADOOP_CONF_ROOT + "hdfs-site.xml"));
+
+        FileSystem lfs = FileSystem.getLocal(new Configuration());
+        lfs.delete(new Path("build"), true);
+        System.setProperty("hadoop.log.dir", "logs");
+        dfsCluster = new MiniDFSCluster(conf, numberOfNC, true, null);
+        
+        DataOutputStream confOutput = new DataOutputStream(new FileOutputStream(new File(HADOOP_CONF_ROOT + "conf.xml")));
+        conf.writeXml(confOutput);
+        confOutput.close();
+    }
+    
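+    /** Copies the local sequence file into the HDFS input directory used by the graph builder. */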
+    private void copySequenceToDFS() throws IOException {
+        FileSystem dfs = FileSystem.get(conf);
+        Path src = new Path(LOCAL_SEQUENCE_FILE);
+        Path dest = new Path(HDFS_SEQUENCE);
+        dfs.mkdirs(dest);
+        dfs.copyFromLocalFile(src, dest);
+    }
+    
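+    /** One-time cleanup: wipe the local actual/ directory and any leftover HDFS working directories. */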
+    @BeforeClass
+    public static void cleanUpEntry() throws IOException {
+        // local cleanup
+        FileSystem lfs = FileSystem.getLocal(new Configuration());
+        if (lfs.exists(new Path(ACTUAL_ROOT))) {
+            lfs.delete(new Path(ACTUAL_ROOT), true);
+        }
+        // dfs cleanup
+        FileSystem dfs = FileSystem.get(conf);
+        String[] paths = {HDFS_SEQUENCE, HDFS_GRAPHBUILD, HDFS_MERGED};
+        for (String path : paths) {
+            if (dfs.exists(new Path(path))) {
+                dfs.delete(new Path(path), true);
+            }
+        }
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        HyracksUtils.deinit();
+        cleanupHDFS();
+    }
+
+    private void cleanupHDFS() throws Exception {
+        dfsCluster.shutdown();
+    }
+}
diff --git a/genomix/genomix-hadoop/src/test/resources/data/webmap/text.txt b/genomix/genomix-hadoop/src/test/resources/data/webmap/text.txt
new file mode 100755
index 0000000..13190dd
--- /dev/null
+++ b/genomix/genomix-hadoop/src/test/resources/data/webmap/text.txt
@@ -0,0 +1,6 @@
+1	AATAGAAG
+2	AATAGAAG
+3	AATAGAAG
+4	AATAGAAG
+5	AATAGAAG
+6	AGAAGAAG
diff --git a/genomix/genomix-hadoop/src/test/resources/hadoop/conf/core-site.xml b/genomix/genomix-hadoop/src/test/resources/hadoop/conf/core-site.xml
new file mode 100644
index 0000000..3e5bacb
--- /dev/null
+++ b/genomix/genomix-hadoop/src/test/resources/hadoop/conf/core-site.xml
@@ -0,0 +1,18 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+
+<!-- Put site-specific property overrides in this file. -->
+
+<configuration>
+
+	<property>
+		<name>fs.default.name</name>
+		<value>hdfs://127.0.0.1:31888</value>
+	</property>
+	<property>
+		<name>hadoop.tmp.dir</name>
+		<value>/tmp/hadoop</value>
+	</property>
+
+
+</configuration>
diff --git a/genomix/genomix-hadoop/src/test/resources/hadoop/conf/hdfs-site.xml b/genomix/genomix-hadoop/src/test/resources/hadoop/conf/hdfs-site.xml
new file mode 100644
index 0000000..b1b1902
--- /dev/null
+++ b/genomix/genomix-hadoop/src/test/resources/hadoop/conf/hdfs-site.xml
@@ -0,0 +1,18 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+
+<!-- Put site-specific property overrides in this file. -->
+
+<configuration>
+
+	<property>
+		<name>dfs.replication</name>
+		<value>1</value>
+	</property>
+
+	<property>
+		<name>dfs.block.size</name>
+		<value>65536</value>
+	</property>
+
+</configuration>
diff --git a/genomix/genomix-hadoop/src/test/resources/hadoop/conf/log4j.properties b/genomix/genomix-hadoop/src/test/resources/hadoop/conf/log4j.properties
new file mode 100755
index 0000000..d5e6004
--- /dev/null
+++ b/genomix/genomix-hadoop/src/test/resources/hadoop/conf/log4j.properties
@@ -0,0 +1,94 @@
+# Define some default values that can be overridden by system properties
+hadoop.root.logger=FATAL,console
+hadoop.log.dir=.
+hadoop.log.file=hadoop.log
+
+# Define the root logger to the system property "hadoop.root.logger".
+log4j.rootLogger=${hadoop.root.logger}, EventCounter
+
+# Logging Threshold
+log4j.threshold=FATAL
+
+#
+# Daily Rolling File Appender
+#
+
+log4j.appender.DRFA=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.DRFA.File=${hadoop.log.dir}/${hadoop.log.file}
+
+# Rollover at midnight
+log4j.appender.DRFA.DatePattern=.yyyy-MM-dd
+
+# 30-day backup
+#log4j.appender.DRFA.MaxBackupIndex=30
+log4j.appender.DRFA.layout=org.apache.log4j.PatternLayout
+
+# Pattern format: Date LogLevel LoggerName LogMessage
+log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n
+# Debugging Pattern format
+#log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n
+
+
+#
+# console
+# Add "console" to rootlogger above if you want to use this 
+#
+
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n
+
+#
+# TaskLog Appender
+#
+
+#Default values
+hadoop.tasklog.taskid=null
+hadoop.tasklog.noKeepSplits=4
+hadoop.tasklog.totalLogFileSize=100
+hadoop.tasklog.purgeLogSplits=true
+hadoop.tasklog.logsRetainHours=12
+
+log4j.appender.TLA=org.apache.hadoop.mapred.TaskLogAppender
+log4j.appender.TLA.taskId=${hadoop.tasklog.taskid}
+log4j.appender.TLA.totalLogFileSize=${hadoop.tasklog.totalLogFileSize}
+
+log4j.appender.TLA.layout=org.apache.log4j.PatternLayout
+log4j.appender.TLA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n
+
+#
+# Rolling File Appender
+#
+
+#log4j.appender.RFA=org.apache.log4j.RollingFileAppender
+#log4j.appender.RFA.File=${hadoop.log.dir}/${hadoop.log.file}
+
+# Logfile size and 30-day backups
+#log4j.appender.RFA.MaxFileSize=1MB
+#log4j.appender.RFA.MaxBackupIndex=30
+
+#log4j.appender.RFA.layout=org.apache.log4j.PatternLayout
+#log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} - %m%n
+#log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n
+
+#
+# FSNamesystem Audit logging
+# All audit events are logged at INFO level
+#
+log4j.logger.org.apache.hadoop.fs.FSNamesystem.audit=WARN
+
+# Custom Logging levels
+
+#log4j.logger.org.apache.hadoop.mapred.JobTracker=DEBUG
+#log4j.logger.org.apache.hadoop.mapred.TaskTracker=DEBUG
+#log4j.logger.org.apache.hadoop.fs.FSNamesystem=DEBUG
+
+# Jets3t library
+log4j.logger.org.jets3t.service.impl.rest.httpclient.RestS3Service=ERROR
+
+#
+# Event Counter Appender
+# Sends counts of logging messages at different severity levels to Hadoop Metrics.
+#
+log4j.appender.EventCounter=org.apache.hadoop.metrics.jvm.EventCounter
diff --git a/genomix/genomix-hadoop/src/test/resources/hadoop/conf/mapred-site.xml b/genomix/genomix-hadoop/src/test/resources/hadoop/conf/mapred-site.xml
new file mode 100644
index 0000000..525e7d5
--- /dev/null
+++ b/genomix/genomix-hadoop/src/test/resources/hadoop/conf/mapred-site.xml
@@ -0,0 +1,25 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+
+<!-- Put site-specific property overrides in this file. -->
+
+<configuration>
+
+	<property>
+		<name>mapred.job.tracker</name>
+		<value>localhost:29007</value>
+	</property>
+	<property>
+		<name>mapred.tasktracker.map.tasks.maximum</name>
+		<value>20</value>
+	</property>
+	<property>
+		<name>mapred.tasktracker.reduce.tasks.maximum</name>
+		<value>20</value>
+	</property>
+	<property>
+		<name>mapred.max.split.size</name>
+		<value>2048</value>
+	</property>
+
+</configuration>
diff --git a/genomix/genomix-hyracks/pom.xml b/genomix/genomix-hyracks/pom.xml
index ebc49f2..8377df6 100644
--- a/genomix/genomix-hyracks/pom.xml
+++ b/genomix/genomix-hyracks/pom.xml
@@ -35,7 +35,7 @@
 						<configuration>
 							<programs>
 								<program>
-									<mainClass>edu.uci.ics.genomix.driver.Driver</mainClass>
+									<mainClass>edu.uci.ics.genomix.hyracks.driver.Driver</mainClass>
 									<name>genomix</name>
 								</program>
                                 <program>
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/MapKmerPositionToReadOperator.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/MapKmerPositionToReadOperator.java
index f625143..8609519 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/MapKmerPositionToReadOperator.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/MapKmerPositionToReadOperator.java
@@ -137,8 +137,10 @@
         private void appendNodeToBuilder(int tIndex, PositionReference pos, ArrayBackedValueStorage posList2,
                 ArrayTupleBuilder builder2) {
             try {
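+                // reset up front: if a previous call failed mid-build, the builder must not
+                // carry stale fields into this tuple (reset formerly ran only at the very end)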
+                builder2.reset();
                 builder2.addField(pos.getByteArray(), pos.getStartOffset(), PositionReference.INTBYTES);
                 builder2.addField(pos.getByteArray(), pos.getStartOffset() + PositionReference.INTBYTES, 1);
+
                 if (posList2 == null) {
                     builder2.addFieldEndOffset();
                 } else {
@@ -163,7 +165,6 @@
                         throw new IllegalStateException();
                     }
                 }
-                builder2.reset();
             } catch (HyracksDataException e) {
                 throw new IllegalStateException(
                         "Failed to Add a field to the tuple by copying the data bytes from a byte array.");
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/MapReadToNodeOperator.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/MapReadToNodeOperator.java
index a775d7e..98d21af 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/MapReadToNodeOperator.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/MapReadToNodeOperator.java
@@ -247,6 +247,7 @@
                 return;
             }
             try {
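+                // reset before adding fields so a previously failed append cannot leak stale
+                // data into this node's tuple (reset formerly ran only at the end)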
+                builder.reset();
                 builder.addField(node.getNodeID().getByteArray(), node.getNodeID().getStartOffset(), node.getNodeID()
                         .getLength());
                 builder.getDataOutput().writeInt(node.getCount());
@@ -268,7 +269,6 @@
                         throw new IllegalStateException("Failed to append tuplebuilder to frame");
                     }
                 }
-                builder.reset();
             } catch (IOException e) {
                 throw new IllegalStateException("Failed to Add a field to the tupleBuilder.");
             }
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/NodeSequenceWriterFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/NodeSequenceWriterFactory.java
index 9fb2d04..00409ef 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/NodeSequenceWriterFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/NodeSequenceWriterFactory.java
@@ -4,6 +4,7 @@
 import java.io.IOException;
 
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.io.SequenceFile.Writer;
@@ -59,7 +60,7 @@
         @Override
         public void open(DataOutput output) throws HyracksDataException {
             try {
-                writer = SequenceFile.createWriter(cf.getConf(), (FSDataOutputStream) output, NodeWritable.class, null,
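+                // SequenceFile writers need a concrete value class; pair NodeWritable keys
+                // with NullWritable rather than passing null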
+                writer = SequenceFile.createWriter(cf.getConf(), (FSDataOutputStream) output, NodeWritable.class, NullWritable.class,
                         CompressionType.NONE, null);
             } catch (IOException e) {
                 throw new HyracksDataException(e);
@@ -85,7 +86,7 @@
                     tuple.getFieldData(InputKmerBytesField), tuple.getFieldStart(InputKmerBytesField));
 
             try {
-                writer.append(node, null);
+                writer.append(node, NullWritable.get());
             } catch (IOException e) {
                 throw new HyracksDataException(e);
             }
@@ -93,8 +94,6 @@
 
         @Override
         public void close(DataOutput output) throws HyracksDataException {
-            // TODO Auto-generated method stub
-
         }
 
     }
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenMapKmerToRead.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenMapKmerToRead.java
index 968123e..2872a2d 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenMapKmerToRead.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenMapKmerToRead.java
@@ -76,10 +76,11 @@
                                             tuple.getFieldData(MapKmerPositionToReadOperator.OutputOtherReadIDListField),
                                             tuple.getFieldStart(MapKmerPositionToReadOperator.OutputOtherReadIDListField));
 
+
                                     String kmerString = "";
                                     if (posInRead > 0) {
                                         if (kmer.getLength() > tuple
-                                                .getFieldLength(ReadsKeyValueParserFactory.OutputKmerField)) {
+                                                .getFieldLength(MapKmerPositionToReadOperator.OutputKmerField)) {
                                             throw new IllegalArgumentException("Not enough kmer bytes");
                                         }
                                         kmer.setNewReference(
diff --git a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunStepByStepTest.java b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunStepByStepTest.java
index 68258d7..07cfab4 100644
--- a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunStepByStepTest.java
+++ b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunStepByStepTest.java
@@ -15,6 +15,7 @@
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.FileOutputFormat;
@@ -26,7 +27,7 @@
 import edu.uci.ics.genomix.hyracks.driver.Driver;
 import edu.uci.ics.genomix.hyracks.driver.Driver.Plan;
 import edu.uci.ics.genomix.hyracks.job.GenomixJobConf;
-import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.NodeWritable;
 
 @SuppressWarnings("deprecation")
 public class JobRunStepByStepTest {
@@ -59,10 +60,10 @@
 
     @Test
     public void TestAll() throws Exception {
-//        TestReader();
-//        TestGroupbyKmer();
-//        TestMapKmerToRead();
-//        TestGroupByReadID();
+        TestReader();
+        TestGroupbyKmer();
+        TestMapKmerToRead();
+        TestGroupByReadID();
         TestEndToEnd();
     }
 
@@ -97,7 +98,8 @@
     }
 
     public void TestEndToEnd() throws Exception {
-        conf.set(GenomixJobConf.OUTPUT_FORMAT, GenomixJobConf.OUTPUT_FORMAT_TEXT);
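+        // exercise the binary (SequenceFile) output path end-to-end instead of the text dump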
+        //conf.set(GenomixJobConf.OUTPUT_FORMAT, GenomixJobConf.OUTPUT_FORMAT_TEXT);
+        conf.set(GenomixJobConf.OUTPUT_FORMAT, GenomixJobConf.OUTPUT_FORMAT_BINARY);
         cleanUpReEntry();
         conf.set(GenomixJobConf.GROUPBY_TYPE, GenomixJobConf.GROUPBY_TYPE_PRECLUSTER);
         driver.runJob(new GenomixJobConf(conf), Plan.BUILD_DEBRUJIN_GRAPH, true);
@@ -187,18 +189,14 @@
                 }
                 SequenceFile.Reader reader = new SequenceFile.Reader(dfs, path, conf);
 
-                // KmerBytesWritable key = (KmerBytesWritable)
-                // ReflectionUtils.newInstance(reader.getKeyClass(), conf);
-                KmerBytesWritable key = new KmerBytesWritable(conf.getInt(GenomixJobConf.KMER_LENGTH, KmerSize));
-                // KmerCountValue value = (KmerCountValue)
-                // ReflectionUtils.newInstance(reader.getValueClass(), conf);
-                KmerBytesWritable value = null;
-                while (reader.next(key, value)) {
-                    if (key == null || value == null) {
+                NodeWritable node = new NodeWritable(conf.getInt(GenomixJobConf.KMER_LENGTH, KmerSize));
+                NullWritable value = NullWritable.get();
+                while (reader.next(node, value)) {
+                    if (node == null) {
                         break;
                     }
-                    bw.write(key.toString() + "\t" + value.toString());
-                    System.out.println(key.toString() + "\t" + value.toString());
+                    bw.write(node.toString());
+                    System.out.println(node.toString());
                     bw.newLine();
                 }
                 reader.close();
diff --git a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/TestUtils.java b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/TestUtils.java
index 6e6a504..d22bd0c 100644
--- a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/TestUtils.java
+++ b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/TestUtils.java
@@ -164,6 +164,9 @@
     }
     
     private static boolean containStrings(String lineExpected, String actualLine, int[] poslistField) {
+//        if (lineExpected.equals(actualLine)){
+//            return true;
+//        }
         String[] fieldsExp = lineExpected.split("\\\t");
         String[] fieldsAct = actualLine.split("\\\t");
         if (fieldsAct.length != fieldsExp.length) {