add GenomixMapper for new graph - 'contrail'
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/contrailgraphbuilding/GenomixDriver.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/contrailgraphbuilding/GenomixDriver.java
index 2880b6c..2f463a4 100644
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/contrailgraphbuilding/GenomixDriver.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/contrailgraphbuilding/GenomixDriver.java
@@ -2,11 +2,25 @@
import java.io.IOException;
-import org.apache.commons.cli.Options;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.mapred.TextInputFormat;
import org.kohsuke.args4j.CmdLineParser;
import org.kohsuke.args4j.Option;
+import edu.uci.ics.genomix.hadoop.graphbuilding.GenomixCombiner;
+import edu.uci.ics.genomix.hadoop.graphbuilding.GenomixMapper;
+import edu.uci.ics.genomix.hadoop.graphbuilding.GenomixReducer;
+import edu.uci.ics.genomix.hadoop.oldtype.KmerBytesWritable;
+import edu.uci.ics.genomix.hadoop.oldtype.KmerCountValue;
+
+@SuppressWarnings("deprecation")
public class GenomixDriver {
private static class Options {
@@ -27,7 +41,29 @@
}
public void run(String inputPath, String outputPath, int numReducers, int sizeKmer, int readLength) throws IOException{
-
+ JobConf conf = new JobConf(GenomixDriver.class); // the driver class is handed to JobConf so Hadoop can find the job jar
+ conf.setInt("sizeKmer", sizeKmer); // published to the tasks under the "sizeKmer" job property
+ conf.setInt("readLength", readLength); // published to the tasks under the "readLength" job property
+ // Job name and the map / reduce / combine classes.
+ conf.setJobName("Genomix Graph Building");
+ conf.setMapperClass(GenomixMapper.class); // NOTE(review): resolves to the imported graphbuilding.GenomixMapper, not a same-package class - confirm intended
+ conf.setReducerClass(GenomixReducer.class);
+ conf.setCombinerClass(GenomixCombiner.class);
+ // Intermediate (map output) key and value types.
+ conf.setMapOutputKeyClass(KmerBytesWritable.class);
+ conf.setMapOutputValueClass(KmerCountValue.class);
+ // Text input; sequence-file output using the same key and value types as the map output.
+ conf.setInputFormat(TextInputFormat.class);
+ conf.setOutputFormat(SequenceFileOutputFormat.class);
+ conf.setOutputKeyClass(KmerBytesWritable.class);
+ conf.setOutputValueClass(KmerCountValue.class);
+ FileInputFormat.setInputPaths(conf, new Path(inputPath));
+ FileOutputFormat.setOutputPath(conf, new Path(outputPath));
+ conf.setNumReduceTasks(numReducers);
+ // Delete any existing output path (recursively) before the job is submitted.
+ FileSystem dfs = FileSystem.get(conf);
+ dfs.delete(new Path(outputPath), true);
+ JobClient.runJob(conf); // submits the job and waits for it to complete
}
public static void main(String[] args) throws Exception {
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/contrailgraphbuilding/GenomixMapper.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/contrailgraphbuilding/GenomixMapper.java
new file mode 100644
index 0000000..b0f0488
--- /dev/null
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/contrailgraphbuilding/GenomixMapper.java
@@ -0,0 +1,140 @@
+package edu.uci.ics.genomix.hadoop.contrailgraphbuilding;
+
+import java.io.IOException;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+
+import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.KmerBytesWritableFactory;
+import edu.uci.ics.genomix.type.KmerListWritable;
+import edu.uci.ics.genomix.type.NodeWritable;
+import edu.uci.ics.genomix.type.PositionWritable;
+
+/**
+ * Mapper of the 'contrail' graph-building job. Each input value is expected to be
+ * one text line of the form {@code readID<TAB>sequence}.
+ * <p>
+ * NOTE(review): {@link #map} currently only fills the working fields below; nothing
+ * is collected into the {@code OutputCollector} yet.
+ */
+@SuppressWarnings("deprecation")
+public class GenomixMapper extends MapReduceBase implements
+        Mapper<LongWritable, Text, KmerBytesWritable, NodeWritable> {
+
+    /** Orientation tag assigned after comparing a k-mer with its reversed form. */
+    public static enum KmerDir {
+        FORWARD,
+        REVERSE,
+    }
+
+    /** Accepts sequences made only of the letters A, G, C and T; compiled once, not per record. */
+    private static final Pattern GENE_PATTERN = Pattern.compile("[AGCT]+");
+
+    /** K-mer length; assigned in {@link #configure} from the "sizeKmer" job property. */
+    public static int KMER_SIZE;
+    private KmerBytesWritable preForwardKmer;
+    private KmerBytesWritable preReverseKmer;
+    private KmerBytesWritable curForwardKmer;
+    private KmerBytesWritable curReverseKmer;
+    private KmerBytesWritable nextForwardKmer;
+    private KmerBytesWritable nextReverseKmer;
+    private NodeWritable outputNode;
+    private PositionWritable nodeId;
+    private KmerListWritable kmerList;
+
+    private KmerBytesWritableFactory kmerFactory;
+    private KmerDir preKmerDir;
+    private KmerDir curKmerDir;
+    private KmerDir nextKmerDir;
+
+    byte mateId = (byte) 0;
+
+    /**
+     * Reads the k-mer size from the job configuration and allocates the reusable
+     * k-mer, node and position holders.
+     *
+     * @param job job configuration; must define the "sizeKmer" property
+     */
+    @Override
+    public void configure(JobConf job) {
+        KMER_SIZE = Integer.parseInt(job.get("sizeKmer"));
+        preForwardKmer = new KmerBytesWritable(KMER_SIZE);
+        preReverseKmer = new KmerBytesWritable(KMER_SIZE);
+        curForwardKmer = new KmerBytesWritable(KMER_SIZE);
+        curReverseKmer = new KmerBytesWritable(KMER_SIZE);
+        nextForwardKmer = new KmerBytesWritable(KMER_SIZE);
+        nextReverseKmer = new KmerBytesWritable(KMER_SIZE);
+        outputNode = new NodeWritable();
+        nodeId = new PositionWritable();
+        kmerList = new KmerListWritable();
+        kmerFactory = new KmerBytesWritableFactory(KMER_SIZE);
+        preKmerDir = KmerDir.FORWARD;
+        curKmerDir = KmerDir.FORWARD;
+        nextKmerDir = KmerDir.FORWARD;
+    }
+
+    /**
+     * Parses one {@code readID<TAB>sequence} line, loads the k-mer starting at offset 0
+     * of the read together with its reversed form, and prepares the following k-mer.
+     * Lines whose sequence contains anything other than A, G, C or T are skipped.
+     *
+     * @param key      input key supplied by the input format (not used)
+     * @param value    one line of text
+     * @param output   collector for (k-mer, node) pairs (not written to yet)
+     * @param reporter progress reporter (not used)
+     * @throws IOException if the line does not have exactly two tab-separated fields
+     *                     ("invalid data") or the read is not longer than KMER_SIZE
+     *                     ("short read")
+     */
+    @Override
+    public void map(LongWritable key, Text value, OutputCollector<KmerBytesWritable, NodeWritable> output,
+            Reporter reporter) throws IOException {
+        String[] rawLine = value.toString().split("\\t"); // Read the Real Gene Line
+        if (rawLine.length != 2) {
+            throw new IOException("invalid data");
+        }
+        int readID = Integer.parseInt(rawLine[0]);
+        String geneLine = rawLine[1];
+        Matcher geneMatcher = GENE_PATTERN.matcher(geneLine);
+        if (!geneMatcher.matches()) {
+            return;
+        }
+        byte[] array = geneLine.getBytes();
+        // The read must be strictly longer than k: array[KMER_SIZE] is read below.
+        if (KMER_SIZE >= array.length) {
+            throw new IOException("short read");
+        }
+        // first kmer
+        curForwardKmer.setByRead(array, 0);
+        curReverseKmer.set(kmerFactory.reverse(curForwardKmer));
+        if (curForwardKmer.compareTo(curReverseKmer) >= 0) {
+            curKmerDir = KmerDir.FORWARD;
+        } else {
+            curKmerDir = KmerDir.REVERSE;
+        }
+        nodeId.set(mateId, readID, 0);
+        setNextKmer(array[KMER_SIZE]);
+    }
+
+    /**
+     * Derives the next forward k-mer by shifting {@code nextChar} into a copy of the
+     * current forward k-mer, and stores its reversed form in the next reverse k-mer.
+     *
+     * @param nextChar the base that follows the current k-mer in the read
+     */
+    public void setNextKmer(byte nextChar) {
+        nextForwardKmer.set(curForwardKmer);
+        nextForwardKmer.shiftKmerWithNextChar(nextChar);
+        // The reversed form goes into nextReverseKmer; writing it back into
+        // nextForwardKmer would discard the forward k-mer just computed.
+        nextReverseKmer.set(kmerFactory.reverse(nextForwardKmer));
+    }
+}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicPathMergeVertex.java
index dd37a6a..1a793cd 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicPathMergeVertex.java
@@ -39,13 +39,6 @@
* initiate kmerSize, maxIteration
*/
public void initVertex() {
-// if (kmerSize == -1)
-// kmerSize = getContext().getConfiguration().getInt(KMER_SIZE, 5);
-// if (maxIteration < 0)
-// maxIteration = getContext().getConfiguration().getInt(ITERATIONS, 1000000);
-// outFlag = (byte)0;
-// outgoingMsg.reset();
-// headFlag = (byte)(getVertexValue().getState() & State.IS_HEAD);
}
/**
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java
index 59104f7..05a4700 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java
@@ -102,8 +102,6 @@
/**
* set nextID to the element that's next (in the node's FF or FR list), returning true when there is a next neighbor
*/
-
-
protected boolean setNextInfo(VertexValueWritable value) {
if (value.getFFList().getCountOfPosition() > 0) {
nextID.set(value.getFFList().getPosition(0));
@@ -121,7 +119,6 @@
/**
* set prevID to the element that's previous (in the node's RR or RF list), returning true when there is a previous neighbor
*/
-
protected boolean setPrevInfo(VertexValueWritable value) {
if (value.getRRList().getCountOfPosition() > 0) {
prevID.set(value.getRRList().getPosition(0));
@@ -169,8 +166,7 @@
} else if (hasPrev && !prevHead) {
// compress this head to the reverse tail
sendUpdateMsgToSuccessor();
- } //else
- //voteToHalt();
+ }
}
}else {
// I'm a tail
@@ -179,24 +175,20 @@
// tails on both sides, and I'm the "local minimum"
// compress me towards the tail in forward dir
sendUpdateMsgToPredecessor();
- } //else
- //voteToHalt();
+ }
} else if (!hasPrev) {
// no previous node
if (!nextHead && curID.compareTo(nextID) < 0) {
// merge towards tail in forward dir
sendUpdateMsgToPredecessor();
- } //else
- //voteToHalt();
+ }
} else if (!hasNext) {
// no next node
if (!prevHead && curID.compareTo(prevID) < 0) {
// merge towards tail in reverse dir
sendUpdateMsgToSuccessor();
- } //else
- //voteToHalt();
- } //else
- //voteToHalt();
+ }
+ }
}
}
else if (getSuperstep() % 4 == 0){