refactor 'NaiveAlgorithm' to 'P1' and add test case for SplitRepeat
diff --git a/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/KmerBytesWritableTest.java b/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/KmerBytesWritableTest.java
index 4330c77..9993ab0 100644
--- a/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/KmerBytesWritableTest.java
+++ b/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/KmerBytesWritableTest.java
@@ -379,8 +379,12 @@
         s2.add((long) 2);
         s2.add((long) 3);
         Set<Long> intersection = new HashSet<Long>();
-        intersection = s1;
+        intersection.addAll(s1);
         intersection.retainAll(s2);
         System.out.println(intersection.toString());
+        Set<Long> difference = new HashSet<Long>();
+        difference = s1;
+        difference.removeAll(s2);
+        System.out.println(difference.toString());
     }
 }
diff --git a/genomix/genomix-hadoop/data/webmap/SplitRepeat.txt b/genomix/genomix-hadoop/data/webmap/SplitRepeat.txt
new file mode 100644
index 0000000..bb03d70
--- /dev/null
+++ b/genomix/genomix-hadoop/data/webmap/SplitRepeat.txt
@@ -0,0 +1,2 @@
+1	AATAG
+2	CATAC
diff --git a/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/contrailgraphbuilding/GraphBuildingTest.java b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/contrailgraphbuilding/GraphBuildingTest.java
index 217e882..127ab3e 100644
--- a/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/contrailgraphbuilding/GraphBuildingTest.java
+++ b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/contrailgraphbuilding/GraphBuildingTest.java
@@ -22,13 +22,13 @@
     private JobConf conf = new JobConf();
     private static final String ACTUAL_RESULT_DIR = "actual";
     private static final String HADOOP_CONF_PATH = ACTUAL_RESULT_DIR + File.separator + "conf.xml";
-    private static final String DATA_PATH = "data/webmap/pathmerge_TestSet/9";
+    private static final String DATA_PATH = "data/webmap/SplitRepeat.txt";
     private static final String HDFS_PATH = "/webmap";
     private static final String RESULT_PATH = "/result";
     
 //    private static final int COUNT_REDUCER = 2;
     private static final int SIZE_KMER = 3;
-    private static final int READ_LENGTH = 11;
+    private static final int READ_LENGTH = 5;
     
     private MiniDFSCluster dfsCluster;
     private MiniMRCluster mrCluster;
diff --git a/genomix/genomix-pregelix/data/SplitRepeat/SimpleTest/part-00000 b/genomix/genomix-pregelix/data/SplitRepeat/SimpleTest/part-00000
new file mode 100755
index 0000000..4977247
--- /dev/null
+++ b/genomix/genomix-pregelix/data/SplitRepeat/SimpleTest/part-00000
Binary files differ
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/client/Client.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/client/Client.java
index 43bc73f..0da4a77 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/client/Client.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/client/Client.java
@@ -10,7 +10,7 @@
 import org.kohsuke.args4j.Option;
 
 import edu.uci.ics.genomix.pregelix.operator.pathmerge.P2ForPathMergeVertex;
-import edu.uci.ics.genomix.pregelix.operator.pathmerge.NaiveAlgorithmForPathMergeVertex;
+import edu.uci.ics.genomix.pregelix.operator.pathmerge.P1ForPathMergeVertex;
 import edu.uci.ics.genomix.pregelix.operator.pathmerge.P3ForPathMergeVertex;
 import edu.uci.ics.pregelix.api.job.PregelixJob;
 import edu.uci.ics.pregelix.core.base.IDriver.Plan;
@@ -66,11 +66,11 @@
         for (int i = 1; i < inputs.length; i++)
             FileInputFormat.addInputPaths(job, inputs[0]);
         FileOutputFormat.setOutputPath(job, new Path(options.outputPath));
-        job.getConfiguration().setInt(NaiveAlgorithmForPathMergeVertex.KMER_SIZE, options.sizeKmer);
+        job.getConfiguration().setInt(P1ForPathMergeVertex.KMER_SIZE, options.sizeKmer);
         job.getConfiguration().setInt(P2ForPathMergeVertex.KMER_SIZE, options.sizeKmer);
         job.getConfiguration().setInt(P3ForPathMergeVertex.KMER_SIZE, options.sizeKmer);
         if (options.numIteration > 0) {
-            job.getConfiguration().setInt(NaiveAlgorithmForPathMergeVertex.ITERATIONS, options.numIteration);
+            job.getConfiguration().setInt(P1ForPathMergeVertex.ITERATIONS, options.numIteration);
             job.getConfiguration().setInt(P2ForPathMergeVertex.ITERATIONS, options.numIteration);
             job.getConfiguration().setInt(P3ForPathMergeVertex.ITERATIONS, options.numIteration);
         }
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MessageWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MessageWritable.java
index 7fe27e6..66dd474 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MessageWritable.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MessageWritable.java
@@ -120,6 +120,17 @@
         }
     }
     
+    public KmerBytesWritable getCreatedVertexId() {
+        return kmer;
+    }
+
+    public void setCreatedVertexId(KmerBytesWritable actualKmer) {
+        if (actualKmer != null) {
+            checkMessage |= CheckMessage.CHAIN;
+            this.kmer.set(actualKmer);
+        }
+    }
+    
     public AdjacencyListWritable getNeighberNode() {
         return neighberNode;
     }
@@ -165,7 +176,10 @@
     }
 
     public void setNodeIdList(PositionListWritable nodeIdList) {
-        this.nodeIdList.set(nodeIdList);
+        if(nodeIdList != null){
+            checkMessage |= CheckMessage.NODEIDLIST;
+            this.nodeIdList.set(nodeIdList);
+        }
     }
 
     @Override
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/NaiveAlgorithmForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P1ForPathMergeVertex.java
similarity index 95%
rename from genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/NaiveAlgorithmForPathMergeVertex.java
rename to genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P1ForPathMergeVertex.java
index 9d27769..31fb897 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/NaiveAlgorithmForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P1ForPathMergeVertex.java
@@ -49,10 +49,10 @@
 /**
  * Naive Algorithm for path merge graph
  */
-public class NaiveAlgorithmForPathMergeVertex extends
+public class P1ForPathMergeVertex extends
         Vertex<PositionWritable, VertexValueWritable, NullWritable, MessageWritable> {
-    public static final String KMER_SIZE = "NaiveAlgorithmForPathMergeVertex.kmerSize";
-    public static final String ITERATIONS = "NaiveAlgorithmForPathMergeVertex.iteration";
+    public static final String KMER_SIZE = "P1ForPathMergeVertex.kmerSize";
+    public static final String ITERATIONS = "P1ForPathMergeVertex.iteration";
     public static int kmerSize = -1;
     private int maxIteration = -1;
 
@@ -251,8 +251,8 @@
     }
 
     public static void main(String[] args) throws Exception {
-        PregelixJob job = new PregelixJob(NaiveAlgorithmForPathMergeVertex.class.getSimpleName());
-        job.setVertexClass(NaiveAlgorithmForPathMergeVertex.class);
+        PregelixJob job = new PregelixJob(P1ForPathMergeVertex.class.getSimpleName());
+        job.setVertexClass(P1ForPathMergeVertex.class);
         /**
          * BinaryInput and BinaryOutput
          */
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/splitrepeat/SplitRepeatVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/splitrepeat/SplitRepeatVertex.java
index 640d6bc..f019fb0 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/splitrepeat/SplitRepeatVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/splitrepeat/SplitRepeatVertex.java
@@ -4,14 +4,18 @@
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.Random;
 import java.util.Set;
 
 import edu.uci.ics.genomix.pregelix.io.MessageWritable;
+import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
 import edu.uci.ics.genomix.pregelix.operator.pathmerge.BasicGraphCleanVertex;
 import edu.uci.ics.genomix.pregelix.type.MessageFlag;
 import edu.uci.ics.genomix.type.KmerBytesWritable;
 import edu.uci.ics.genomix.type.KmerListWritable;
 import edu.uci.ics.genomix.type.PositionWritable;
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.util.BspUtils;
 
 public class SplitRepeatVertex extends 
     BasicGraphCleanVertex{
@@ -86,6 +90,7 @@
             {"FR", "RF"},
             {"FR", "RR"}
     };
+    public static Set<String> existKmerString = new HashSet<String>();
     private Set<Long> readIdSet = new HashSet<Long>();
     private Set<Long> incomingReadIdSet = new HashSet<Long>();
     private Set<Long> outgoingReadIdSet = new HashSet<Long>();
@@ -94,16 +99,60 @@
     private Set<Long> outgoingEdgeIntersection = new HashSet<Long>();
     private Set<Long> neighborEdgeIntersection = new HashSet<Long>();
     private Map<KmerBytesWritable, Set<Long>> kmerMap = new HashMap<KmerBytesWritable, Set<Long>>();
-    private KmerListWritable incomingEdgeList = new KmerListWritable(kmerSize);
-    private KmerListWritable outgoingEdgeList = new KmerListWritable(kmerSize);
+    private KmerListWritable incomingEdgeList = null; 
+    private KmerListWritable outgoingEdgeList = null; 
     private byte incomingEdgeDir = 0;
     private byte outgoingEdgeDir = 0;
     
+    protected KmerBytesWritable createdVertexId = null;  
     private CreatedVertex createdVertex = new CreatedVertex();
     public static Set<CreatedVertex> createdVertexSet = new HashSet<CreatedVertex>();
     
+    /**
+     * Lazily initialize kmerSize and maxIteration from the configuration and allocate the message, edge-list and created-vertex-id buffers.
+     */
+    public void initVertex() {
+        if (kmerSize == -1)
+            kmerSize = getContext().getConfiguration().getInt(KMER_SIZE, 5); // falls back to 5 when KMER_SIZE is not configured
+        if (maxIteration < 0)
+            maxIteration = getContext().getConfiguration().getInt(ITERATIONS, 1000000); // falls back to 1000000 when ITERATIONS is not configured
+        if(incomingMsg == null)
+            incomingMsg = new MessageWritable(kmerSize);
+        if(outgoingMsg == null)
+            outgoingMsg = new MessageWritable(kmerSize);
+        else
+            outgoingMsg.reset(kmerSize); // an existing outgoing message is reset on every call rather than reallocated
+        if(incomingEdgeList == null)
+            incomingEdgeList = new KmerListWritable(kmerSize);
+        if(outgoingEdgeList == null)
+            outgoingEdgeList = new KmerListWritable(kmerSize);
+        if(createdVertexId == null)
+            createdVertexId = new KmerBytesWritable(kmerSize + 1); // same length compute() passes to generaterRandomString
+    }
+    
+    /**
+     * Generate a random [ACGT] string of length n that is not yet in existKmerString, record it there and return it.
+     */
+    public String generaterRandomString(int n){
+        char[] chars = "ACGT".toCharArray();
+        StringBuilder sb = new StringBuilder();
+        Random random = new Random();
+        while(true){
+            sb.setLength(0); // restart each attempt; without this a collision would grow the candidate to 2n, 3n, ...
+            for (int i = 0; i < n; i++) {
+                sb.append(chars[random.nextInt(chars.length)]);
+            }
+            if(!existKmerString.contains(sb.toString()))
+                break;
+        }
+        existKmerString.add(sb.toString());
+        return sb.toString();
+    }
+    
+    @SuppressWarnings({ "rawtypes", "unchecked" })
     @Override
     public void compute(Iterator<MessageWritable> msgIterator) {
+        initVertex();
         if(getSuperstep() == 1){
             if(getVertexValue().getDegree() > 2){
                 outgoingMsg.setSourceVertexId(getVertexId());
@@ -167,7 +216,7 @@
                         //set outgoingEdge readId intersection
                         outgoingEdgeIntersection = selfReadIdSet;
                         outgoingEdgeIntersection.retainAll(outgoingReadIdSet);
-                        outgoingEdgeIntersection.removeAll(neighborEdgeIntersection);
+                        outgoingEdgeIntersection.removeAll(neighborEdgeIntersection); 
                         //set incomingEdge readId intersection
                         incomingEdgeIntersection = selfReadIdSet;
                         incomingEdgeIntersection.retainAll(incomingReadIdSet);
@@ -175,13 +224,15 @@
                         
                         if(!neighborEdgeIntersection.isEmpty()){
                             createdVertex.clear();
-                            createdVertex.setCreatedVertexId(getVertexId());
+                            createdVertexId.setByRead(generaterRandomString(kmerSize + 1).getBytes(), 0);
+                            createdVertex.setCreatedVertexId(createdVertexId);
                             createdVertex.setIncomingDir(connectedTable[i][1]);
                             createdVertex.setOutgoingDir(connectedTable[i][0]);
                             createdVertex.setIncomingEdge(incomingEdge);
                             createdVertex.setOutgoingEdge(outgoingEdge);
                             createdVertexSet.add(createdVertex);
                             
+                            outgoingMsg.setCreatedVertexId(createdVertex.getCreatedVertexId());
                             outgoingMsg.setSourceVertexId(getVertexId());
                             outgoingMsg.setFlag(incomingEdgeDir);
                             sendMsg(incomingEdge, outgoingMsg);
@@ -191,11 +242,13 @@
                         
                         if(!incomingEdgeIntersection.isEmpty()){
                             createdVertex.clear();
-                            createdVertex.setCreatedVertexId(getVertexId());
+                            createdVertexId.setByRead(generaterRandomString(kmerSize + 1).getBytes(), 0);
+                            createdVertex.setCreatedVertexId(createdVertexId);
                             createdVertex.setIncomingDir(connectedTable[i][1]);
                             createdVertex.setIncomingEdge(incomingEdge);
                             createdVertexSet.add(createdVertex);
                             
+                            outgoingMsg.setCreatedVertexId(createdVertex.getCreatedVertexId());
                             outgoingMsg.setSourceVertexId(getVertexId());
                             outgoingMsg.setFlag(incomingEdgeDir);
                             sendMsg(incomingEdge, outgoingMsg);
@@ -203,11 +256,13 @@
                         
                         if(!outgoingEdgeIntersection.isEmpty()){
                             createdVertex.clear();
-                            createdVertex.setCreatedVertexId(getVertexId());
+                            createdVertexId.setByRead(generaterRandomString(kmerSize + 1).getBytes(), 0);
+                            createdVertex.setCreatedVertexId(createdVertexId);
                             createdVertex.setOutgoingDir(connectedTable[i][0]);
                             createdVertex.setOutgoingEdge(outgoingEdge);
                             createdVertexSet.add(createdVertex);
                             
+                            outgoingMsg.setCreatedVertexId(createdVertex.getCreatedVertexId());
                             outgoingMsg.setSourceVertexId(getVertexId());
                             outgoingMsg.setFlag(outgoingEdgeDir);
                             sendMsg(outgoingEdge, outgoingMsg);
@@ -216,7 +271,53 @@
                 }
             }
         } else if(getSuperstep() == 4){
-            
+            while(msgIterator.hasNext()){
+                incomingMsg = msgIterator.next();
+                /** update edgelist to new/created vertex **/
+                switch(incomingMsg.getFlag()){
+                    case MessageFlag.DIR_FF:
+                        getVertexValue().getFFList().remove(incomingMsg.getSourceVertexId());
+                        getVertexValue().getFFList().append(incomingMsg.getCreatedVertexId());
+                        break;
+                    case MessageFlag.DIR_FR:
+                        getVertexValue().getFRList().remove(incomingMsg.getSourceVertexId());
+                        getVertexValue().getFRList().append(incomingMsg.getCreatedVertexId());
+                        break;
+                    case MessageFlag.DIR_RF:
+                        getVertexValue().getRFList().remove(incomingMsg.getSourceVertexId());
+                        getVertexValue().getRFList().append(incomingMsg.getCreatedVertexId());
+                        break;
+                    case MessageFlag.DIR_RR:
+                        getVertexValue().getRRList().remove(incomingMsg.getSourceVertexId());
+                        getVertexValue().getRRList().append(incomingMsg.getCreatedVertexId());
+                        break;
+                }
+                /** add new/created vertex **/
+                for(CreatedVertex v : createdVertexSet){
+                    Vertex vertex = (Vertex) BspUtils.createVertex(getContext().getConfiguration());
+                    vertex.getMsgList().clear();
+                    vertex.getEdges().clear();
+                    VertexValueWritable vertexValue = new VertexValueWritable();
+                    switch(v.incomingDir){
+                        case "RF":
+                            vertexValue.getRFList().append(v.incomingEdge);
+                            break;
+                        case "RR":
+                            vertexValue.getRRList().append(v.incomingEdge);
+                            break;
+                    }
+                    switch(v.outgoingDir){
+                        case "FF":
+                            vertexValue.getFFList().append(v.outgoingEdge);
+                            break;
+                        case "FR":
+                            vertexValue.getFRList().append(v.outgoingEdge);
+                            break;
+                    }
+                    vertex.setVertexId(v.getCreatedVertexId());
+                    vertex.setVertexValue(vertexValue);
+                }
+            }
         }
     }
 }
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
index 1dae62e..630b5aa 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
@@ -15,9 +15,10 @@
 import edu.uci.ics.genomix.pregelix.operator.bubblemerge.BubbleMergeVertex;
 import edu.uci.ics.genomix.pregelix.operator.pathmerge.P2ForPathMergeVertex;
 import edu.uci.ics.genomix.pregelix.operator.pathmerge.MapReduceVertex;
-import edu.uci.ics.genomix.pregelix.operator.pathmerge.NaiveAlgorithmForPathMergeVertex;
+import edu.uci.ics.genomix.pregelix.operator.pathmerge.P1ForPathMergeVertex;
 import edu.uci.ics.genomix.pregelix.operator.pathmerge.P3ForPathMergeVertex;
 import edu.uci.ics.genomix.pregelix.operator.pathmerge.P4ForPathMergeVertex;
+import edu.uci.ics.genomix.pregelix.operator.splitrepeat.SplitRepeatVertex;
 import edu.uci.ics.genomix.pregelix.operator.tipremove.TipAddVertex;
 import edu.uci.ics.genomix.pregelix.operator.tipremove.TipRemoveVertex;
 import edu.uci.ics.genomix.type.KmerBytesWritable;
@@ -29,13 +30,13 @@
 
 //    private static void generateNaiveAlgorithmForMergeGraphJob(String jobName, String outputPath) throws IOException {
 //        PregelixJob job = new PregelixJob(jobName);
-//        job.setVertexClass(NaiveAlgorithmForPathMergeVertex.class);
+//        job.setVertexClass(P1ForPathMergeVertex.class);
 //        job.setVertexInputFormatClass(NaiveAlgorithmForPathMergeInputFormat.class); //GraphCleanInputFormat
 //        job.setVertexOutputFormatClass(GraphCleanOutputFormat.class);
 //        job.setDynamicVertexValueSize(true);
 //        job.setOutputKeyClass(PositionWritable.class);
 //        job.setOutputValueClass(VertexValueWritable.class);
-//        job.getConfiguration().setInt(NaiveAlgorithmForPathMergeVertex.KMER_SIZE, 3);
+//        job.getConfiguration().setInt(P1ForPathMergeVertex.KMER_SIZE, 3);
 //        job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
 //    }
 //
@@ -111,6 +112,22 @@
     private static void genMapReduceGraph() throws IOException {
         generateMapReduceGraphJob("MapReduceGraph", outputBase + "MapReduceGraph.xml");
     }
+    
+    private static void generateSplitRepeatGraphJob(String jobName, String outputPath) throws IOException {
+        PregelixJob job = new PregelixJob(jobName);
+        job.setVertexClass(SplitRepeatVertex.class);
+        job.setVertexInputFormatClass(InitialGraphCleanInputFormat.class);
+        job.setVertexOutputFormatClass(GraphCleanOutputFormat.class); 
+        job.setDynamicVertexValueSize(true);
+        job.setOutputKeyClass(KmerBytesWritable.class);
+        job.setOutputValueClass(VertexValueWritable.class);
+        job.getConfiguration().setInt(SplitRepeatVertex.KMER_SIZE, 3);
+        job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
+    }
+
+    private static void genSplitRepeatGraph() throws IOException {
+        generateSplitRepeatGraphJob("SplitRepeatGraph", outputBase + "SplitRepeatGraph.xml");
+    }
 //    private static void generateTipAddGraphJob(String jobName, String outputPath) throws IOException {
 //        PregelixJob job = new PregelixJob(jobName);
 //        job.setVertexClass(TipAddVertex.class);
@@ -215,7 +232,7 @@
     
     public static void main(String[] args) throws IOException {
         //genNaiveAlgorithmForMergeGraph();
-        genLogAlgorithmForMergeGraph();
+//        genLogAlgorithmForMergeGraph();
         //genP3ForMergeGraph();
         //genTipAddGraph();
 //        genTipRemoveGraph();
@@ -225,6 +242,7 @@
 //        genBubbleMergeGraph();
 //        genP4ForMergeGraph();
 //        genMapReduceGraph();
+        genSplitRepeatGraph();
     }
 
 }
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/SplitRepeatmallTestSuite.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/SplitRepeatmallTestSuite.java
new file mode 100644
index 0000000..d87281d
--- /dev/null
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/SplitRepeatmallTestSuite.java
@@ -0,0 +1,199 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.genomix.pregelix.JobRun;
+
+import java.io.BufferedReader;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.logging.Logger;
+
+import junit.framework.Test;
+import junit.framework.TestResult;
+import junit.framework.TestSuite;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.mapred.JobConf;
+
+import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
+import edu.uci.ics.pregelix.core.util.PregelixHyracksIntegrationUtil;
+
+@SuppressWarnings("deprecation")
+public class SplitRepeatmallTestSuite extends TestSuite {
+    private static final Logger LOGGER = Logger.getLogger(SplitRepeatmallTestSuite.class.getName());
+    //P4ForMergeGraph/bin/read
+    public static final String PreFix = "data/SplitRepeat"; //"graphbuildresult";
+    public static final String[] TestDir = { PreFix + File.separator
+    + "SimpleTest"};
+    private static final String ACTUAL_RESULT_DIR = "data/actual/splitrepeat";
+    private static final String PATH_TO_HADOOP_CONF = "src/test/resources/hadoop/conf";
+    private static final String PATH_TO_CLUSTER_STORE = "src/test/resources/cluster/stores.properties";
+    private static final String PATH_TO_CLUSTER_PROPERTIES = "src/test/resources/cluster/cluster.properties";
+    private static final String PATH_TO_JOBS = "src/test/resources/jobs/";
+    private static final String PATH_TO_ONLY = "src/test/resources/only_splitrepeat.txt";
+
+    public static final String HDFS_INPUTPATH = "/PathTestSet";
+
+    private static final String HADOOP_CONF_PATH = ACTUAL_RESULT_DIR + File.separator + "conf.xml";
+    private MiniDFSCluster dfsCluster;
+
+    private JobConf conf = new JobConf();
+    private int numberOfNC = 2;
+
+    public void setUp() throws Exception {
+        ClusterConfig.setStorePath(PATH_TO_CLUSTER_STORE);
+        ClusterConfig.setClusterPropertiesPath(PATH_TO_CLUSTER_PROPERTIES);
+        cleanupStores();
+        PregelixHyracksIntegrationUtil.init("src/test/resources/topology.xml");
+        LOGGER.info("Hyracks mini-cluster started");
+        FileUtils.forceMkdir(new File(ACTUAL_RESULT_DIR));
+        FileUtils.cleanDirectory(new File(ACTUAL_RESULT_DIR));
+        startHDFS();
+    }
+
+    private void startHDFS() throws IOException {
+        conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/core-site.xml"));
+        conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/mapred-site.xml"));
+        conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/hdfs-site.xml"));
+        FileSystem lfs = FileSystem.getLocal(new Configuration());
+        lfs.delete(new Path("build"), true);
+        System.setProperty("hadoop.log.dir", "logs");
+        dfsCluster = new MiniDFSCluster(conf, numberOfNC, true, null);
+        FileSystem dfs = FileSystem.get(conf);
+
+        for (String testDir : TestDir) {
+            File src = new File(testDir);
+            Path dest = new Path(HDFS_INPUTPATH + File.separator + src.getName());
+            dfs.mkdirs(dest);
+            //src.listFiles()
+            //src.listFiles((FilenameFilter)(new WildcardFileFilter("part*")))
+            for (File f : src.listFiles()) {
+                dfs.copyFromLocalFile(new Path(f.getAbsolutePath()), dest);
+            }
+        }
+
+        DataOutputStream confOutput = new DataOutputStream(new FileOutputStream(new File(HADOOP_CONF_PATH)));
+        conf.writeXml(confOutput);
+        confOutput.flush();
+        confOutput.close();
+    }
+
+    private void cleanupStores() throws IOException {
+        FileUtils.forceMkdir(new File("teststore"));
+        FileUtils.forceMkdir(new File("build"));
+        FileUtils.cleanDirectory(new File("teststore"));
+        FileUtils.cleanDirectory(new File("build"));
+    }
+
+    /**
+     * Shut down the MiniDFSCluster that startHDFS() created.
+     */
+    private void cleanupHDFS() throws Exception {
+        dfsCluster.shutdown();
+    }
+
+    public void tearDown() throws Exception {
+        PregelixHyracksIntegrationUtil.deinit();
+        LOGGER.info("Hyracks mini-cluster shut down");
+        cleanupHDFS();
+    }
+
+    public static Test suite() throws Exception {
+        List<String> onlys = getFileList(PATH_TO_ONLY);
+        File testData = new File(PATH_TO_JOBS);
+        File[] queries = testData.listFiles();
+        SplitRepeatmallTestSuite testSuite = new SplitRepeatmallTestSuite();
+        testSuite.setUp();
+        boolean onlyEnabled = false;
+        FileSystem dfs = FileSystem.get(testSuite.conf);
+
+        if (onlys.size() > 0) {
+            onlyEnabled = true;
+        }
+
+        for (File qFile : queries) {
+            if (qFile.isFile()) {
+                if (onlyEnabled && !isInList(onlys, qFile.getName())) {
+                    continue;
+                } else {
+                    for (String testPathStr : TestDir) {
+                        File testDir = new File(testPathStr);
+                        String resultFileName = ACTUAL_RESULT_DIR + File.separator + jobExtToResExt(qFile.getName())
+                                + File.separator + "bin" + File.separator + testDir.getName();
+                        String textFileName = ACTUAL_RESULT_DIR + File.separator + jobExtToResExt(qFile.getName())
+                                + File.separator + "txt" + File.separator + testDir.getName();
+                        testSuite.addTest(new BasicSmallTestCase(HADOOP_CONF_PATH, qFile.getName(), qFile
+                                .getAbsolutePath().toString(), dfs,
+                                HDFS_INPUTPATH + File.separator + testDir.getName(), resultFileName, textFileName));
+                    }
+                }
+            }
+        }
+        return testSuite;
+    }
+
+    /**
+     * Runs the tests in index order, collects their results in the TestResult, then calls tearDown().
+     */
+    @Override
+    public void run(TestResult result) {
+        try {
+            int testCount = countTestCases();
+            for (int i = 0; i < testCount; i++) {
+                // cleanupStores();
+                Test each = this.testAt(i);
+                if (result.shouldStop())
+                    break;
+                runTest(each, result);
+            }
+            tearDown(); // not reached if anything above throws; the exception is wrapped below instead
+        } catch (Exception e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
+    /** Reads the file at ignorePath and returns its lines in order; the reader is closed even if reading fails. */
+    protected static List<String> getFileList(String ignorePath) throws FileNotFoundException, IOException {
+        List<String> ignores = new ArrayList<String>();
+        try (BufferedReader reader = new BufferedReader(new FileReader(ignorePath))) {
+            String s;
+            while ((s = reader.readLine()) != null)
+                ignores.add(s);
+        }
+        return ignores;
+    }
+
+    private static String jobExtToResExt(String fname) { // strips the last ".ext" from a job file name
+        int dot = fname.lastIndexOf('.');
+        return dot < 0 ? fname : fname.substring(0, dot); // no '.': keep the whole name instead of throwing
+    }
+
+    private static boolean isInList(List<String> onlys, String name) {
+        for (String only : onlys)
+            if (name.indexOf(only) >= 0)
+                return true;
+        return false;
+    }
+
+}