add unmerged option in hyracks graph building
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/MapReadToNodeOperator.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/MapReadToNodeOperator.java
index b690da2..77907ee 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/MapReadToNodeOperator.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/MapReadToNodeOperator.java
@@ -38,10 +38,12 @@
 
 public class MapReadToNodeOperator extends AbstractSingleActivityOperatorDescriptor {
 
-    public MapReadToNodeOperator(IOperatorDescriptorRegistry spec, RecordDescriptor outRecDesc, int kmerSize) {
+    public MapReadToNodeOperator(IOperatorDescriptorRegistry spec, RecordDescriptor outRecDesc, int kmerSize,
+            boolean bMergeNode) {
         super(spec, 1, 1);
         recordDescriptors[0] = outRecDesc;
         this.kmerSize = kmerSize;
+        this.DoMergeNodeInRead = bMergeNode;
     }
 
     /**
@@ -61,6 +63,8 @@
     public static final int OutputReverseReverseField = 5;
     public static final int OutputKmerBytesField = 6;
 
+    public final boolean DoMergeNodeInRead;
+
     public static final RecordDescriptor nodeOutputRec = new RecordDescriptor(new ISerializerDeserializer[7]);
 
     /**
@@ -141,9 +145,9 @@
             for (int i = InputInfoFieldStart + 2; i < accessor.getFieldCount(); i += 2) {
                 readNodesInfo(tIndex, readID, nextNodeEntry, nextNextNodeEntry, i);
 
-                if (curNodeEntry.inDegree() > 1 || curNodeEntry.outDegree() > 0 || nextNodeEntry.inDegree() > 0
-                        || nextNodeEntry.outDegree() > 0 || nextNextNodeEntry.inDegree() > 0
-                        || nextNextNodeEntry.outDegree() > 0) {
+                if (!DoMergeNodeInRead || curNodeEntry.inDegree() > 1 || curNodeEntry.outDegree() > 0
+                        || nextNodeEntry.inDegree() > 0 || nextNodeEntry.outDegree() > 0
+                        || nextNextNodeEntry.inDegree() > 0 || nextNextNodeEntry.outDegree() > 0) {
                     connect(curNodeEntry, nextNodeEntry);
                     outputNode(curNodeEntry);
                     curNodeEntry.set(nextNodeEntry);
@@ -203,7 +207,7 @@
 
         private void setReverseOutgoingList(NodeReference node, int offset) {
             setCachList(offset);
-            for(int i = 0; i < cachePositionList.getCountOfPosition(); i++){
+            for (int i = 0; i < cachePositionList.getCountOfPosition(); i++) {
                 PositionWritable pos = cachePositionList.getPosition(i);
                 if (pos.getPosInRead() > 0) {
                     node.getRFList().append(pos);
@@ -215,7 +219,7 @@
 
         private void setReverseIncomingList(NodeReference node, int offset) {
             setCachList(offset);
-            for(int i = 0; i < cachePositionList.getCountOfPosition(); i++){
+            for (int i = 0; i < cachePositionList.getCountOfPosition(); i++) {
                 PositionWritable pos = cachePositionList.getPosition(i);
                 if (pos.getPosInRead() > 0) {
                     if (pos.getPosInRead() > 1) {
@@ -233,7 +237,7 @@
 
         private void setForwardOutgoingList(NodeReference node, int offset) {
             setCachList(offset);
-            for(int i = 0; i < cachePositionList.getCountOfPosition(); i++){
+            for (int i = 0; i < cachePositionList.getCountOfPosition(); i++) {
                 PositionWritable pos = cachePositionList.getPosition(i);
                 if (pos.getPosInRead() > 0) {
                     node.getFFList().append(pos);
@@ -245,7 +249,7 @@
 
         private void setForwardIncomingList(NodeReference node, int offset) {
             setCachList(offset);
-            for(int i = 0; i < cachePositionList.getCountOfPosition(); i++){
+            for (int i = 0; i < cachePositionList.getCountOfPosition(); i++) {
                 PositionWritable pos = cachePositionList.getPosition(i);
                 if (pos.getPosInRead() > 0) {
                     if (pos.getPosInRead() > 1) {
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/driver/Driver.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/driver/Driver.java
index 35bd838..4c24919 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/driver/Driver.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/driver/Driver.java
@@ -31,6 +31,7 @@
 import edu.uci.ics.genomix.hyracks.job.JobGenCreateKmerInfo;
 import edu.uci.ics.genomix.hyracks.job.JobGenGroupbyReadID;
 import edu.uci.ics.genomix.hyracks.job.JobGenMapKmerToRead;
+import edu.uci.ics.genomix.hyracks.job.JobGenUnMerged;
 import edu.uci.ics.hyracks.api.client.HyracksConnection;
 import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
 import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
@@ -47,6 +48,7 @@
         OUTPUT_MAP_KMER_TO_READ,
         OUTPUT_GROUPBY_READID,
         BUILD_DEBRUJIN_GRAPH,
+        BUILD_UNMERGED_GRAPH,
     }
 
     private static final String IS_PROFILING = "genomix.driver.profiling";
@@ -109,6 +111,8 @@
                 case CHECK_KMERREADER:
                     jobGen = new JobGenCheckReader(job, scheduler, ncMap, numPartitionPerMachine);
                     break;
+                case BUILD_UNMERGED_GRAPH:
+                    jobGen = new JobGenUnMerged(job, scheduler, ncMap, numPartitionPerMachine);
             }
 
             start = System.currentTimeMillis();
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenBrujinGraph.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenBrujinGraph.java
index 02087de..09794d0 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenBrujinGraph.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenBrujinGraph.java
@@ -258,7 +258,7 @@
         // OutgoingList, Kmer)
 
         AbstractOperatorDescriptor mapEachReadToNode = new MapReadToNodeOperator(jobSpec,
-                MapReadToNodeOperator.nodeOutputRec, kmerSize);
+                MapReadToNodeOperator.nodeOutputRec, kmerSize, true);
         connectOperators(jobSpec, readCrossAggregator, ncNodeNames, mapEachReadToNode, ncNodeNames,
                 new OneToOneConnectorDescriptor(jobSpec));
         return mapEachReadToNode;
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenUnMerged.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenUnMerged.java
new file mode 100644
index 0000000..21b6385
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenUnMerged.java
@@ -0,0 +1,34 @@
+package edu.uci.ics.genomix.hyracks.job;
+
+import java.util.Map;
+
+import edu.uci.ics.genomix.hyracks.dataflow.MapReadToNodeOperator;
+import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import edu.uci.ics.hyracks.hdfs.scheduler.Scheduler;
+
+public class JobGenUnMerged extends JobGenBrujinGraph {
+
+    /**
+     * 
+     */
+    private static final long serialVersionUID = 1L;
+
+    public JobGenUnMerged(GenomixJobConf job, Scheduler scheduler, Map<String, NodeControllerInfo> ncMap,
+            int numPartitionPerMachine) throws HyracksDataException {
+        super(job, scheduler, ncMap, numPartitionPerMachine);
+    }
+
+    @Override
+    public AbstractOperatorDescriptor generateMapperFromReadToNode(JobSpecification jobSpec,
+            AbstractOperatorDescriptor readCrossAggregator) {
+        AbstractOperatorDescriptor mapEachReadToNode = new MapReadToNodeOperator(jobSpec,
+                MapReadToNodeOperator.nodeOutputRec, kmerSize, false);
+        connectOperators(jobSpec, readCrossAggregator, ncNodeNames, mapEachReadToNode, ncNodeNames,
+                new OneToOneConnectorDescriptor(jobSpec));
+        return mapEachReadToNode;
+    }
+}
diff --git a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunStepByStepTest.java b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunStepByStepTest.java
index 0ee2253..3a8746c 100644
--- a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunStepByStepTest.java
+++ b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunStepByStepTest.java
@@ -60,6 +60,7 @@
     private static final String EXPECTED_KMER_TO_READID = EXPECTED_DIR + "result_after_kmer2readId";
     private static final String EXPECTED_GROUPBYREADID = EXPECTED_DIR + "result_after_readIDAggreage";
     private static final String EXPECTED_OUPUT_NODE = EXPECTED_DIR + "result_after_generateNode";
+    private static final String EXPECTED_UNMERGED = EXPECTED_DIR + "result_unmerged";
 
     private static final String DUMPED_RESULT = ACTUAL_RESULT_DIR + HDFS_OUTPUT_PATH + "/merged.txt";
     private static final String CONVERT_RESULT = DUMPED_RESULT + ".txt";
@@ -79,6 +80,15 @@
         TestMapKmerToRead();
         TestGroupByReadID();
         TestEndToEnd();
+        TestUnMergedNode();
+    }
+
+    public void TestUnMergedNode() throws Exception {
+        conf.set(GenomixJobConf.OUTPUT_FORMAT, GenomixJobConf.OUTPUT_FORMAT_BINARY);
+        cleanUpReEntry();
+        conf.set(GenomixJobConf.GROUPBY_TYPE, GenomixJobConf.GROUPBY_TYPE_PRECLUSTER);
+        driver.runJob(new GenomixJobConf(conf), Plan.BUILD_UNMERGED_GRAPH, true);
+        Assert.assertEquals(true, checkResults(EXPECTED_UNMERGED, new int[] { 1, 2, 3, 4 }));
     }
 
     public void TestReader() throws Exception {
diff --git a/genomix/genomix-hyracks/src/test/resources/expected/result_unmerged b/genomix/genomix-hyracks/src/test/resources/expected/result_unmerged
new file mode 100644
index 0000000..f617779
--- /dev/null
+++ b/genomix/genomix-hyracks/src/test/resources/expected/result_unmerged
@@ -0,0 +1,24 @@
+((1,1)	[(1,2)]	[]	[]	[]	AATAG)
+((1,2)	[(1,3)]	[]	[]	[(1,1)]	ATAGA)
+((1,3)	[(6,1),(1,4)]	[]	[]	[(1,2)]	TAGAA)
+((1,4)	[(6,2)]	[]	[]	[(1,3)]	AGAAG)
+((2,1)	[(2,2)]	[]	[]	[]	AATAG)
+((2,2)	[(2,3)]	[]	[]	[(2,1)]	ATAGC)
+((2,3)	[(2,4)]	[]	[]	[(2,2)]	TAGCT)
+((2,4)	[]	[]	[]	[(2,3)]	AGCTT)
+((3,1)	[(3,2)]	[]	[]	[]	AATAG)
+((3,2)	[(3,3)]	[]	[]	[(3,1)]	ATAGA)
+((3,3)	[(6,1),(3,4)]	[]	[]	[(3,2)]	TAGAA)
+((3,4)	[(6,2)]	[]	[]	[(3,3)]	AGAAG)
+((4,1)	[(4,2)]	[]	[]	[]	AATAG)
+((4,2)	[(4,3)]	[]	[]	[(4,1)]	ATAGC)
+((4,3)	[(4,4)]	[]	[]	[(4,2)]	TAGCT)
+((4,4)	[]	[]	[]	[(4,3)]	AGCTT)
+((5,1)	[(5,2)]	[]	[]	[]	AATAG)
+((5,2)	[(5,3)]	[]	[]	[(5,1)]	ATAGA)
+((5,3)	[(6,1),(5,4)]	[]	[]	[(5,2)]	TAGAA)
+((5,4)	[(6,2)]	[]	[]	[(5,3)]	AGAAG)
+((6,1)	[(6,2)]	[]	[]	[(1,3),(3,3),(5,3)]	AGAAG)
+((6,2)	[(6,3)]	[]	[]	[(3,4),(1,4),(5,4),(6,1)]	GAAGA)
+((6,3)	[(6,4)]	[]	[]	[(6,2)]	AAGAA)
+((6,4)	[]	[]	[]	[(6,3)]	AGAAG)