Update genomix-pregelix

git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_genomix@2923 123451ca-8445-de46-9d55-352943316053
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/LoadGraphVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/LoadGraphVertex.java
new file mode 100644
index 0000000..4af68fb
--- /dev/null
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/LoadGraphVertex.java
@@ -0,0 +1,109 @@
+package edu.uci.ics.pregelix;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.io.ByteWritable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.io.VertexWriter;
+import edu.uci.ics.pregelix.api.io.text.TextVertexOutputFormat;
+import edu.uci.ics.pregelix.api.io.text.TextVertexOutputFormat.TextVertexWriter;
+import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.pregelix.example.client.Client;
+import edu.uci.ics.pregelix.example.io.MessageWritable;
+
+/*
+ * vertexId: BytesWritable
+ * vertexValue: ByteWritable
+ * edgeValue: NullWritable
+ * message: MessageWritable
+ * 
+ * DNA:
+ * A: 00
+ * C: 01
+ * G: 10
+ * T: 11
+ * 
+ * succeed node
+ *  A 00000001 1
+ *  G 00000010 2
+ *  C 00000100 4
+ *  T 00001000 8
+ * precursor node
+ *  A 00010000 16
+ *  G 00100000 32
+ *  C 01000000 64
+ *  T 10000000 128
+ *  
+ * For example, ONE LINE in input file: 00,01,10	0001,0010,
+ * That means that vertexId is ACG, its succeed node is A and its precursor node is C.
+ * The succeed node and precursor node will be stored in vertexValue and we don't use edgeValue.
+ * The details about message are in edu.uci.ics.pregelix.example.io.MessageWritable. 
+ */
+public class LoadGraphVertex extends Vertex<BytesWritable, ByteWritable, NullWritable, MessageWritable>{
+	
+	private ByteWritable tmpVertexValue = new ByteWritable();
+	
+	/**
+	 * For test, in compute method, make each vertexValue shift 1 to left.
+	 * It will be modified when going forward to next step.
+	 */
+	@Override
+	public void compute(Iterator<MessageWritable> msgIterator) {
+		if(getSuperstep() == 1){
+			tmpVertexValue.set(getVertexValue().get());
+			tmpVertexValue.set((byte) (tmpVertexValue.get() << 1));
+			setVertexValue(tmpVertexValue);
+		}
+		else
+			voteToHalt();
+	 }
+	
+    /**
+     * Simple VertexWriter that supports {@link SimpleLoadGraphVertex}
+     */
+    public static class SimpleLoadGraphVertexWriter extends
+            TextVertexWriter<BytesWritable, ByteWritable, NullWritable> {
+        public SimpleLoadGraphVertexWriter(RecordWriter<Text, Text> lineRecordWriter) {
+            super(lineRecordWriter);
+        }
+
+        @Override
+        public void writeVertex(Vertex<BytesWritable, ByteWritable, NullWritable, ?> vertex) throws IOException,
+                InterruptedException {
+            getRecordWriter().write(new Text(vertex.getVertexId().toString()),
+                    new Text(vertex.getVertexValue().toString()));
+        }
+    }
+
+    /**
+     * Simple VertexOutputFormat that supports {@link SimpleLoadGraphVertex}
+     */
+    public static class SimpleLoadGraphVertexOutputFormat extends
+            TextVertexOutputFormat<BytesWritable, ByteWritable, NullWritable> {
+
+        @Override
+        public VertexWriter<BytesWritable, ByteWritable, NullWritable> createVertexWriter(TaskAttemptContext context)
+                throws IOException, InterruptedException {
+            RecordWriter<Text, Text> recordWriter = textOutputFormat.getRecordWriter(context);
+            return new SimpleLoadGraphVertexWriter(recordWriter);
+        }
+    }
+	
+	/**
+	 * @param args
+	 */
+	public static void main(String[] args) throws Exception {
+        PregelixJob job = new PregelixJob(LoadGraphVertex.class.getSimpleName());
+        job.setVertexClass(LoadGraphVertex.class);
+        job.setVertexInputFormatClass(TextLoadGraphInputFormat.class);
+        job.setVertexOutputFormatClass(SimpleLoadGraphVertexOutputFormat.class);
+        Client.run(args, job);
+	}
+}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/TestLoadGraphVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/TestLoadGraphVertex.java
new file mode 100644
index 0000000..529d429
--- /dev/null
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/TestLoadGraphVertex.java
@@ -0,0 +1,102 @@
+package edu.uci.ics.pregelix;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.util.StringTokenizer;
+
+import edu.uci.ics.pregelix.LoadGraphVertex.SimpleLoadGraphVertexOutputFormat;
+import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.pregelix.example.client.Client;
+
+public class TestLoadGraphVertex {
+
+	/**
+	 * If running in different machines, the parameters need to be changed.
+	 * Now, this test is not completed.
+	 */
+	private static final String EXPECT_RESULT_FILE = "~/workspace/genomix-pregelix/expect/expected_result";
+	private static final String INPUT_PATHS = "~/workspace/genomix-pregelix/folder";
+	private static final String OUTPUT_PATH = "~/workspace/genomix-pregelix/tmp/pg_result"; //result
+	private static final String IP = "169.234.134.212"; 
+	private static final String PORT = "3099";
+	/**
+	 * @param args
+	 * @throws Exception 
+	 */
+	@SuppressWarnings("deprecation")
+	public static void main(String[] args) throws Exception {
+		// TODO Auto-generated method stub
+		//initiate args
+		args = new String[8];
+		args[0] = "-inputpaths"; 
+		args[1] = INPUT_PATHS;
+		args[2] = "-outputpath";
+		args[3] = OUTPUT_PATH;
+		args[4] = "-ip";
+		args[5] = IP;
+		args[6] = "-port";
+		args[7] = PORT;
+        PregelixJob job = new PregelixJob(LoadGraphVertex.class.getSimpleName());
+        job.setVertexClass(LoadGraphVertex.class);
+        job.setVertexInputFormatClass(TextLoadGraphInputFormat.class);
+        job.setVertexOutputFormatClass(SimpleLoadGraphVertexOutputFormat.class);
+        Client.run(args, job);
+        
+        generateExpectBinaryFile();
+        
+        //test if the actual file is the same as the expected file
+        DataInputStream actual_dis = new DataInputStream(new FileInputStream(OUTPUT_PATH + "/*"));
+        DataInputStream expected_dis = new DataInputStream(new FileInputStream(EXPECT_RESULT_FILE));
+        String actualLine, expectedLine = null;
+        StringTokenizer actualSt, expectedSt;
+		byte[] actualVertexId, expectedVertexId = null;
+		byte actualVertexValue, expectedVertexValue;
+        byte[] tmp = null;
+        while(((actualLine = actual_dis.readLine()) != null) && 
+        		((expectedLine = expected_dis.readLine()) != null)){
+        	actualSt = new StringTokenizer(actualLine, " ");
+			actualVertexId = actualSt.nextToken().getBytes();
+			tmp = actualSt.nextToken().getBytes();
+			actualVertexValue = tmp[0];
+			
+			expectedSt = new StringTokenizer(expectedLine," ");
+			expectedVertexId = expectedSt.nextToken().getBytes();
+			tmp = expectedSt.nextToken().getBytes();
+			expectedVertexValue = tmp[0];
+			
+			//assertEquals("actualVextexId == expectedVertexId", actualVertexId, expectedVertexId);
+			//assertEquals("actualVertexValue == expectedVertexValue", actualVertexValue, expectedVertexValue);
+        }
+        
+        //assertEquals("actualLine should be the end and be equal to null", actualLine, null);
+        //assertEquals("expectedLine should be the end and be equal to null", expectedLine, null);
+	}
+
+	@SuppressWarnings("deprecation")
+	public static void generateExpectBinaryFile() throws Exception{
+		DataInputStream dis = new DataInputStream(new FileInputStream(INPUT_PATHS + "/*"));
+		DataOutputStream dos = new DataOutputStream(new FileOutputStream(EXPECT_RESULT_FILE));
+		String line;
+		byte[] vertexId = null;
+		byte vertexValue;
+		byte[] tmp = null;
+		while((line = dis.readLine()) != null){
+			StringTokenizer st = new StringTokenizer(line, " ");
+			vertexId = st.nextToken().getBytes();
+			tmp = st.nextToken().getBytes();
+			vertexValue = tmp[0];		
+			
+			vertexValue = (byte) (vertexValue << 1); 
+			for(int i = 0; i < vertexId.length; i++)
+				dos.writeByte(vertexId[i]);
+			dos.writeByte((byte)32); //space
+			dos.writeByte(vertexValue);
+			dos.writeByte((byte)10); //line feed
+		}
+		
+		dis.close();
+		dos.close();
+	}
+}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/TextLoadGraphInputFormat.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/TextLoadGraphInputFormat.java
new file mode 100644
index 0000000..b7fda73
--- /dev/null
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/TextLoadGraphInputFormat.java
@@ -0,0 +1,82 @@
+package edu.uci.ics.pregelix;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.ByteWritable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.io.VertexReader;
+import edu.uci.ics.pregelix.api.io.text.TextVertexInputFormat;
+import edu.uci.ics.pregelix.api.util.BspUtils;
+import edu.uci.ics.pregelix.example.io.MessageWritable;
+
+public class TextLoadGraphInputFormat extends
+		TextVertexInputFormat<BytesWritable, ByteWritable, NullWritable, MessageWritable>{
+	
+	/**
+	 * Format INPUT
+	 */
+    @Override
+    public VertexReader<BytesWritable, ByteWritable, NullWritable, MessageWritable> createVertexReader(
+            InputSplit split, TaskAttemptContext context) throws IOException {
+        return new TextLoadGraphReader(textInputFormat.createRecordReader(split, context));
+    }
+    
+    @SuppressWarnings("rawtypes")
+    class TextLoadGraphReader extends
+            TextVertexReader<BytesWritable, ByteWritable, NullWritable, MessageWritable> {
+        private final static String separator = " ";
+        private Vertex vertex;
+        private BytesWritable vertexId = new BytesWritable();
+        private ByteWritable vertexValue = new ByteWritable();
+
+        public TextLoadGraphReader(RecordReader<LongWritable, Text> lineRecordReader) {
+            super(lineRecordReader);
+        }
+
+        @Override
+        public boolean nextVertex() throws IOException, InterruptedException {
+            return getRecordReader().nextKeyValue();
+        }
+
+        @SuppressWarnings("unchecked")
+        @Override
+        public Vertex<BytesWritable, ByteWritable, NullWritable, MessageWritable> getCurrentVertex() throws IOException,
+                InterruptedException {
+            if (vertex == null)
+                vertex = (Vertex) BspUtils.createVertex(getContext().getConfiguration());
+
+            vertex.getMsgList().clear();
+            vertex.getEdges().clear();
+            Text line = getRecordReader().getCurrentValue();
+            String[] fields = line.toString().split(separator);
+
+            if (fields.length > 0) {
+                /**
+                 * set the src vertex id
+                 */
+            	BytesWritable src = new BytesWritable(fields[0].getBytes());
+                vertexId.set(src);
+                vertex.setVertexId(vertexId);
+
+                
+                /**
+                 * set the vertex value
+                 */
+                byte[] temp = fields[1].getBytes();
+                vertexValue.set(temp[0]);
+                vertex.setVertexValue(vertexValue);
+                
+            }
+            return vertex;
+        }
+    }
+
+}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/example/io/MessageWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/example/io/MessageWritable.java
new file mode 100644
index 0000000..0dbd800
--- /dev/null
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/example/io/MessageWritable.java
@@ -0,0 +1,89 @@
+package edu.uci.ics.pregelix.example.io;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.hadoop.io.WritableComparable;
+
+public class MessageWritable implements WritableComparable<MessageWritable>{
+	/**
+	 * bytes stores the chains of connected DNA
+	 * file stores the point to the file that stores the chains of connected DNA
+	 */
+	private byte[] bytes;
+	private File file;
+	
+	public MessageWritable(){		
+	}
+	
+	public MessageWritable(byte[] bytes, File file){
+		set(bytes,file);
+	}
+	
+	public void set(byte[] bytes, File file){
+		this.bytes = bytes;
+		this.file = file;
+	}
+			
+	public byte[] getBytes() {
+	    return bytes;
+	}
+	
+	public File getFile(){
+		return file;
+	}
+
+	@Override
+	public void write(DataOutput out) throws IOException {
+		// TODO Auto-generated method stub
+		out.write(bytes);
+		out.writeUTF(file.getAbsolutePath()); 
+	}
+
+	@Override
+	public void readFields(DataInput in) throws IOException {
+		// TODO Auto-generated method stub
+		in.readFully(bytes);
+		String absolutePath = in.readUTF();
+		file = new File(absolutePath);
+	}
+
+    @Override
+    public int hashCode() {
+    	int hashCode = 0;
+    	for(int i = 0; i < bytes.length; i++)
+    		hashCode = (int)bytes[i];
+        return hashCode;
+    }
+    @Override
+    public boolean equals(Object o) {
+        if (o instanceof MessageWritable) {
+        	MessageWritable tp = (MessageWritable) o;
+            return bytes == tp.bytes && file == tp.file;
+        }
+        return false;
+    }
+    @Override
+    public String toString() {
+        return bytes.toString() + "\t" + file.getAbsolutePath();
+    }
+    
+	@Override
+	public int compareTo(MessageWritable tp) {
+		// TODO Auto-generated method stub
+        int cmp;
+        if (bytes == tp.bytes)
+            cmp = 0;
+        else
+            cmp = 1;
+        if (cmp != 0)
+            return cmp;
+        if (file == tp.file)
+            return 0;
+        else
+            return 1;
+	}
+
+}
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/pregelix/JobGen/JobGenerator.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/pregelix/JobGen/JobGenerator.java
new file mode 100644
index 0000000..2af1688
--- /dev/null
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/pregelix/JobGen/JobGenerator.java
@@ -0,0 +1,48 @@
+package edu.uci.ics.pregelix.JobGen;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+import edu.uci.ics.pregelix.LoadGraphVertex;
+import edu.uci.ics.pregelix.LoadGraphVertex.SimpleLoadGraphVertexOutputFormat;
+import edu.uci.ics.pregelix.TextLoadGraphInputFormat;
+import edu.uci.ics.pregelix.api.job.PregelixJob;
+
+
+public class JobGenerator {
+
+    private static String outputBase = "src/test/resources/jobs/";
+    private static String HDFS_INPUTPATH = "/webmap";
+    private static String HDFS_OUTPUTPAH = "/result";
+    
+    private static void generateLoadGraphJob(String jobName, String outputPath) throws IOException {
+    	PregelixJob job = new PregelixJob(jobName);
+    	job.setVertexClass(LoadGraphVertex.class);
+    	job.setVertexInputFormatClass(TextLoadGraphInputFormat.class);
+        job.setVertexOutputFormatClass(SimpleLoadGraphVertexOutputFormat.class);
+        FileInputFormat.setInputPaths(job, HDFS_INPUTPATH);
+        FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH));
+        job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 20);
+        job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
+    }
+    
+    private static void genLoadGraph() throws IOException {
+    	generateLoadGraphJob("LoadGraph", outputBase + "LoadGraph.xml");
+    }
+    
+	/**
+	 * @param args
+	 * @throws IOException 
+	 */
+	public static void main(String[] args) throws IOException {
+		// TODO Auto-generated method stub
+
+		genLoadGraph();
+	}
+
+}
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/pregelix/JobRun/RunJobTestCase.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/pregelix/JobRun/RunJobTestCase.java
new file mode 100644
index 0000000..3928d4f
--- /dev/null
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/pregelix/JobRun/RunJobTestCase.java
@@ -0,0 +1,160 @@
+package edu.uci.ics.pregelix.JobRun;
+
+import java.io.File;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.junit.Test;
+
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.pregelix.core.jobgen.JobGen;
+import edu.uci.ics.pregelix.core.jobgen.JobGenInnerJoin;
+import edu.uci.ics.pregelix.core.jobgen.JobGenOuterJoin;
+import edu.uci.ics.pregelix.core.jobgen.JobGenOuterJoinSingleSort;
+import edu.uci.ics.pregelix.core.jobgen.JobGenOuterJoinSort;
+import edu.uci.ics.pregelix.core.util.PregelixHyracksIntegrationUtil;
+import edu.uci.ics.pregelix.dataflow.util.IterationUtils;
+import edu.uci.ics.pregelix.example.util.TestUtils;
+
+public class RunJobTestCase extends TestCase{
+	
+	private static final String NC1 = "nc1";
+    private static final String HYRACKS_APP_NAME = "pregelix";
+    private static String HDFS_INPUTPATH = "/webmap";
+    private static String HDFS_OUTPUTPAH = "/result";
+    
+    private final PregelixJob job;
+    private JobGen[] giraphJobGens;
+    private final String resultFileName;
+    private final String expectedFileName;
+    private final String jobFile;
+
+    public RunJobTestCase(String hadoopConfPath, String jobName, String jobFile, String resultFile, String expectedFile)
+            throws Exception {
+        super("test");
+        this.jobFile = jobFile;
+        this.job = new PregelixJob("test");
+        this.job.getConfiguration().addResource(new Path(jobFile));
+        this.job.getConfiguration().addResource(new Path(hadoopConfPath));
+        Path[] inputPaths = FileInputFormat.getInputPaths(job);
+        if (inputPaths[0].toString().endsWith(HDFS_INPUTPATH)) {
+            FileInputFormat.setInputPaths(job, HDFS_INPUTPATH);
+            FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH));
+        } 
+        
+        job.setJobName(jobName);
+        this.resultFileName = resultFile;
+        this.expectedFileName = expectedFile;
+        giraphJobGens = new JobGen[4];
+        giraphJobGens[0] = new JobGenOuterJoin(job);
+        waitawhile();
+        giraphJobGens[1] = new JobGenInnerJoin(job);
+        waitawhile();
+        giraphJobGens[2] = new JobGenOuterJoinSort(job);
+        waitawhile();
+        giraphJobGens[3] = new JobGenOuterJoinSingleSort(job);
+    }
+    
+    private void waitawhile() throws InterruptedException {
+        synchronized (this) {
+            this.wait(20);
+        }
+    }
+	@Test
+	public void test() throws Exception {
+		setUp();
+        for (JobGen jobGen : giraphJobGens) {
+            FileSystem dfs = FileSystem.get(job.getConfiguration());
+            dfs.delete(new Path(HDFS_OUTPUTPAH), true);
+            runCreate(jobGen);
+            runDataLoad(jobGen);
+            int i = 1;
+            boolean terminate = false;
+            do {
+                runLoopBodyIteration(jobGen, i);
+                terminate = IterationUtils.readTerminationState(job.getConfiguration(), jobGen.getJobId());
+                i++;
+            } while (!terminate);
+            runIndexScan(jobGen);
+            runHDFSWRite(jobGen);
+            runCleanup(jobGen);
+            compareResults();
+        }
+        tearDown();
+        waitawhile();
+	}
+	
+	private void runCreate(JobGen jobGen) throws Exception {
+        try {
+            JobSpecification treeCreateJobSpec = jobGen.generateCreatingJob();
+            PregelixHyracksIntegrationUtil.runJob(treeCreateJobSpec, HYRACKS_APP_NAME);
+        } catch (Exception e) {
+            throw e;
+        }
+    }
+
+    private void runDataLoad(JobGen jobGen) throws Exception {
+        try {
+            JobSpecification bulkLoadJobSpec = jobGen.generateLoadingJob();
+            PregelixHyracksIntegrationUtil.runJob(bulkLoadJobSpec, HYRACKS_APP_NAME);
+        } catch (Exception e) {
+            throw e;
+        }
+    }
+
+    private void runLoopBodyIteration(JobGen jobGen, int iteration) throws Exception {
+        try {
+            JobSpecification loopBody = jobGen.generateJob(iteration);
+            PregelixHyracksIntegrationUtil.runJob(loopBody, HYRACKS_APP_NAME);
+        } catch (Exception e) {
+            throw e;
+        }
+    }
+
+    private void runIndexScan(JobGen jobGen) throws Exception {
+        try {
+            JobSpecification scanSortPrintJobSpec = jobGen.scanIndexPrintGraph(NC1, resultFileName);
+            PregelixHyracksIntegrationUtil.runJob(scanSortPrintJobSpec, HYRACKS_APP_NAME);
+        } catch (Exception e) {
+            throw e;
+        }
+    }
+
+    private void runHDFSWRite(JobGen jobGen) throws Exception {
+        try {
+            JobSpecification scanSortPrintJobSpec = jobGen.scanIndexWriteGraph();
+            PregelixHyracksIntegrationUtil.runJob(scanSortPrintJobSpec, HYRACKS_APP_NAME);
+        } catch (Exception e) {
+            throw e;
+        }
+    }
+
+    private void runCleanup(JobGen jobGen) throws Exception {
+        try {
+            JobSpecification[] cleanups = jobGen.generateCleanup();
+            runJobArray(cleanups);
+        } catch (Exception e) {
+            throw e;
+        }
+    }
+
+    private void runJobArray(JobSpecification[] jobs) throws Exception {
+        for (JobSpecification job : jobs) {
+            PregelixHyracksIntegrationUtil.runJob(job, HYRACKS_APP_NAME);
+        }
+    }
+
+    private void compareResults() throws Exception {
+        TestUtils.compareWithResult(new File(resultFileName), new File(expectedFileName));
+    }
+
+    public String toString() {
+        return jobFile;
+    }
+
+}
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/pregelix/JobRun/RunJobTestSuite.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/pregelix/JobRun/RunJobTestSuite.java
new file mode 100644
index 0000000..53c2d48
--- /dev/null
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/pregelix/JobRun/RunJobTestSuite.java
@@ -0,0 +1,186 @@
+package edu.uci.ics.pregelix.JobRun;
+
+import java.io.BufferedReader;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.logging.Logger;
+
+import junit.framework.Test;
+import junit.framework.TestResult;
+import junit.framework.TestSuite;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.mapred.JobConf;
+
+import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
+import edu.uci.ics.pregelix.core.util.PregelixHyracksIntegrationUtil;
+
+public class RunJobTestSuite extends TestSuite{
+	
+	private static final Logger LOGGER = Logger.getLogger(RunJobTestSuite.class
+			.getName());
+
+	private static final String ACTUAL_RESULT_DIR = "actual";
+	private static final String EXPECTED_RESULT_DIR = "src/test/resources/expected";
+	private static final String PATH_TO_HADOOP_CONF = "src/test/resources/hadoop/conf";
+	private static final String PATH_TO_CLUSTER_STORE = "src/test/resources/cluster/stores.properties";
+	private static final String PATH_TO_CLUSTER_PROPERTIES = "src/test/resources/cluster/cluster.properties";
+	private static final String PATH_TO_JOBS = "src/test/resources/jobs/";
+	private static final String PATH_TO_IGNORE = "src/test/resources/ignore.txt";
+	private static final String PATH_TO_ONLY = "src/test/resources/only.txt";
+	private static final String FILE_EXTENSION_OF_RESULTS = "result";
+
+	private static final String DATA_PATH = "data/webmap/test.dat";
+	private static final String HDFS_PATH = "/webmap/";
+	
+	private static final String HYRACKS_APP_NAME = "pregelix";
+	private static final String HADOOP_CONF_PATH = ACTUAL_RESULT_DIR
+			+ File.separator + "conf.xml";
+	private MiniDFSCluster dfsCluster;
+
+	private JobConf conf = new JobConf();
+	private int numberOfNC = 2;
+	
+	public void setUp() throws Exception {
+		ClusterConfig.setStorePath(PATH_TO_CLUSTER_STORE);
+		ClusterConfig.setClusterPropertiesPath(PATH_TO_CLUSTER_PROPERTIES);
+		cleanupStores();
+		PregelixHyracksIntegrationUtil.init();
+		PregelixHyracksIntegrationUtil.createApp(HYRACKS_APP_NAME);
+		LOGGER.info("Hyracks mini-cluster started");
+		FileUtils.forceMkdir(new File(ACTUAL_RESULT_DIR));
+		FileUtils.cleanDirectory(new File(ACTUAL_RESULT_DIR));
+		startHDFS();
+	}
+
+	private void cleanupStores() throws IOException {
+		FileUtils.forceMkdir(new File("teststore"));
+		FileUtils.forceMkdir(new File("build"));
+		FileUtils.cleanDirectory(new File("teststore"));
+		FileUtils.cleanDirectory(new File("build"));
+	}
+	
+	private void startHDFS() throws IOException {
+		conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/core-site.xml"));
+		conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/mapred-site.xml"));
+		conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/hdfs-site.xml"));
+		FileSystem lfs = FileSystem.getLocal(new Configuration());
+		lfs.delete(new Path("build"), true);
+		System.setProperty("hadoop.log.dir", "logs");
+		dfsCluster = new MiniDFSCluster(conf, numberOfNC, true, null);
+		FileSystem dfs = FileSystem.get(conf);
+		Path src = new Path(DATA_PATH);
+		Path dest = new Path(HDFS_PATH);
+		dfs.mkdirs(dest);
+		dfs.copyFromLocalFile(src, dest);
+
+		DataOutputStream confOutput = new DataOutputStream(
+				new FileOutputStream(new File(HADOOP_CONF_PATH)));
+		conf.writeXml(confOutput);
+		confOutput.flush();
+		confOutput.close();
+	}
+	
+	/**
+	 * cleanup hdfs cluster
+	 */
+	private void cleanupHDFS() throws Exception {
+		dfsCluster.shutdown();
+	}
+
+	public void tearDown() throws Exception {
+		PregelixHyracksIntegrationUtil.destroyApp(HYRACKS_APP_NAME);
+		PregelixHyracksIntegrationUtil.deinit();
+		LOGGER.info("Hyracks mini-cluster shut down");
+		cleanupHDFS();
+	}
+	
+	public static Test suite() throws Exception {
+		List<String> ignores = getFileList(PATH_TO_IGNORE);
+		List<String> onlys = getFileList(PATH_TO_ONLY);
+		File testData = new File(PATH_TO_JOBS);
+		File[] queries = testData.listFiles();
+		RunJobTestSuite testSuite = new RunJobTestSuite();
+		testSuite.setUp();
+		boolean onlyEnabled = false;
+
+		if (onlys.size() > 0) {
+			onlyEnabled = true;
+		}
+		for (File qFile : queries) {
+			if (isInList(ignores, qFile.getName()))
+				continue;
+
+			if (qFile.isFile()) {
+				if (onlyEnabled && !isInList(onlys, qFile.getName())) {
+					continue;
+				} else {
+					String resultFileName = ACTUAL_RESULT_DIR + File.separator
+							+ jobExtToResExt(qFile.getName());
+					String expectedFileName = EXPECTED_RESULT_DIR
+							+ File.separator + jobExtToResExt(qFile.getName());
+					testSuite.addTest(new RunJobTestCase(HADOOP_CONF_PATH,
+							qFile.getName(),
+							qFile.getAbsolutePath().toString(), resultFileName,
+							expectedFileName));
+				}
+			}
+		}
+		return testSuite;
+	}
+	
+	/**
+	 * Runs the tests and collects their result in a TestResult.
+	 */
+	@Override
+	public void run(TestResult result) {
+		try {
+			int testCount = countTestCases();
+			for (int i = 0; i < testCount; i++) {
+				// cleanupStores();
+				Test each = this.testAt(i);
+				if (result.shouldStop())
+					break;
+				runTest(each, result);
+			}
+			tearDown();
+		} catch (Exception e) {
+			throw new IllegalStateException(e);
+		}
+	}
+
+	protected static List<String> getFileList(String ignorePath)
+			throws FileNotFoundException, IOException {
+		BufferedReader reader = new BufferedReader(new FileReader(ignorePath));
+		String s = null;
+		List<String> ignores = new ArrayList<String>();
+		while ((s = reader.readLine()) != null) {
+			ignores.add(s);
+		}
+		reader.close();
+		return ignores;
+	}
+
+	private static String jobExtToResExt(String fname) {
+		int dot = fname.lastIndexOf('.');
+		return fname.substring(0, dot + 1) + FILE_EXTENSION_OF_RESULTS;
+	}
+
+	private static boolean isInList(List<String> onlys, String name) {
+		for (String only : onlys)
+			if (name.indexOf(only) >= 0)
+				return true;
+		return false;
+	}
+
+}
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/pregelix/LoadGraphVertexTest.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/pregelix/LoadGraphVertexTest.java
new file mode 100644
index 0000000..e88098d
--- /dev/null
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/pregelix/LoadGraphVertexTest.java
@@ -0,0 +1,108 @@
+package edu.uci.ics.pregelix;
+
+import static org.junit.Assert.*;
+
+import java.io.BufferedReader;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.StringTokenizer;
+
+import org.junit.Test;
+
+import edu.uci.ics.pregelix.LoadGraphVertex.SimpleLoadGraphVertexOutputFormat;
+import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.pregelix.example.client.Client;
+
+public class LoadGraphVertexTest {
+
+	/**
+	 * I can't debug in JUnits test so that I can't find my error here. So I leave comments here.
+	 * I will figure out as soon as possible.
+	 */
+	private static final String EXPECT_RESULT_FILE = "expected_result";
+	private static final String INPUT_PATHS = "folder";
+	private static final String OUTPUT_PATH = "result";
+	private static final String IP = "169.234.134.212";
+	private static final String PORT = "3099";
+	
+	@SuppressWarnings("deprecation")
+	@Test
+	public void test() throws Exception {
+		//initiate args
+	/*	String[] args = new String[8];
+		args[0] = "-inputpaths"; 
+		args[1] = INPUT_PATHS;
+		args[2] = "-outputpath";
+		args[3] = OUTPUT_PATH;
+		args[4] = "-ip";
+		args[5] = IP;
+		args[6] = "-port";
+		args[7] = PORT;
+        PregelixJob job = new PregelixJob(LoadGraphVertex.class.getSimpleName());
+        job.setVertexClass(LoadGraphVertex.class);
+        job.setVertexInputFormatClass(TextLoadGraphInputFormat.class);
+        job.setVertexOutputFormatClass(SimpleLoadGraphVertexOutputFormat.class);
+        Client.run(args, job);
+        
+        generateExpectBinaryFile();
+        
+        //test if the actual file is the same as the expected file
+        DataInputStream actual_dis = new DataInputStream(new FileInputStream(OUTPUT_PATH + "/*"));
+        DataInputStream expected_dis = new DataInputStream(new FileInputStream(EXPECT_RESULT_FILE));
+        String actualLine, expectedLine = null;
+        StringTokenizer actualSt, expectedSt;
+		byte[] actualVertexId, expectedVertexId = null;
+		byte actualVertexValue, expectedVertexValue;
+        byte[] tmp = null;
+        while(((actualLine = actual_dis.readLine()) != null) && 
+        		((expectedLine = expected_dis.readLine()) != null)){
+        	actualSt = new StringTokenizer(actualLine, " ");
+			actualVertexId = actualSt.nextToken().getBytes();
+			tmp = actualSt.nextToken().getBytes();
+			actualVertexValue = tmp[0];
+			
+			expectedSt = new StringTokenizer(expectedLine," ");
+			expectedVertexId = expectedSt.nextToken().getBytes();
+			tmp = expectedSt.nextToken().getBytes();
+			expectedVertexValue = tmp[0];
+			
+			assertEquals("actualVextexId == expectedVertexId", actualVertexId, expectedVertexId);
+			assertEquals("actualVertexValue == expectedVertexValue", actualVertexValue, expectedVertexValue);
+        }
+        
+        assertEquals("actualLine should be the end and be equal to null", actualLine, null);
+        assertEquals("expectedLine should be the end and be equal to null", expectedLine, null);*/
+	}
+	
+	@SuppressWarnings("deprecation")
+	public void generateExpectBinaryFile() throws Exception{
+		DataInputStream dis = new DataInputStream(new FileInputStream(INPUT_PATHS + "/*"));
+		DataOutputStream dos = new DataOutputStream(new FileOutputStream(EXPECT_RESULT_FILE));
+		String line;
+		byte[] vertexId = null;
+		byte vertexValue;
+		byte[] tmp = null;
+		while((line = dis.readLine()) != null){
+			StringTokenizer st = new StringTokenizer(line, " ");
+			vertexId = st.nextToken().getBytes();
+			tmp = st.nextToken().getBytes();
+			vertexValue = tmp[0];		
+			
+			vertexValue = (byte) (vertexValue << 1); 
+			for(int i = 0; i < vertexId.length; i++)
+				dos.writeByte(vertexId[i]);
+			dos.writeByte((byte)32); //space
+			dos.writeByte(vertexValue);
+			dos.writeByte((byte)10); //line feed
+		}
+		
+		dis.close();
+		dos.close();
+	}
+
+}
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/pregelix/example/util/TestUtils.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/pregelix/example/util/TestUtils.java
new file mode 100644
index 0000000..d89ec46
--- /dev/null
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/pregelix/example/util/TestUtils.java
@@ -0,0 +1,90 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.example.util;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+
+public class TestUtils {
+
+    public static void compareWithResult(File expectedFile, File actualFile) throws Exception {
+        BufferedReader readerExpected = new BufferedReader(new FileReader(expectedFile));
+        BufferedReader readerActual = new BufferedReader(new FileReader(actualFile));
+        String lineExpected, lineActual;
+        int num = 1;
+        try {
+            while ((lineExpected = readerExpected.readLine()) != null) {
+                lineActual = readerActual.readLine();
+                // Assert.assertEquals(lineExpected, lineActual);
+                if (lineActual == null) {
+                    throw new Exception("Actual result changed at line " + num + ":\n< " + lineExpected + "\n> ");
+                }
+                if (!equalStrings(lineExpected, lineActual)) {
+                    throw new Exception("Result for changed at line " + num + ":\n< " + lineExpected + "\n> "
+                            + lineActual);
+                }
+                ++num;
+            }
+            lineActual = readerActual.readLine();
+            if (lineActual != null) {
+                throw new Exception("Actual result changed at line " + num + ":\n< \n> " + lineActual);
+            }
+        } finally {
+            readerExpected.close();
+            readerActual.close();
+        }
+    }
+
+    private static boolean equalStrings(String s1, String s2) {
+        String[] rowsOne = s1.split("\n");
+        String[] rowsTwo = s2.split("\n");
+
+        if (rowsOne.length != rowsTwo.length)
+            return false;
+
+        for (int i = 0; i < rowsOne.length; i++) {
+            String row1 = rowsOne[i];
+            String row2 = rowsTwo[i];
+
+            if (row1.equals(row2))
+                continue;
+
+            String[] fields1 = row1.split(" ");
+            String[] fields2 = row2.split(" ");
+
+            for (int j = 0; j < fields1.length; j++) {
+                if (fields1[j].equals(fields2[j])) {
+                    continue;
+                } else if (fields1[j].indexOf('.') < 0) {
+                    return false;
+                } else {
+                    Double double1 = Double.parseDouble(fields1[j]);
+                    Double double2 = Double.parseDouble(fields2[j]);
+                    float float1 = (float) double1.doubleValue();
+                    float float2 = (float) double2.doubleValue();
+
+                    if (Math.abs(float1 - float2) == 0)
+                        continue;
+                    else {
+                        return false;
+                    }
+                }
+            }
+        }
+        return true;
+    }
+
+}
diff --git a/genomix/genomix-pregelix/src/test/resources/cluster/cluster.properties b/genomix/genomix-pregelix/src/test/resources/cluster/cluster.properties
new file mode 100644
index 0000000..14f8bd4
--- /dev/null
+++ b/genomix/genomix-pregelix/src/test/resources/cluster/cluster.properties
@@ -0,0 +1,37 @@
+#The CC port for Hyracks clients
+CC_CLIENTPORT=3099
+
+#The CC port for Hyracks cluster management
+CC_CLUSTERPORT=1099
+
+#The directory of hyracks binaries
+HYRACKS_HOME=../../../../hyracks
+
+#The tmp directory for cc to install jars
+CCTMP_DIR=/tmp/t1
+
+#The tmp directory for nc to install jars
+NCTMP_DIR=/tmp/t2
+
+#The directory to put cc logs
+CCLOGS_DIR=$CCTMP_DIR/logs
+
+#The directory to put nc logs
+NCLOGS_DIR=$NCTMP_DIR/logs
+
+#Comma separated I/O directories for the spilling of external sort
+IO_DIRS="/tmp/t3,/tmp/t4"
+
+#The JAVA_HOME
+JAVA_HOME=$JAVA_HOME
+
+#The frame size of the internal dataflow engine
+FRAME_SIZE=65536
+
+#CC JAVA_OPTS
+CCJAVA_OPTS="-Xdebug -Xrunjdwp:transport=dt_socket,address=7001,server=y,suspend=n -Xmx3g -Djava.util.logging.config.file=logging.properties"
+# Yourkit option: -agentpath:/grid/0/dev/vborkar/tools/yjp-10.0.4/bin/linux-x86-64/libyjpagent.so=port=20001"
+
+#NC JAVA_OPTS
+NCJAVA_OPTS="-Xdebug -Xrunjdwp:transport=dt_socket,address=7002,server=y,suspend=n -Xmx1g -Djava.util.logging.config.file=logging.properties"
+
diff --git a/genomix/genomix-pregelix/src/test/resources/cluster/stores.properties b/genomix/genomix-pregelix/src/test/resources/cluster/stores.properties
new file mode 100644
index 0000000..daf881e
--- /dev/null
+++ b/genomix/genomix-pregelix/src/test/resources/cluster/stores.properties
@@ -0,0 +1 @@
+store=teststore
\ No newline at end of file
diff --git a/genomix/genomix-pregelix/src/test/resources/expected/LoadGraph.result b/genomix/genomix-pregelix/src/test/resources/expected/LoadGraph.result
new file mode 100644
index 0000000..b96a242
--- /dev/null
+++ b/genomix/genomix-pregelix/src/test/resources/expected/LoadGraph.result
@@ -0,0 +1,4 @@
+06|Vertex(id=06,value=34, edges=())
+07|Vertex(id=07,value=68, edges=())
+1b|Vertex(id=1b,value=-120, edges=())
+2d|Vertex(id=2d,value=-34, edges=())
diff --git a/genomix/genomix-pregelix/src/test/resources/hadoop/conf/core-site.xml b/genomix/genomix-pregelix/src/test/resources/hadoop/conf/core-site.xml
new file mode 100644
index 0000000..47dfac5
--- /dev/null
+++ b/genomix/genomix-pregelix/src/test/resources/hadoop/conf/core-site.xml
@@ -0,0 +1,18 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+
+<!-- Put site-specific property overrides in this file. -->
+
+<configuration>
+
+<property>
+    <name>fs.default.name</name>
+    <value>hdfs://127.0.0.1:31888</value>
+</property>
+<property>
+    <name>hadoop.tmp.dir</name>
+    <value>/tmp/hadoop</value>
+</property>
+
+
+</configuration>
diff --git a/genomix/genomix-pregelix/src/test/resources/hadoop/conf/hdfs-site.xml b/genomix/genomix-pregelix/src/test/resources/hadoop/conf/hdfs-site.xml
new file mode 100644
index 0000000..8d29b1d
--- /dev/null
+++ b/genomix/genomix-pregelix/src/test/resources/hadoop/conf/hdfs-site.xml
@@ -0,0 +1,18 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+
+<!-- Put site-specific property overrides in this file. -->
+
+<configuration>
+
+<property>
+   <name>dfs.replication</name>
+   <value>1</value>
+</property>
+
+<property>
+	<name>dfs.block.size</name>
+	<value>65536</value>
+</property>
+
+</configuration>
diff --git a/genomix/genomix-pregelix/src/test/resources/hadoop/conf/log4j.properties b/genomix/genomix-pregelix/src/test/resources/hadoop/conf/log4j.properties
new file mode 100755
index 0000000..d5e6004
--- /dev/null
+++ b/genomix/genomix-pregelix/src/test/resources/hadoop/conf/log4j.properties
@@ -0,0 +1,94 @@
+# Define some default values that can be overridden by system properties
+hadoop.root.logger=FATAL,console
+hadoop.log.dir=.
+hadoop.log.file=hadoop.log
+
+# Define the root logger to the system property "hadoop.root.logger".
+log4j.rootLogger=${hadoop.root.logger}, EventCounter
+
+# Logging Threshold
+log4j.threshhold=FATAL
+
+#
+# Daily Rolling File Appender
+#
+
+log4j.appender.DRFA=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.DRFA.File=${hadoop.log.dir}/${hadoop.log.file}
+
+# Rollver at midnight
+log4j.appender.DRFA.DatePattern=.yyyy-MM-dd
+
+# 30-day backup
+#log4j.appender.DRFA.MaxBackupIndex=30
+log4j.appender.DRFA.layout=org.apache.log4j.PatternLayout
+
+# Pattern format: Date LogLevel LoggerName LogMessage
+log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n
+# Debugging Pattern format
+#log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n
+
+
+#
+# console
+# Add "console" to rootlogger above if you want to use this 
+#
+
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n
+
+#
+# TaskLog Appender
+#
+
+#Default values
+hadoop.tasklog.taskid=null
+hadoop.tasklog.noKeepSplits=4
+hadoop.tasklog.totalLogFileSize=100
+hadoop.tasklog.purgeLogSplits=true
+hadoop.tasklog.logsRetainHours=12
+
+log4j.appender.TLA=org.apache.hadoop.mapred.TaskLogAppender
+log4j.appender.TLA.taskId=${hadoop.tasklog.taskid}
+log4j.appender.TLA.totalLogFileSize=${hadoop.tasklog.totalLogFileSize}
+
+log4j.appender.TLA.layout=org.apache.log4j.PatternLayout
+log4j.appender.TLA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n
+
+#
+# Rolling File Appender
+#
+
+#log4j.appender.RFA=org.apache.log4j.RollingFileAppender
+#log4j.appender.RFA.File=${hadoop.log.dir}/${hadoop.log.file}
+
+# Logfile size and and 30-day backups
+#log4j.appender.RFA.MaxFileSize=1MB
+#log4j.appender.RFA.MaxBackupIndex=30
+
+#log4j.appender.RFA.layout=org.apache.log4j.PatternLayout
+#log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} - %m%n
+#log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n
+
+#
+# FSNamesystem Audit logging
+# All audit events are logged at INFO level
+#
+log4j.logger.org.apache.hadoop.fs.FSNamesystem.audit=WARN
+
+# Custom Logging levels
+
+#log4j.logger.org.apache.hadoop.mapred.JobTracker=DEBUG
+#log4j.logger.org.apache.hadoop.mapred.TaskTracker=DEBUG
+#log4j.logger.org.apache.hadoop.fs.FSNamesystem=DEBUG
+
+# Jets3t library
+log4j.logger.org.jets3t.service.impl.rest.httpclient.RestS3Service=ERROR
+
+#
+# Event Counter Appender
+# Sends counts of logging messages at different severity levels to Hadoop Metrics.
+#
+log4j.appender.EventCounter=org.apache.hadoop.metrics.jvm.EventCounter
diff --git a/genomix/genomix-pregelix/src/test/resources/hadoop/conf/mapred-site.xml b/genomix/genomix-pregelix/src/test/resources/hadoop/conf/mapred-site.xml
new file mode 100644
index 0000000..71450f1
--- /dev/null
+++ b/genomix/genomix-pregelix/src/test/resources/hadoop/conf/mapred-site.xml
@@ -0,0 +1,25 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+
+<!-- Put site-specific property overrides in this file. -->
+
+<configuration>
+
+  <property>
+    <name>mapred.job.tracker</name>
+    <value>localhost:29007</value>
+  </property>
+  <property>
+     <name>mapred.tasktracker.map.tasks.maximum</name>
+     <value>20</value>
+  </property>
+   <property>
+      <name>mapred.tasktracker.reduce.tasks.maximum</name>
+      <value>20</value>
+   </property>
+   <property>
+      <name>mapred.max.split.size</name>
+      <value>128</value>
+   </property>
+
+</configuration>
diff --git a/genomix/genomix-pregelix/src/test/resources/hyracks-deployment.properties b/genomix/genomix-pregelix/src/test/resources/hyracks-deployment.properties
new file mode 100644
index 0000000..9c42b89
--- /dev/null
+++ b/genomix/genomix-pregelix/src/test/resources/hyracks-deployment.properties
@@ -0,0 +1,2 @@
+#cc.bootstrap.class=edu.uci.ics.asterix.hyracks.bootstrap.CCBootstrapImpl
+nc.bootstrap.class=edu.uci.ics.pregelix.runtime.bootstrap.NCBootstrapImpl
diff --git a/genomix/genomix-pregelix/src/test/resources/ignore.txt b/genomix/genomix-pregelix/src/test/resources/ignore.txt
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/genomix/genomix-pregelix/src/test/resources/ignore.txt
diff --git a/genomix/genomix-pregelix/src/test/resources/jobs/LoadGraph.xml b/genomix/genomix-pregelix/src/test/resources/jobs/LoadGraph.xml
new file mode 100644
index 0000000..7663f88
--- /dev/null
+++ b/genomix/genomix-pregelix/src/test/resources/jobs/LoadGraph.xml
@@ -0,0 +1,141 @@
+<?xml version="1.0" encoding="UTF-8" standalone="no"?><configuration>
+<property><name>mapred.tasktracker.dns.nameserver</name><value>default</value></property>
+<property><name>mapred.queue.default.acl-administer-jobs</name><value>*</value></property>
+<property><name>mapred.skip.map.auto.incr.proc.count</name><value>true</value></property>
+<property><name>mapred.jobtracker.instrumentation</name><value>org.apache.hadoop.mapred.JobTrackerMetricsInst</value></property>
+<property><name>mapred.skip.reduce.auto.incr.proc.count</name><value>true</value></property>
+<property><name>fs.hsftp.impl</name><value>org.apache.hadoop.hdfs.HsftpFileSystem</value></property>
+<property><name>mapred.input.dir</name><value>file:/webmap</value></property>
+<property><name>mapred.submit.replication</name><value>10</value></property>
+<property><name>ipc.server.tcpnodelay</name><value>false</value></property>
+<property><name>fs.checkpoint.dir</name><value>${hadoop.tmp.dir}/dfs/namesecondary</value></property>
+<property><name>mapred.output.compression.type</name><value>RECORD</value></property>
+<property><name>mapred.job.shuffle.merge.percent</name><value>0.66</value></property>
+<property><name>mapred.child.java.opts</name><value>-Xmx200m</value></property>
+<property><name>mapred.queue.default.acl-submit-job</name><value>*</value></property>
+<property><name>keep.failed.task.files</name><value>false</value></property>
+<property><name>mapred.jobtracker.job.history.block.size</name><value>3145728</value></property>
+<property><name>io.bytes.per.checksum</name><value>512</value></property>
+<property><name>mapred.task.tracker.report.address</name><value>127.0.0.1:0</value></property>
+<property><name>hadoop.util.hash.type</name><value>murmur</value></property>
+<property><name>fs.hdfs.impl</name><value>org.apache.hadoop.hdfs.DistributedFileSystem</value></property>
+<property><name>fs.ramfs.impl</name><value>org.apache.hadoop.fs.InMemoryFileSystem</value></property>
+<property><name>mapred.jobtracker.restart.recover</name><value>false</value></property>
+<property><name>fs.hftp.impl</name><value>org.apache.hadoop.hdfs.HftpFileSystem</value></property>
+<property><name>fs.checkpoint.period</name><value>3600</value></property>
+<property><name>mapred.child.tmp</name><value>./tmp</value></property>
+<property><name>mapred.local.dir.minspacekill</name><value>0</value></property>
+<property><name>map.sort.class</name><value>org.apache.hadoop.util.QuickSort</value></property>
+<property><name>hadoop.logfile.count</name><value>10</value></property>
+<property><name>ipc.client.connection.maxidletime</name><value>10000</value></property>
+<property><name>mapred.output.dir</name><value>/result</value></property>
+<property><name>io.map.index.skip</name><value>0</value></property>
+<property><name>mapred.tasktracker.expiry.interval</name><value>600000</value></property>
+<property><name>mapred.output.compress</name><value>false</value></property>
+<property><name>io.seqfile.lazydecompress</name><value>true</value></property>
+<property><name>mapred.reduce.parallel.copies</name><value>5</value></property>
+<property><name>fs.checkpoint.size</name><value>67108864</value></property>
+<property><name>mapred.job.reduce.input.buffer.percent</name><value>0.0</value></property>
+<property><name>mapred.job.name</name><value>LoadGraph</value></property>
+<property><name>local.cache.size</name><value>10737418240</value></property>
+<property><name>fs.s3n.impl</name><value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value></property>
+<property><name>mapred.userlog.limit.kb</name><value>0</value></property>
+<property><name>fs.file.impl</name><value>org.apache.hadoop.fs.LocalFileSystem</value></property>
+<property><name>mapred.task.tracker.http.address</name><value>0.0.0.0:50060</value></property>
+<property><name>mapred.task.timeout</name><value>600000</value></property>
+<property><name>fs.kfs.impl</name><value>org.apache.hadoop.fs.kfs.KosmosFileSystem</value></property>
+<property><name>mapred.max.tracker.blacklists</name><value>4</value></property>
+<property><name>fs.s3.buffer.dir</name><value>${hadoop.tmp.dir}/s3</value></property>
+<property><name>mapred.job.tracker.persist.jobstatus.dir</name><value>/jobtracker/jobsInfo</value></property>
+<property><name>ipc.client.kill.max</name><value>10</value></property>
+<property><name>mapred.tasktracker.instrumentation</name><value>org.apache.hadoop.mapred.TaskTrackerMetricsInst</value></property>
+<property><name>mapred.reduce.tasks.speculative.execution</name><value>true</value></property>
+<property><name>io.sort.record.percent</name><value>0.05</value></property>
+<property><name>hadoop.security.authorization</name><value>false</value></property>
+<property><name>mapred.max.tracker.failures</name><value>4</value></property>
+<property><name>mapred.jobtracker.taskScheduler</name><value>org.apache.hadoop.mapred.JobQueueTaskScheduler</value></property>
+<property><name>pregelix.numVertices</name><value>20</value></property>
+<property><name>mapred.tasktracker.dns.interface</name><value>default</value></property>
+<property><name>mapred.map.tasks</name><value>2</value></property>
+<property><name>mapred.job.tracker.persist.jobstatus.hours</name><value>0</value></property>
+<property><name>fs.s3.sleepTimeSeconds</name><value>10</value></property>
+<property><name>fs.default.name</name><value>file:///</value></property>
+<property><name>tasktracker.http.threads</name><value>40</value></property>
+<property><name>mapred.tasktracker.taskmemorymanager.monitoring-interval</name><value>5000</value></property>
+<property><name>hadoop.rpc.socket.factory.class.default</name><value>org.apache.hadoop.net.StandardSocketFactory</value></property>
+<property><name>mapred.reduce.tasks</name><value>1</value></property>
+<property><name>topology.node.switch.mapping.impl</name><value>org.apache.hadoop.net.ScriptBasedMapping</value></property>
+<property><name>pregelix.vertexClass</name><value>edu.uci.ics.pregelix.LoadGraphVertex</value></property>
+<property><name>mapred.skip.reduce.max.skip.groups</name><value>0</value></property>
+<property><name>io.file.buffer.size</name><value>4096</value></property>
+<property><name>mapred.jobtracker.maxtasks.per.job</name><value>-1</value></property>
+<property><name>mapred.tasktracker.indexcache.mb</name><value>10</value></property>
+<property><name>mapred.tasktracker.map.tasks.maximum</name><value>2</value></property>
+<property><name>fs.har.impl.disable.cache</name><value>true</value></property>
+<property><name>mapred.task.profile.maps</name><value>0-2</value></property>
+<property><name>hadoop.native.lib</name><value>true</value></property>
+<property><name>fs.s3.block.size</name><value>67108864</value></property>
+<property><name>mapred.job.reuse.jvm.num.tasks</name><value>1</value></property>
+<property><name>mapred.job.tracker.http.address</name><value>0.0.0.0:50030</value></property>
+<property><name>mapred.tasktracker.reduce.tasks.maximum</name><value>2</value></property>
+<property><name>io.compression.codecs</name><value>org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec</value></property>
+<property><name>mapred.job.shuffle.input.buffer.percent</name><value>0.70</value></property>
+<property><name>io.seqfile.compress.blocksize</name><value>1000000</value></property>
+<property><name>mapred.queue.names</name><value>default</value></property>
+<property><name>fs.har.impl</name><value>org.apache.hadoop.fs.HarFileSystem</value></property>
+<property><name>io.mapfile.bloom.error.rate</name><value>0.005</value></property>
+<property><name>mapred.job.tracker</name><value>local</value></property>
+<property><name>io.skip.checksum.errors</name><value>false</value></property>
+<property><name>mapred.reduce.max.attempts</name><value>4</value></property>
+<property><name>fs.s3.maxRetries</name><value>4</value></property>
+<property><name>ipc.server.listen.queue.size</name><value>128</value></property>
+<property><name>fs.trash.interval</name><value>0</value></property>
+<property><name>mapred.local.dir.minspacestart</name><value>0</value></property>
+<property><name>fs.s3.impl</name><value>org.apache.hadoop.fs.s3.S3FileSystem</value></property>
+<property><name>io.seqfile.sorter.recordlimit</name><value>1000000</value></property>
+<property><name>io.mapfile.bloom.size</name><value>1048576</value></property>
+<property><name>io.sort.mb</name><value>100</value></property>
+<property><name>mapred.local.dir</name><value>${hadoop.tmp.dir}/mapred/local</value></property>
+<property><name>io.sort.factor</name><value>10</value></property>
+<property><name>mapred.task.profile</name><value>false</value></property>
+<property><name>job.end.retry.interval</name><value>30000</value></property>
+<property><name>mapred.tasktracker.procfsbasedprocesstree.sleeptime-before-sigkill</name><value>5000</value></property>
+<property><name>mapred.jobtracker.completeuserjobs.maximum</name><value>100</value></property>
+<property><name>mapred.task.profile.reduces</name><value>0-2</value></property>
+<property><name>webinterface.private.actions</name><value>false</value></property>
+<property><name>hadoop.tmp.dir</name><value>/tmp/hadoop-${user.name}</value></property>
+<property><name>mapred.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
+<property><name>mapred.skip.attempts.to.start.skipping</name><value>2</value></property>
+<property><name>mapred.temp.dir</name><value>${hadoop.tmp.dir}/mapred/temp</value></property>
+<property><name>mapred.merge.recordsBeforeProgress</name><value>10000</value></property>
+<property><name>mapred.map.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
+<property><name>mapred.compress.map.output</name><value>false</value></property>
+<property><name>io.sort.spill.percent</name><value>0.80</value></property>
+<property><name>fs.checkpoint.edits.dir</name><value>${fs.checkpoint.dir}</value></property>
+<property><name>mapred.userlog.retain.hours</name><value>24</value></property>
+<property><name>mapred.system.dir</name><value>${hadoop.tmp.dir}/mapred/system</value></property>
+<property><name>mapred.line.input.format.linespermap</name><value>1</value></property>
+<property><name>job.end.retry.attempts</name><value>0</value></property>
+<property><name>ipc.client.idlethreshold</name><value>4000</value></property>
+<property><name>pregelix.vertexOutputFormatClass</name><value>edu.uci.ics.pregelix.LoadGraphVertex$SimpleLoadGraphVertexOutputFormat</value></property>
+<property><name>mapred.reduce.copy.backoff</name><value>300</value></property>
+<property><name>mapred.map.tasks.speculative.execution</name><value>true</value></property>
+<property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
+<property><name>hadoop.logfile.size</name><value>10000000</value></property>
+<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.TextLoadGraphInputFormat</value></property>
+<property><name>mapred.job.queue.name</name><value>default</value></property>
+<property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
+<property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
+<property><name>topology.script.number.args</name><value>100</value></property>
+<property><name>mapred.skip.map.max.skip.records</name><value>0</value></property>
+<property><name>fs.ftp.impl</name><value>org.apache.hadoop.fs.ftp.FTPFileSystem</value></property>
+<property><name>mapred.task.cache.levels</name><value>2</value></property>
+<property><name>mapred.job.tracker.handler.count</name><value>10</value></property>
+<property><name>io.serializations</name><value>org.apache.hadoop.io.serializer.WritableSerialization</value></property>
+<property><name>ipc.client.connect.max.retries</name><value>10</value></property>
+<property><name>mapred.min.split.size</name><value>0</value></property>
+<property><name>mapred.map.max.attempts</name><value>4</value></property>
+<property><name>jobclient.output.filter</name><value>FAILED</value></property>
+<property><name>ipc.client.tcpnodelay</name><value>false</value></property>
+<property><name>mapred.acls.enabled</name><value>false</value></property>
+</configuration>
\ No newline at end of file
diff --git a/genomix/genomix-pregelix/src/test/resources/log4j.properties b/genomix/genomix-pregelix/src/test/resources/log4j.properties
new file mode 100755
index 0000000..d5e6004
--- /dev/null
+++ b/genomix/genomix-pregelix/src/test/resources/log4j.properties
@@ -0,0 +1,94 @@
+# Define some default values that can be overridden by system properties
+hadoop.root.logger=FATAL,console
+hadoop.log.dir=.
+hadoop.log.file=hadoop.log
+
+# Define the root logger to the system property "hadoop.root.logger".
+log4j.rootLogger=${hadoop.root.logger}, EventCounter
+
+# Logging Threshold
+log4j.threshhold=FATAL
+
+#
+# Daily Rolling File Appender
+#
+
+log4j.appender.DRFA=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.DRFA.File=${hadoop.log.dir}/${hadoop.log.file}
+
+# Rollver at midnight
+log4j.appender.DRFA.DatePattern=.yyyy-MM-dd
+
+# 30-day backup
+#log4j.appender.DRFA.MaxBackupIndex=30
+log4j.appender.DRFA.layout=org.apache.log4j.PatternLayout
+
+# Pattern format: Date LogLevel LoggerName LogMessage
+log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n
+# Debugging Pattern format
+#log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n
+
+
+#
+# console
+# Add "console" to rootlogger above if you want to use this 
+#
+
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n
+
+#
+# TaskLog Appender
+#
+
+#Default values
+hadoop.tasklog.taskid=null
+hadoop.tasklog.noKeepSplits=4
+hadoop.tasklog.totalLogFileSize=100
+hadoop.tasklog.purgeLogSplits=true
+hadoop.tasklog.logsRetainHours=12
+
+log4j.appender.TLA=org.apache.hadoop.mapred.TaskLogAppender
+log4j.appender.TLA.taskId=${hadoop.tasklog.taskid}
+log4j.appender.TLA.totalLogFileSize=${hadoop.tasklog.totalLogFileSize}
+
+log4j.appender.TLA.layout=org.apache.log4j.PatternLayout
+log4j.appender.TLA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n
+
+#
+# Rolling File Appender
+#
+
+#log4j.appender.RFA=org.apache.log4j.RollingFileAppender
+#log4j.appender.RFA.File=${hadoop.log.dir}/${hadoop.log.file}
+
+# Logfile size and and 30-day backups
+#log4j.appender.RFA.MaxFileSize=1MB
+#log4j.appender.RFA.MaxBackupIndex=30
+
+#log4j.appender.RFA.layout=org.apache.log4j.PatternLayout
+#log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} - %m%n
+#log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n
+
+#
+# FSNamesystem Audit logging
+# All audit events are logged at INFO level
+#
+log4j.logger.org.apache.hadoop.fs.FSNamesystem.audit=WARN
+
+# Custom Logging levels
+
+#log4j.logger.org.apache.hadoop.mapred.JobTracker=DEBUG
+#log4j.logger.org.apache.hadoop.mapred.TaskTracker=DEBUG
+#log4j.logger.org.apache.hadoop.fs.FSNamesystem=DEBUG
+
+# Jets3t library
+log4j.logger.org.jets3t.service.impl.rest.httpclient.RestS3Service=ERROR
+
+#
+# Event Counter Appender
+# Sends counts of logging messages at different severity levels to Hadoop Metrics.
+#
+log4j.appender.EventCounter=org.apache.hadoop.metrics.jvm.EventCounter
diff --git a/genomix/genomix-pregelix/src/test/resources/logging.properties b/genomix/genomix-pregelix/src/test/resources/logging.properties
new file mode 100644
index 0000000..b8f2be9
--- /dev/null
+++ b/genomix/genomix-pregelix/src/test/resources/logging.properties
@@ -0,0 +1,66 @@
+############################################################
+#  	Default Logging Configuration File
+#
+# You can use a different file by specifying a filename
+# with the java.util.logging.config.file system property.  
+# For example java -Djava.util.logging.config.file=myfile
+############################################################
+
+############################################################
+#  	Global properties
+############################################################
+
+# "handlers" specifies a comma separated list of log Handler 
+# classes.  These handlers will be installed during VM startup.
+# Note that these classes must be on the system classpath.
+# By default we only configure a ConsoleHandler, which will only
+# show messages at the INFO and above levels.
+
+handlers= java.util.logging.ConsoleHandler
+
+# To also add the FileHandler, use the following line instead.
+
+# handlers= java.util.logging.FileHandler, java.util.logging.ConsoleHandler
+
+# Default global logging level.
+# This specifies which kinds of events are logged across
+# all loggers.  For any given facility this global level
+# can be overriden by a facility specific level
+# Note that the ConsoleHandler also has a separate level
+# setting to limit messages printed to the console.
+
+.level= SEVERE
+# .level= INFO
+# .level= FINE
+# .level = FINEST
+
+############################################################
+# Handler specific properties.
+# Describes specific configuration info for Handlers.
+############################################################
+
+# default file output is in user's home directory.
+
+# java.util.logging.FileHandler.pattern = %h/java%u.log
+# java.util.logging.FileHandler.limit = 50000
+# java.util.logging.FileHandler.count = 1
+# java.util.logging.FileHandler.formatter = java.util.logging.XMLFormatter
+
+# Limit the message that are printed on the console to FINE and above.
+
+java.util.logging.ConsoleHandler.level = FINEST
+java.util.logging.ConsoleHandler.formatter = java.util.logging.SimpleFormatter
+
+
+############################################################
+# Facility specific properties.
+# Provides extra control for each logger.
+############################################################
+
+# For example, set the com.xyz.foo logger to only log SEVERE
+# messages:
+
+#edu.uci.ics.asterix.level = FINE
+#edu.uci.ics.algebricks.level = FINE
+edu.uci.ics.hyracks.level = SEVERE
+#edu.uci.ics.hyracks.control.nc.net.level = FINE
\ No newline at end of file
diff --git a/genomix/genomix-pregelix/src/test/resources/only.txt b/genomix/genomix-pregelix/src/test/resources/only.txt
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/genomix/genomix-pregelix/src/test/resources/only.txt