finished Driver for genomix

git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_genomix@2858 123451ca-8445-de46-9d55-352943316053
diff --git a/genomix/genomix-core/pom.xml b/genomix/genomix-core/pom.xml
index b88c75d..4cd78c8 100644
--- a/genomix/genomix-core/pom.xml
+++ b/genomix/genomix-core/pom.xml
@@ -216,5 +216,27 @@
 			<type>jar</type>
 			<scope>compile</scope>
 		</dependency>
+		<dependency>
+			<groupId>org.apache.hadoop</groupId>
+			<artifactId>hadoop-core</artifactId>
+			<version>0.20.2</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.hadoop</groupId>
+			<artifactId>hadoop-test</artifactId>
+			<version>0.20.2</version>
+		</dependency>
+		<dependency>
+			<groupId>edu.uci.ics.hyracks</groupId>
+			<artifactId>hyracks-hdfs</artifactId>
+			<version>0.2.3-SNAPSHOT</version>
+		</dependency>
+		<dependency>
+            <groupId>edu.uci.ics.hyracks</groupId>
+            <artifactId>hyracks-hdfs</artifactId>
+            <version>0.2.3-SNAPSHOT</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
 	</dependencies>
 </project>
diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/FileScanDescriptor.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/FileScanDescriptor.java
index 1a3b927..3f0f194 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/FileScanDescriptor.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/FileScanDescriptor.java
@@ -6,6 +6,8 @@
 import java.io.InputStreamReader;

 import java.nio.ByteBuffer;

 

+import org.apache.hadoop.fs.Path;

+

 import edu.uci.ics.genomix.data.serde.ByteSerializerDeserializer;

 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;

 import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;

@@ -13,6 +15,7 @@
 import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;

 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;

 import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;

+import edu.uci.ics.hyracks.api.job.JobSpecification;

 import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;

 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;

 import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;

@@ -23,14 +26,14 @@
 

     private static final long serialVersionUID = 1L;

     private int k;

+    private Path [] filesplit = null ;

+    private String pathSurfix ;

     private int byteNum;

-    private String filename;

 

-    public FileScanDescriptor(IOperatorDescriptorRegistry spec, int k, String filename) {

+    public FileScanDescriptor(IOperatorDescriptorRegistry spec, int k, String path) {

         super(spec, 0, 1);

-        // TODO Auto-generated constructor stub

         this.k = k;

-        this.filename = filename;

+        this.pathSurfix = path;

         

         byteNum = (byte)Math.ceil((double)k/4.0);

         //recordDescriptors[0] = news RecordDescriptor(

@@ -39,7 +42,19 @@
                 null, null});

     }

 

-    public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,

+    public FileScanDescriptor(JobSpecification jobSpec, int kmers,

+			Path[] inputPaths) {

+    	super(jobSpec, 0, 1);

+        this.k = k;

+        this.filesplit = inputPaths;

+        this.pathSurfix = inputPaths[0].toString();

+        //recordDescriptors[0] = news RecordDescriptor(

+        //		new ISerializerDeserializer[] { UTF8StringSerializerDeserializer.INSTANCE });

+        recordDescriptors[0] = new RecordDescriptor(new ISerializerDeserializer[] {

+                null, ByteSerializerDeserializer.INSTANCE });

+	}

+

+	public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,

             IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {

 

         final int temp = partition;

@@ -60,7 +75,7 @@
                 outputAppender.reset(outputBuffer, true);

                 try {// one try with multiple catch?

                     writer.open();

-                    String s = filename + String.valueOf(temp);

+                    String s = pathSurfix + String.valueOf(temp);

                     

                     File tf = new File(s);

                     

diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/Tester.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/Tester.java
index 06bde29..0b0aa07 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/Tester.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/Tester.java
@@ -184,8 +184,8 @@
                 //ByteSerializerDeserializer.INSTANCE });

 

        int[] keyFields = new int[] { 0 };

-        int frameLimits = 4096;

-        int tableSize = 10485767;

+        int frameLimits = 4096;		// hyracks oriented

+        int tableSize = 10485767;	// hyracks oriented

 

         AbstractOperatorDescriptor single_grouper;

         IConnectorDescriptor conn_partition;

diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/driver/Driver.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/driver/Driver.java
new file mode 100644
index 0000000..93137d7
--- /dev/null
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/driver/Driver.java
@@ -0,0 +1,111 @@
+package edu.uci.ics.genomix.driver;
+
+import java.net.URL;
+import java.util.EnumSet;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+import edu.uci.ics.genomix.job.GenomixJob;
+import edu.uci.ics.genomix.job.JobGen;
+import edu.uci.ics.genomix.job.JobGenBrujinGraph;
+import edu.uci.ics.genomix.job.JobGenContigsGeneration;
+import edu.uci.ics.genomix.job.JobGenGraphCleanning;
+import edu.uci.ics.hyracks.api.client.HyracksConnection;
+import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
+import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.JobFlag;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+
+public class Driver {
+   public static enum Plan {
+        BUILD_DEBRUJIN_GRAPH,
+        GRAPH_CLEANNING,
+        CONTIGS_GENERATION,
+    }
+   
+   	private static final String applicationName = "genomix";
+   	private static final Log LOG = LogFactory.getLog(Driver.class);
+    private JobGen jobGen;
+    private boolean profiling;
+    
+    private int numPartitionPerMachine;
+    
+    private IHyracksClientConnection hcc;
+    
+    public Driver(String ipAddress, int port, int numPartitionPerMachine) throws HyracksException{
+    	try{
+    		hcc = new HyracksConnection(ipAddress, port);
+    	} catch (Exception e) {
+            throw new HyracksException(e);
+        }
+    	this.numPartitionPerMachine = numPartitionPerMachine;
+    }
+	
+    public void runJob(GenomixJob job) throws HyracksException {
+        runJob(job, Plan.BUILD_DEBRUJIN_GRAPH, false);
+    }
+
+    public void runJob(GenomixJob job, Plan planChoice, boolean profiling)
+            throws HyracksException {
+        /** add hadoop configurations */
+    	//TODO need to include the hadoophome to the classpath in the way below
+        URL hadoopCore = job.getClass().getClassLoader().getResource("core-site.xml");
+        job.getConfiguration().addResource(hadoopCore);
+        URL hadoopMapRed = job.getClass().getClassLoader().getResource("mapred-site.xml");
+        job.getConfiguration().addResource(hadoopMapRed);
+        URL hadoopHdfs = job.getClass().getClassLoader().getResource("hdfs-site.xml");
+        job.getConfiguration().addResource(hadoopHdfs);
+
+        LOG.info("job started");
+        long start = System.currentTimeMillis();
+        long end = start;
+        long time = 0;
+
+        this.profiling = profiling;
+        try {
+    		Map<String, NodeControllerInfo> ncMap = hcc.getNodeControllerInfos();
+            switch (planChoice) {
+                case BUILD_DEBRUJIN_GRAPH:default:
+                    jobGen = new JobGenBrujinGraph(job, ncMap, numPartitionPerMachine);
+                    break;
+                case GRAPH_CLEANNING:
+                    jobGen = new JobGenGraphCleanning(job);
+                    break;
+                case CONTIGS_GENERATION:
+                    jobGen = new JobGenContigsGeneration(job);
+                    break;
+            }
+            
+            start = System.currentTimeMillis();
+            runCreate(jobGen);
+            end = System.currentTimeMillis();
+            time = end - start;
+            LOG.info("result writing finished " + time + "ms");
+            LOG.info("job finished");
+        } catch (Exception e) {
+            throw new HyracksException(e);
+        }
+    }
+    
+    private void runCreate(JobGen jobGen) throws Exception {
+        try {
+            JobSpecification createJob = jobGen.generateJob();
+            execute(createJob);
+        } catch (Exception e) {
+            throw e;
+        }
+    }
+
+    private void execute(JobSpecification job) throws Exception {
+        job.setUseConnectorPolicyForScheduling(false);
+        JobId jobId = hcc.startJob(applicationName, job,
+                profiling ? EnumSet.of(JobFlag.PROFILE_RUNTIME) : EnumSet.noneOf(JobFlag.class));
+        hcc.waitForCompletion(jobId);
+    }
+}
diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/job/GenomixJob.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/job/GenomixJob.java
new file mode 100644
index 0000000..24a7561
--- /dev/null
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/job/GenomixJob.java
@@ -0,0 +1,59 @@
+package edu.uci.ics.genomix.job;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Job;
+
+public class GenomixJob extends Job {
+
+	/** Kmers length */
+	public static final String KMER_LENGTH = "genomix.kmer";
+	/** Frame Size */
+	public static final String FRAME_SIZE = "genomix.framesize";
+	/** Frame Limit, hyracks need */
+	public static final String FRAME_LIMIT = "genomix.framelimit";
+	/** Table Size, hyracks need */
+	public static final String TABLE_SIZE = "genomix.tablesize";
+	/** Groupby types ? */
+	public static final String GROUPBY_TYPE = "genomix.graph.groupby.type";
+	
+
+	/** Configurations used by hybrid groupby function in graph build phrase */
+	public static final String GROUPBY_HYBRID_INPUTSIZE = "genomix.graph.groupby.hybrid.inputsize";
+	public static final String GROUPBY_HYBRID_INPUTKEYS = "genomix.graph.groupby.hybrid.inputkeys";
+	public static final String GROUPBY_HYBRID_RECORDSIZE_SINGLE = "genomix.graph.groupby.hybrid.recordsize.single";
+	public static final String GROUPBY_HYBRID_RECORDSIZE_CROSS = "genomix.graph.groupby.hybrid.recordsize.cross";
+	public static final String GROUPBY_HYBRID_HASHLEVEL = "genomix.graph.groupby.hybrid.hashlevel";
+
+	public GenomixJob(String jobname) throws IOException {
+		super(new Configuration(), jobname);
+	}
+
+	public GenomixJob(Configuration conf, String jobName) throws IOException {
+		super(conf, jobName);
+	}
+
+	/**
+	 * Set the kmer length
+	 * 
+	 * @param the
+	 *            desired frame size
+	 */
+	final public void setKmerLength(int kmerlength) {
+		getConfiguration().setInt(KMER_LENGTH, kmerlength);
+	}
+
+	final public void setFrameSize(int frameSize) {
+		getConfiguration().setInt(FRAME_SIZE, frameSize);
+	}
+
+	final public void setFrameLimit(int frameLimit) {
+		getConfiguration().setInt(FRAME_LIMIT, frameLimit);
+	}
+
+	final public void setTableSize(int tableSize) {
+		getConfiguration().setInt(TABLE_SIZE, tableSize);
+	}
+
+}
diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/job/JobGen.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/job/JobGen.java
new file mode 100644
index 0000000..66bf79d
--- /dev/null
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/job/JobGen.java
@@ -0,0 +1,27 @@
+package edu.uci.ics.genomix.job;
+
+import java.util.UUID;
+
+import org.apache.hadoop.conf.Configuration;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+
+public abstract class JobGen {
+	
+	protected final Configuration conf;
+	protected final GenomixJob genomixJob;
+	protected String jobId = new UUID(System.currentTimeMillis(),
+			System.nanoTime()).toString();
+
+	public JobGen(GenomixJob job) {
+		this.conf = job.getConfiguration();
+		this.genomixJob = job;
+		this.initJobConfiguration();
+	}
+
+	protected abstract void initJobConfiguration();
+
+	public abstract JobSpecification generateJob () throws HyracksDataException;
+
+}
\ No newline at end of file
diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/job/JobGenBrujinGraph.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/job/JobGenBrujinGraph.java
new file mode 100644
index 0000000..067ad37
--- /dev/null
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/job/JobGenBrujinGraph.java
@@ -0,0 +1,230 @@
+package edu.uci.ics.genomix.job;
+
+import java.util.Map;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+import edu.uci.ics.genomix.data.normalizers.Integer64NormalizedKeyComputerFactory;
+import edu.uci.ics.genomix.data.partition.KmerHashPartitioncomputerFactory;
+import edu.uci.ics.genomix.data.serde.ByteSerializerDeserializer;
+import edu.uci.ics.genomix.data.std.accessors.LongBinaryHashFunctionFamily;
+import edu.uci.ics.genomix.dataflow.ConnectorPolicyAssignmentPolicy;
+import edu.uci.ics.genomix.dataflow.FileScanDescriptor;
+import edu.uci.ics.genomix.dataflow.PrinterOperatorDescriptor;
+import edu.uci.ics.genomix.dataflow.aggregators.DistributedMergeLmerAggregateFactory;
+import edu.uci.ics.genomix.dataflow.aggregators.MergeKmerAggregateFactory;
+import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
+import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFamily;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.data.std.primitive.LongPointable;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.Integer64SerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningMergingConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.HashSpillableTableFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.external.ExternalGroupOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.hybridhash.HybridHashGroupOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.preclustered.PreclusteredGroupOperatorDescriptor;
+
+public class JobGenBrujinGraph extends JobGen {
+	public enum GroupbyType {
+		EXTERNAL, PRECLUSTER, HYBRIDHASH,
+	}
+
+	private final Map<String, NodeControllerInfo> ncMap;
+	private String [] ncNodeNames;
+	
+	private int kmers;
+	private int frameLimits;
+	private int tableSize;
+	private Path[] inputPaths;
+	private Path outputPath;
+	private GroupbyType groupbyType;
+
+	private AbstractOperatorDescriptor singleGrouper;
+	private IConnectorDescriptor connPartition;
+	private AbstractOperatorDescriptor crossGrouper;
+	private RecordDescriptor outputRec;
+
+	public JobGenBrujinGraph(GenomixJob job,
+			final Map<String, NodeControllerInfo> ncMap, int numPartitionPerMachine) {
+		super(job);
+		this.ncMap = ncMap;
+		String [] nodes = new String[ncMap.size()];
+		ncMap.keySet().toArray(nodes);
+		ncNodeNames = new String[nodes.length * numPartitionPerMachine];
+		for (int i = 0; i < numPartitionPerMachine; i++){
+			System.arraycopy(nodes, 0, ncNodeNames, i*nodes.length, nodes.length);
+		}
+	}
+
+	private ExternalGroupOperatorDescriptor newExternalGroupby(
+			JobSpecification jobSpec, int[] keyFields, IAggregatorDescriptorFactory aggeragater) {
+		return new ExternalGroupOperatorDescriptor(
+				jobSpec,
+				keyFields,
+				frameLimits,
+				new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory
+						.of(LongPointable.FACTORY) },
+				new Integer64NormalizedKeyComputerFactory(),
+				aggeragater,
+				new DistributedMergeLmerAggregateFactory(),
+				outputRec,
+				new HashSpillableTableFactory(
+						new FieldHashPartitionComputerFactory(
+								keyFields,
+								new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+										.of(LongPointable.FACTORY) }),
+						tableSize), true);
+	}
+
+	private HybridHashGroupOperatorDescriptor newHybridGroupby(
+			JobSpecification jobSpec, int[] keyFields,
+			long inputSizeInRawRecords, long inputSizeInUniqueKeys,
+			int recordSizeInBytes, int hashfuncStartLevel)
+			throws HyracksDataException {
+		return new HybridHashGroupOperatorDescriptor(
+				jobSpec,
+				keyFields,
+				frameLimits,
+				inputSizeInRawRecords,
+				inputSizeInUniqueKeys,
+				recordSizeInBytes,
+				tableSize,
+				new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory
+						.of(LongPointable.FACTORY) },
+				new IBinaryHashFunctionFamily[] { new LongBinaryHashFunctionFamily() },
+				// new IBinaryHashFunctionFamily[]
+				// {MurmurHash3BinaryHashFunctionFamily.INSTANCE},
+				hashfuncStartLevel,
+				new Integer64NormalizedKeyComputerFactory(),
+				new MergeKmerAggregateFactory(),
+				new DistributedMergeLmerAggregateFactory(), outputRec, true);
+	}
+
+	private void generateDescriptorbyType(JobSpecification jobSpec)
+			throws HyracksDataException {
+		int[] keyFields = new int[] { 0 }; // the id of grouped key
+
+		outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
+				Integer64SerializerDeserializer.INSTANCE,
+				ByteSerializerDeserializer.INSTANCE,
+				ByteSerializerDeserializer.INSTANCE });
+		switch (groupbyType) {
+		case EXTERNAL:
+			singleGrouper = newExternalGroupby(jobSpec, keyFields, new MergeKmerAggregateFactory());
+			connPartition = new MToNPartitioningConnectorDescriptor(jobSpec,
+					new KmerHashPartitioncomputerFactory());
+			crossGrouper = newExternalGroupby(jobSpec, keyFields,new DistributedMergeLmerAggregateFactory());
+			break;
+		case PRECLUSTER:
+			singleGrouper = newExternalGroupby(jobSpec, keyFields,new MergeKmerAggregateFactory());
+			connPartition = new MToNPartitioningMergingConnectorDescriptor(
+					jobSpec,
+					new KmerHashPartitioncomputerFactory(),
+					keyFields,
+					new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory
+							.of(LongPointable.FACTORY) });
+			crossGrouper = new PreclusteredGroupOperatorDescriptor(
+					jobSpec,
+					keyFields,
+					new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory
+							.of(LongPointable.FACTORY) },
+					new DistributedMergeLmerAggregateFactory(), outputRec);
+			break;
+		case HYBRIDHASH:
+		default:
+			long inputSizeInRawRecords = conf.getLong(
+					GenomixJob.GROUPBY_HYBRID_INPUTSIZE, 154000000);
+			long inputSizeInUniqueKeys = conf.getLong(
+					GenomixJob.GROUPBY_HYBRID_INPUTKEYS, 38500000);
+			int recordSizeInBytes = conf.getInt(
+					GenomixJob.GROUPBY_HYBRID_RECORDSIZE_SINGLE, 9);
+			int hashfuncStartLevel = conf.getInt(
+					GenomixJob.GROUPBY_HYBRID_HASHLEVEL, 1);
+
+			singleGrouper = newHybridGroupby(jobSpec, keyFields,
+					inputSizeInRawRecords, inputSizeInUniqueKeys,
+					recordSizeInBytes, hashfuncStartLevel);
+			connPartition = new MToNPartitioningConnectorDescriptor(jobSpec,
+					new KmerHashPartitioncomputerFactory());
+			/** here read the different recordSize why ? */
+			recordSizeInBytes = conf.getInt(
+					GenomixJob.GROUPBY_HYBRID_RECORDSIZE_CROSS, 13);
+			crossGrouper = newHybridGroupby(jobSpec, keyFields,
+					inputSizeInRawRecords, inputSizeInUniqueKeys,
+					recordSizeInBytes, hashfuncStartLevel);
+			break;
+		}
+	}
+
+	@Override
+	public JobSpecification generateJob() throws HyracksDataException {
+		
+		JobSpecification jobSpec = new JobSpecification();
+		//File input
+		FileScanDescriptor scan = new FileScanDescriptor(jobSpec, kmers, inputPaths);		
+		
+		PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, scan,ncNodeNames);
+		
+		generateDescriptorbyType(jobSpec);
+		PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec,
+				singleGrouper, ncNodeNames);
+
+		IConnectorDescriptor readfileConn = new OneToOneConnectorDescriptor(
+				jobSpec);
+		jobSpec.connect(readfileConn, scan, 0, singleGrouper, 0);
+
+		PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec,
+				crossGrouper, ncNodeNames);
+		jobSpec.connect(connPartition, singleGrouper, 0, crossGrouper, 0);
+
+		//Output
+		PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(jobSpec,
+				outputPath.getName());
+		PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, printer,
+				ncNodeNames);
+
+		IConnectorDescriptor printConn = new OneToOneConnectorDescriptor(jobSpec);
+		jobSpec.connect(printConn, crossGrouper, 0, printer, 0);
+		jobSpec.addRoot(printer);
+
+		if (groupbyType == GroupbyType.PRECLUSTER) {
+			jobSpec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
+		}
+		return jobSpec;
+	}
+
+	@Override
+	protected void initJobConfiguration() {
+		kmers = conf.getInt(GenomixJob.KMER_LENGTH, 25);
+		frameLimits = conf.getInt(GenomixJob.FRAME_LIMIT, 4096);
+		tableSize = conf.getInt(GenomixJob.TABLE_SIZE, 10485767);
+		inputPaths = FileInputFormat.getInputPaths(genomixJob);
+		outputPath = FileOutputFormat.getOutputPath(genomixJob);
+
+		String type = conf.get(GenomixJob.GROUPBY_TYPE, "hybrid");
+		if (type.equalsIgnoreCase("external")) {
+			groupbyType = GroupbyType.EXTERNAL;
+		} else if (type.equalsIgnoreCase("precluster")) {
+			groupbyType = GroupbyType.PRECLUSTER;
+		} else {
+			groupbyType = GroupbyType.HYBRIDHASH;
+		}
+	}
+
+}
diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/job/JobGenContigsGeneration.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/job/JobGenContigsGeneration.java
new file mode 100644
index 0000000..d221834
--- /dev/null
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/job/JobGenContigsGeneration.java
@@ -0,0 +1,23 @@
+package edu.uci.ics.genomix.job;
+
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+
+public class JobGenContigsGeneration extends JobGen {
+
+	public JobGenContigsGeneration(GenomixJob job) {
+		super(job);
+	}
+
+	@Override
+	public JobSpecification generateJob() {
+		// TODO Auto-generated method stub
+		return null;
+	}
+
+	@Override
+	protected void initJobConfiguration() {
+		// TODO Auto-generated method stub
+		
+	}
+
+}
diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/job/JobGenGraphCleanning.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/job/JobGenGraphCleanning.java
new file mode 100644
index 0000000..6b7d98e
--- /dev/null
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/job/JobGenGraphCleanning.java
@@ -0,0 +1,23 @@
+package edu.uci.ics.genomix.job;
+
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+
+public class JobGenGraphCleanning extends JobGen {
+
+	public JobGenGraphCleanning(GenomixJob job) {
+		super(job);
+	}
+
+	@Override
+	public JobSpecification generateJob() {
+		// TODO Auto-generated method stub
+		return null;
+	}
+
+	@Override
+	protected void initJobConfiguration() {
+		// TODO Auto-generated method stub
+		
+	}
+
+}
diff --git a/genomix/genomix-core/src/test/java/edu/uci/ics/genomix/example/jobrun/JobRunTestCase.java b/genomix/genomix-core/src/test/java/edu/uci/ics/genomix/example/jobrun/JobRunTestCase.java
new file mode 100644
index 0000000..5de25ba
--- /dev/null
+++ b/genomix/genomix-core/src/test/java/edu/uci/ics/genomix/example/jobrun/JobRunTestCase.java
@@ -0,0 +1,113 @@
+package edu.uci.ics.genomix.example.jobrun;
+
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+
+import junit.framework.TestCase;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.mapred.JobConf;
+import org.junit.Test;
+
+import edu.uci.ics.genomix.driver.Driver;
+import edu.uci.ics.hyracks.hdfs.utils.HyracksUtils;
+
+public class JobRunTestCase extends TestCase{
+    private static final String ACTUAL_RESULT_DIR = "actual";
+    private static final String EXPECTED_RESULT_PATH = "src/test/resources/expected";
+    private static final String PATH_TO_HADOOP_CONF = "src/test/resources/hadoop/conf";
+
+    private static final String DATA_PATH = "src/test/resources/data/customer.tbl";
+    private static final String HDFS_INPUT_PATH = "/customer/";
+    private static final String HDFS_OUTPUT_PATH = "/customer_result/";
+
+    private static final String HYRACKS_APP_NAME = "genomix";
+    private static final String HADOOP_CONF_PATH = ACTUAL_RESULT_DIR + File.separator + "conf.xml";
+    private MiniDFSCluster dfsCluster;
+
+    private JobConf conf = new JobConf();
+    private int numberOfNC = 2;
+    private int numPartitionPerMachine=2;
+    
+    private Driver myDriver;
+	@Override
+	protected void setUp() throws Exception {
+        cleanupStores();
+        HyracksUtils.init();
+        HyracksUtils.createApp(HYRACKS_APP_NAME);
+        FileUtils.forceMkdir(new File(ACTUAL_RESULT_DIR));
+        FileUtils.cleanDirectory(new File(ACTUAL_RESULT_DIR));
+        startHDFS();
+        
+        myDriver = new Driver(HyracksUtils.CC_HOST, HyracksUtils.TEST_HYRACKS_CC_CLIENT_PORT, numPartitionPerMachine);
+    }
+
+    private void cleanupStores() throws IOException {
+        FileUtils.forceMkdir(new File("teststore"));
+        FileUtils.forceMkdir(new File("build"));
+        FileUtils.cleanDirectory(new File("teststore"));
+        FileUtils.cleanDirectory(new File("build"));
+    }
+
+    private void startHDFS() throws IOException {
+        conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/core-site.xml"));
+        conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/mapred-site.xml"));
+        conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/hdfs-site.xml"));
+
+        FileSystem lfs = FileSystem.getLocal(new Configuration());
+        lfs.delete(new Path("build"), true);
+        System.setProperty("hadoop.log.dir", "logs");
+        dfsCluster = new MiniDFSCluster(conf, numberOfNC, true, null);
+        FileSystem dfs = FileSystem.get(conf);
+        Path src = new Path(DATA_PATH);
+        Path dest = new Path(HDFS_INPUT_PATH);
+        Path result = new Path(HDFS_OUTPUT_PATH);
+        dfs.mkdirs(dest);
+        dfs.mkdirs(result);
+        dfs.copyFromLocalFile(src, dest);
+
+        DataOutputStream confOutput = new DataOutputStream(new FileOutputStream(new File(HADOOP_CONF_PATH)));
+        conf.writeXml(confOutput);
+        confOutput.flush();
+        confOutput.close();
+    }
+
+    
+	@Override
+	protected void runTest() throws Throwable {
+		TestExternalGroupby();
+		TestPreClusterGroupby();
+		TestHybridGroupby();
+	}
+	
+	void TestExternalGroupby() throws Exception{
+		// TODO
+	}
+	
+	void TestPreClusterGroupby() throws Exception{
+		// TODO
+	}
+	
+	void TestHybridGroupby() throws Exception{
+		// TODO
+	}
+
+	
+	@Override
+	protected void tearDown() throws Exception {
+        HyracksUtils.destroyApp(HYRACKS_APP_NAME);
+        HyracksUtils.deinit();
+        cleanupHDFS();
+	}
+	
+    private void cleanupHDFS() throws Exception {
+        dfsCluster.shutdown();
+    }
+	
+}