Merge branch 'jianfeng/genomix' of https://code.google.com/p/hyracks into jianfeng/genomix
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
index df4e755..c0b00a7 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
@@ -43,14 +43,15 @@
incomingList.set(incoming);
}
- public void setKmer(KmerBytesWritable kmer) {
- this.kmer = kmer;
- }
public void setOutgoingList(PositionListWritable outgoing) {
outgoingList.set(outgoing);
}
+ public void setKmer(KmerBytesWritable right) {
+ this.kmer.set(right);
+ }
+
public void reset(int kmerSize) {
nodeID.set(0, (byte) 0);
incomingList.reset();
@@ -131,5 +132,12 @@
sbuilder.append(kmer.toString()).append(')');
return sbuilder.toString();
}
+
+ /*
+ * Returns whether this node is a "path"-compressible node, i.e., it has an in-degree and an out-degree of exactly 1
+ */
+ public boolean isPathNode() {
+ return incomingList.getCountOfPosition() == 1 && outgoingList.getCountOfPosition() == 1;
+ }
}
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionListWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionListWritable.java
index 40817e8..11d0998 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionListWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionListWritable.java
@@ -19,7 +19,8 @@
protected int offset;
protected int valueCount;
protected static final byte[] EMPTY = {};
-
+ public static final int INTBYTES = 4;
+
protected PositionWritable posIter = new PositionWritable();
public PositionListWritable() {
@@ -67,6 +68,14 @@
return posIter;
}
+ public void resetPosition(int i, int readID, byte posInRead) {
+ if (i >= valueCount) {
+ throw new ArrayIndexOutOfBoundsException("No such position");
+ }
+ Marshal.putInt(readID, storage, offset + i * PositionWritable.LENGTH);
+ storage[offset + i * PositionWritable.LENGTH + INTBYTES] = posInRead;
+ }
+
@Override
public Iterator<PositionWritable> iterator() {
Iterator<PositionWritable> it = new Iterator<PositionWritable>() {
@@ -120,7 +129,7 @@
storage[offset + valueCount * PositionWritable.LENGTH + PositionWritable.INTBYTES] = posInRead;
valueCount += 1;
}
-
+
public static int getCountByDataLength(int length) {
if (length % PositionWritable.LENGTH != 0) {
throw new IllegalArgumentException("Length of positionlist is invalid");
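resetPosition above overwrites the i-th record in the backing byte array, assuming each record occupies PositionWritable.LENGTH bytes: a 4-byte readID followed by a 1-byte posInRead, the same layout append uses further down in this class. A minimal standalone sketch of that assumed layout (class, field, and method names here are invented for illustration and are not part of the patch):

import java.nio.ByteBuffer;

public class PositionRecordLayoutSketch {
    static final int INTBYTES = 4;                  // width of the readID field
    static final int RECORD_LENGTH = INTBYTES + 1;  // assumed PositionWritable.LENGTH

    // overwrite the i-th record in place, analogous to resetPosition
    static void write(byte[] storage, int offset, int i, int readID, byte posInRead) {
        int recordStart = offset + i * RECORD_LENGTH;
        ByteBuffer.wrap(storage, recordStart, INTBYTES).putInt(readID); // bytes [0,4): readID
        storage[recordStart + INTBYTES] = posInRead;                    // byte 4: posInRead
    }

    public static void main(String[] args) {
        byte[] storage = new byte[3 * RECORD_LENGTH];
        write(storage, 0, 2, 42, (byte) 7); // set the third record to (readID=42, posInRead=7)
        System.out.println(ByteBuffer.wrap(storage, 2 * RECORD_LENGTH, INTBYTES).getInt()); // prints 42
    }
}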
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionWritable.java
index 132b464..f77c844 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionWritable.java
@@ -126,6 +126,19 @@
}
}
+ public static class FirstComparator extends WritableComparator {
+ public FirstComparator() {
+ super(PositionWritable.class);
+ }
+
+ @Override
+ public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+ int thisValue = Marshal.getInt(b1, s1);
+ int thatValue = Marshal.getInt(b2, s2);
+ // compare by readID only; avoid int overflow from plain subtraction
+ return (thisValue < thatValue ? -1 : (thisValue == thatValue ? 0 : 1));
+ }
+ }
+
static { // register this comparator
WritableComparator.define(PositionWritable.class, new Comparator());
}
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h3/MergePathsH3.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h3/MergePathsH3.java
new file mode 100644
index 0000000..ddb2a64
--- /dev/null
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h3/MergePathsH3.java
@@ -0,0 +1,246 @@
+package edu.uci.ics.genomix.hadoop.graphclean.mergepaths.h3;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+import edu.uci.ics.genomix.hadoop.pmcommon.MessageWritableNodeWithFlag;
+import edu.uci.ics.genomix.type.NodeWritable;
+import edu.uci.ics.genomix.type.PositionWritable;
+
+@SuppressWarnings("deprecation")
+public class MergePathsH3 extends Configured implements Tool {
+ /*
+ * Flags used when sending messages
+ */
+ public static class MessageFlag {
+ public static final byte EMPTY_MESSAGE = 0;
+ public static final byte FROM_SELF = 1;
+ public static final byte FROM_SUCCESSOR = 1 << 1;
+ public static final byte IS_HEAD = 1 << 2;
+ public static final byte FROM_PREDECESSOR = 1 << 3;
+
+ public static String getFlagAsString(byte code) {
+ // TODO: allow multiple flags to be set
+ switch (code) {
+ case EMPTY_MESSAGE:
+ return "EMPTY_MESSAGE";
+ case FROM_SELF:
+ return "FROM_SELF";
+ case FROM_SUCCESSOR:
+ return "FROM_SUCCESSOR";
+ }
+ return "ERROR_BAD_MESSAGE";
+ }
+ }
+
+ /*
+ * Common functionality for the two mapper types needed. See javadoc for MergePathsH3MapperSubsequent.
+ */
+ private static class MergePathsH3MapperBase extends MapReduceBase {
+
+ protected static long randSeed;
+ protected Random randGenerator;
+ protected float probBeingRandomHead;
+
+ protected int KMER_SIZE;
+ protected PositionWritable outputKey;
+ protected MessageWritableNodeWithFlag outputValue;
+ protected NodeWritable curNode;
+
+ public void configure(JobConf conf) {
+ randSeed = conf.getLong("randomSeed", 0);
+ randGenerator = new Random(randSeed);
+ probBeingRandomHead = conf.getFloat("probBeingRandomHead", 0.5f);
+
+ KMER_SIZE = conf.getInt("sizeKmer", 0);
+ outputValue = new MessageWritableNodeWithFlag(KMER_SIZE);
+ outputKey = new PositionWritable();
+ curNode = new NodeWritable(KMER_SIZE);
+ }
+
+ protected boolean isNodeRandomHead(PositionWritable nodeID) {
+ // "deterministically random", based on node id
+ randGenerator.setSeed(randSeed ^ nodeID.hashCode());
+ return randGenerator.nextFloat() < probBeingRandomHead;
+ }
+ }
+
+ /*
+ * Mapper class: Partition the graph using random pseudoheads.
+ * Heads send themselves to their successors, and all others map themselves.
+ */
+ private static class MergePathsH3MapperSubsequent extends MergePathsH3MapperBase implements
+ Mapper<PositionWritable, MessageWritableNodeWithFlag, PositionWritable, MessageWritableNodeWithFlag> {
+ @Override
+ public void map(PositionWritable key, MessageWritableNodeWithFlag value,
+ OutputCollector<PositionWritable, MessageWritableNodeWithFlag> output, Reporter reporter)
+ throws IOException {
+ curNode = value.getNode();
+ // Map all path vertices; heads send themselves to their successors, all others map to their own ID
+ if (curNode.isPathNode()) {
+ boolean isHead = (value.getFlag() & MessageFlag.IS_HEAD) == MessageFlag.IS_HEAD;
+ if (isHead || isNodeRandomHead(curNode.getNodeID())) {
+ // head nodes send themselves to their successor
+ outputKey.set(curNode.getOutgoingList().getPosition(0));
+ outputValue.set((byte) (MessageFlag.FROM_PREDECESSOR | MessageFlag.IS_HEAD), curNode);
+ output.collect(outputKey, outputValue);
+ } else {
+ // tail nodes map themselves
+ outputValue.set(MessageFlag.FROM_SELF, curNode);
+ output.collect(key, outputValue);
+ }
+ }
+ }
+ }
+
+ /*
+ * Mapper used for the first iteration. See javadoc for MergePathsH3MapperSubsequent.
+ */
+ private static class MergePathsH3MapperInitial extends MergePathsH3MapperBase implements
+ Mapper<NodeWritable, NullWritable, PositionWritable, MessageWritableNodeWithFlag> {
+ @Override
+ public void map(NodeWritable key, NullWritable value,
+ OutputCollector<PositionWritable, MessageWritableNodeWithFlag> output, Reporter reporter)
+ throws IOException {
+ curNode = key;
+ // Map all path vertices; heads send themselves to their successors, all others map to their own ID
+ if (curNode.isPathNode()) {
+ if (isNodeRandomHead(curNode.getNodeID())) {
+ // head nodes send themselves to their successor
+ outputKey.set(curNode.getOutgoingList().getPosition(0));
+ outputValue.set((byte) (MessageFlag.FROM_PREDECESSOR | MessageFlag.IS_HEAD), curNode);
+ output.collect(outputKey, outputValue);
+ } else {
+ // tail nodes map themselves
+ outputValue.set(MessageFlag.FROM_SELF, curNode);
+ output.collect(key.getNodeID(), outputValue);
+ }
+ }
+ }
+ }
+
+ /*
+ * Reducer class: merge nodes that co-occur; for singletons, remap the original nodes
+ */
+ private static class MergePathsH3Reducer extends MapReduceBase implements
+ Reducer<PositionWritable, MessageWritableNodeWithFlag, PositionWritable, MessageWritableNodeWithFlag> {
+
+ private int KMER_SIZE;
+ private MessageWritableNodeWithFlag inputValue;
+ private MessageWritableNodeWithFlag outputValue;
+ private NodeWritable headNode;
+ private NodeWritable tailNode;
+ private int count;
+
+ public void configure(JobConf conf) {
+ KMER_SIZE = conf.getInt("sizeKmer", 0);
+ outputValue = new MessageWritableNodeWithFlag(KMER_SIZE);
+ headNode = new NodeWritable(KMER_SIZE);
+ tailNode = new NodeWritable(KMER_SIZE);
+ }
+
+ @Override
+ public void reduce(PositionWritable key, Iterator<MessageWritableNodeWithFlag> values,
+ OutputCollector<PositionWritable, MessageWritableNodeWithFlag> output, Reporter reporter)
+ throws IOException {
+
+ inputValue = values.next();
+ if (!values.hasNext()) {
+ // all single nodes must be remapped
+ if ((inputValue.getFlag() & MessageFlag.FROM_SELF) == MessageFlag.FROM_SELF) {
+ // FROM_SELF => remap self
+ output.collect(key, inputValue);
+ } else {
+ // FROM_PREDECESSOR => remap predecessor
+ output.collect(inputValue.getNode().getNodeID(), inputValue);
+ }
+ } else {
+ // multiple inputs => a merge will take place. Aggregate both, then collect the merged path
+ count = 0;
+ while (true) { // process values; break when no more
+ count++;
+ if ((inputValue.getFlag() & MessageFlag.FROM_PREDECESSOR) == MessageFlag.FROM_PREDECESSOR) {
+ headNode.set(inputValue.getNode());
+ } else {
+ tailNode.set(inputValue.getNode());
+ }
+ if (!values.hasNext()) {
+ break;
+ } else {
+ inputValue = values.next();
+ }
+ }
+ if (count != 2) {
+ throw new IOException("Expected two nodes in MergePathsH3 reduce; saw " + String.valueOf(count));
+ }
+ // merge the tail into the head and emit the result; the merged node is now a head
+ headNode.mergeNext(tailNode, KMER_SIZE);
+ outputValue.set(MessageFlag.IS_HEAD, headNode);
+ output.collect(key, outputValue);
+ }
+ }
+ }
+
+ /*
+ * Run one iteration of the mergePaths algorithm
+ */
+ public RunningJob run(String inputPath, String outputPath, JobConf baseConf) throws IOException {
+ JobConf conf = new JobConf(baseConf);
+ conf.setJarByClass(MergePathsH3.class);
+ conf.setJobName("MergePathsH3 " + inputPath);
+
+ FileInputFormat.addInputPath(conf, new Path(inputPath));
+ FileOutputFormat.setOutputPath(conf, new Path(outputPath));
+
+ conf.setInputFormat(SequenceFileInputFormat.class);
+
+ conf.setMapOutputKeyClass(PositionWritable.class);
+ conf.setMapOutputValueClass(MessageWritableNodeWithFlag.class);
+ conf.setOutputKeyClass(PositionWritable.class);
+ conf.setOutputValueClass(MessageWritableNodeWithFlag.class);
+
+ // on the first iteration, we have to transform from a node-oriented graph
+ // to a Position-oriented graph
+ if (conf.getInt("iMerge", 1) == 1) {
+ conf.setMapperClass(MergePathsH3MapperInitial.class);
+ } else {
+ conf.setMapperClass(MergePathsH3MapperSubsequent.class);
+ }
+ conf.setReducerClass(MergePathsH3Reducer.class);
+
+ FileSystem.get(conf).delete(new Path(outputPath), true);
+
+ return JobClient.runJob(conf);
+ }
+
+ @Override
+ public int run(String[] arg0) throws Exception {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
+ public static void main(String[] args) throws Exception {
+ int res = ToolRunner.run(new Configuration(), new MergePathsH3(), args);
+ System.exit(res);
+ }
+}
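The mappers above combine flags with bitwise OR (for example FROM_PREDECESSOR | IS_HEAD) and test them with bitwise AND, but getFlagAsString only decodes single-flag values, as its TODO notes. A possible multi-flag decoder, sketched here with the same constant values (the class name is invented for illustration; this is not part of the patch):

public final class FlagStringSketch {
    public static final byte EMPTY_MESSAGE = 0;
    public static final byte FROM_SELF = 1;
    public static final byte FROM_SUCCESSOR = 1 << 1;
    public static final byte IS_HEAD = 1 << 2;
    public static final byte FROM_PREDECESSOR = 1 << 3;

    // decode any combination of the flags above into a readable string
    public static String getFlagAsString(byte code) {
        if (code == EMPTY_MESSAGE) {
            return "EMPTY_MESSAGE";
        }
        StringBuilder sb = new StringBuilder();
        if ((code & FROM_SELF) != 0) sb.append("FROM_SELF|");
        if ((code & FROM_SUCCESSOR) != 0) sb.append("FROM_SUCCESSOR|");
        if ((code & IS_HEAD) != 0) sb.append("IS_HEAD|");
        if ((code & FROM_PREDECESSOR) != 0) sb.append("FROM_PREDECESSOR|");
        if (sb.length() == 0) {
            return "ERROR_BAD_MESSAGE";
        }
        sb.setLength(sb.length() - 1); // drop the trailing '|'
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(getFlagAsString((byte) (FROM_PREDECESSOR | IS_HEAD))); // IS_HEAD|FROM_PREDECESSOR
    }
}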
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h3/MergePathsH3Driver.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h3/MergePathsH3Driver.java
new file mode 100644
index 0000000..4982439
--- /dev/null
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h3/MergePathsH3Driver.java
@@ -0,0 +1,77 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.genomix.hadoop.graphclean.mergepaths.h3;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
+import org.kohsuke.args4j.CmdLineParser;
+import org.kohsuke.args4j.Option;
+
+@SuppressWarnings("deprecation")
+public class MergePathsH3Driver {
+
+ private static class Options {
+ @Option(name = "-inputpath", usage = "the input path", required = true)
+ public String inputPath;
+
+ @Option(name = "-outputpath", usage = "the output path", required = true)
+ public String outputPath;
+
+ @Option(name = "-mergeresultpath", usage = "the merging results path", required = true)
+ public String mergeResultPath;
+
+ @Option(name = "-num-reducers", usage = "the number of reducers", required = true)
+ public int numReducers;
+
+ @Option(name = "-kmer-size", usage = "the size of kmer", required = true)
+ public int sizeKmer;
+
+ @Option(name = "-merge-rounds", usage = "the while rounds of merging", required = true)
+ public int mergeRound;
+
+ }
+
+ public void run(String inputPath, String outputPath, int numReducers, int sizeKmer, int mergeRound)
+ throws IOException {
+ JobConf baseConf = new JobConf(); // TODO: decide whether a default configuration file should be loaded here
+ baseConf.setNumReduceTasks(numReducers);
+ baseConf.setInt("sizeKmer", sizeKmer);
+
+ FileSystem dfs = FileSystem.get(baseConf);
+ String prevOutput = inputPath;
+ dfs.delete(new Path(outputPath), true); // clear any previous output
+
+ String tmpOutputPath = "NO_JOBS_DONE";
+ for (int iMerge = 1; iMerge <= mergeRound; iMerge++) {
+ baseConf.setInt("iMerge", iMerge);
+ MergePathsH3 merger = new MergePathsH3();
+ tmpOutputPath = inputPath + ".mergepathsH3." + String.valueOf(iMerge);
+ merger.run(prevOutput, tmpOutputPath, baseConf);
+ prevOutput = tmpOutputPath; // the next round consumes this round's output
+ }
+ dfs.rename(new Path(tmpOutputPath), new Path(outputPath)); // save final results
+ }
+
+ public static void main(String[] args) throws Exception {
+ Options options = new Options();
+ CmdLineParser parser = new CmdLineParser(options);
+ parser.parseArgument(args);
+ MergePathsH3Driver driver = new MergePathsH3Driver();
+ driver.run(options.inputPath, options.outputPath, options.numReducers,
+ options.sizeKmer, options.mergeRound);
+ }
+}
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/MessageWritableNodeWithFlag.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/MessageWritableNodeWithFlag.java
new file mode 100644
index 0000000..5a3076c
--- /dev/null
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/MessageWritableNodeWithFlag.java
@@ -0,0 +1,74 @@
+package edu.uci.ics.genomix.hadoop.pmcommon;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.BinaryComparable;
+import org.apache.hadoop.io.WritableComparable;
+
+import edu.uci.ics.genomix.type.NodeWritable;
+
+public class MessageWritableNodeWithFlag extends BinaryComparable implements WritableComparable<BinaryComparable> {
+ private byte flag;
+ private NodeWritable node;
+
+ public MessageWritableNodeWithFlag(int k) {
+ this.flag = 0;
+ this.node = new NodeWritable(k);
+ }
+
+ public MessageWritableNodeWithFlag(byte flag, int kmerSize) {
+ this.flag = flag;
+ this.node = new NodeWritable(kmerSize);
+ }
+
+ public void set(MessageWritableNodeWithFlag right) {
+ set(right.getFlag(), right.getNode());
+ }
+
+ public void set(byte flag, NodeWritable node) {
+ this.node.set(node);
+ this.flag = flag;
+ }
+
+ @Override
+ public void readFields(DataInput arg0) throws IOException {
+ node.readFields(arg0);
+ flag = arg0.readByte();
+ }
+
+ @Override
+ public void write(DataOutput arg0) throws IOException {
+ node.write(arg0);
+ arg0.writeByte(flag);
+ }
+
+ public NodeWritable getNode() {
+ if (node.getCount() != 0) {
+ return node;
+ }
+ return null;
+ }
+
+ public byte getFlag() {
+ return this.flag;
+ }
+
+ public String toString() {
+ return node.toString() + '\t' + String.valueOf(flag);
+ }
+
+ @Override
+ public byte[] getBytes() {
+ if (node.getCount() != 0) {
+ return node.getKmer().getBytes();
+ } else
+ return null;
+ }
+
+ @Override
+ public int getLength() {
+ return node.getCount();
+ }
+}
\ No newline at end of file
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/valvetgraphbuilding/DeepGraphBuildingMapper.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/valvetgraphbuilding/DeepGraphBuildingMapper.java
deleted file mode 100644
index 591e3c7..0000000
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/valvetgraphbuilding/DeepGraphBuildingMapper.java
+++ /dev/null
@@ -1,21 +0,0 @@
-package edu.uci.ics.genomix.hadoop.valvetgraphbuilding;
-
-import java.io.IOException;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reporter;
-import edu.uci.ics.genomix.type.KmerBytesWritable;
-import edu.uci.ics.genomix.type.PositionListWritable;
-
-@SuppressWarnings("deprecation")
-public class DeepGraphBuildingMapper extends MapReduceBase implements
- Mapper<KmerBytesWritable, PositionListWritable, IntWritable, LineBasedmappingWritable> {
- IntWritable numLine = new IntWritable();
- LineBasedmappingWritable lineBasedWriter = new LineBasedmappingWritable();
- @Override
- public void map(KmerBytesWritable key, PositionListWritable value, OutputCollector<IntWritable, LineBasedmappingWritable> output,
- Reporter reporter) throws IOException {
- }
-}
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/valvetgraphbuilding/DeepGraphBuildingReducer.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/valvetgraphbuilding/DeepGraphBuildingReducer.java
deleted file mode 100644
index 4b971a7..0000000
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/valvetgraphbuilding/DeepGraphBuildingReducer.java
+++ /dev/null
@@ -1,86 +0,0 @@
-package edu.uci.ics.genomix.hadoop.valvetgraphbuilding;
-
-import java.io.IOException;
-import java.util.Iterator;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
-import edu.uci.ics.genomix.type.KmerBytesWritable;
-import edu.uci.ics.genomix.type.NodeWritable;
-
-@SuppressWarnings("deprecation")
-public class DeepGraphBuildingReducer extends MapReduceBase implements
- Reducer<IntWritable, LineBasedmappingWritable, NodeWritable, NullWritable> {
-
-/* public ArrayList<LineBasedmappingWritable> lineElementsSet = new ArrayList<LineBasedmappingWritable>();
- public Position outputVerID = new Position();
- public VertexAdjacentWritable outputAdjacentList = new VertexAdjacentWritable();
- public PositionList srcVtexAdjList = new PositionList();
- public PositionList desVtexAdjList = new PositionList();
- public VertexIDListWritable srcAdjListWritable = new VertexIDListWritable();
- public VKmerBytesWritable desKmer = new VKmerBytesWritable(1);
- public VKmerBytesWritableFactory kmerFactory = new VKmerBytesWritableFactory(1);
- public VKmerBytesWritable srcKmer = new VKmerBytesWritable(1);*/
- @Override
- public void reduce(IntWritable key, Iterator<LineBasedmappingWritable> values,
- OutputCollector<NodeWritable, NullWritable> output, Reporter reporter) throws IOException {
-/* while (values.hasNext()) {
- lineElementsSet.add(values.next());
- }
- int[] orderLineTable = new int[lineElementsSet.size()];
- for (int i = 0; i < lineElementsSet.size(); i++) {
- int posInInvertedIndex = lineElementsSet.get(i).getPosInInvertedIndex();
- orderLineTable[lineElementsSet.get(i).getAdjVertexList().get().getPosinReadListElement(posInInvertedIndex)] = i;
- }
- //the first node in this read
- int posInInvertedIndex = lineElementsSet.get(orderLineTable[0]).getPosInInvertedIndex();
- outputVerID.set(
- lineElementsSet.get(orderLineTable[0]).getAdjVertexList().get().getReadListElement(posInInvertedIndex),
- (byte) 0);
- desVtexAdjList.set(lineElementsSet.get(orderLineTable[1]).getAdjVertexList().get());
- for (int i = 0; i < desVtexAdjList.getUsedSize(); i++) {
- if (desVtexAdjList.getPosinReadListElement(i) == (byte) 0) {
- srcVtexAdjList.addELementToList(desVtexAdjList.getReadListElement(i), (byte) 0);
- }
- }
- srcVtexAdjList.addELementToList(key.get(), (byte) 1);
- outputVerID.set(
- lineElementsSet.get(orderLineTable[0]).getAdjVertexList().get().getReadListElement(posInInvertedIndex),
- (byte) 0);
- srcAdjListWritable.set(srcVtexAdjList);
- outputAdjacentList.set(srcAdjListWritable, lineElementsSet.get(orderLineTable[0]).getVkmer());
- output.collect(outputVerID, outputAdjacentList);
- //srcVtexAdjList reset!!!!
-
- for (int i = 1; i < lineElementsSet.size(); i++) {
- desVtexAdjList.set(lineElementsSet.get(orderLineTable[i + 1]).getAdjVertexList().get());
- boolean flag = false;
- for (int j = 0; j < desVtexAdjList.getUsedSize(); j++) {
- if (desVtexAdjList.getPosinReadListElement(j) == (byte) 0) {
- srcVtexAdjList.addELementToList(desVtexAdjList.getReadListElement(i), (byte) 0);
- flag = true;
- }
- }
- if (flag = true) {
- //doesm't merge
- srcVtexAdjList.addELementToList(key.get(), (byte) (i + 1));
- outputVerID.set(
- lineElementsSet.get(orderLineTable[i]).getAdjVertexList().get()
- .getReadListElement(posInInvertedIndex), lineElementsSet.get(orderLineTable[i])
- .getAdjVertexList().get().getPosinReadListElement(posInInvertedIndex));
- srcAdjListWritable.set(srcVtexAdjList);
- outputAdjacentList.set(srcAdjListWritable, lineElementsSet.get(orderLineTable[i]).getVkmer());
- }
- else {
- //merge
- desKmer.set(kmerFactory.getFirstKmerFromChain(1, lineElementsSet.get(orderLineTable[i+1]).getVkmer()));
- srcKmer.set(lineElementsSet.get(orderLineTable[i]).getVkmer());
- lineElementsSet.get(orderLineTable[i+1]).getVkmer().set(kmerFactory.mergeTwoKmer(srcKmer, desKmer));
- orderLineTable[i+1] = orderLineTable[i];
- }
- }*/
- }
-}
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/valvetgraphbuilding/LineBasedmappingWritable.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/valvetgraphbuilding/LineBasedmappingWritable.java
deleted file mode 100644
index 1e44903..0000000
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/valvetgraphbuilding/LineBasedmappingWritable.java
+++ /dev/null
@@ -1,41 +0,0 @@
-package edu.uci.ics.genomix.hadoop.valvetgraphbuilding;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import edu.uci.ics.genomix.type.PositionListWritable;
-
-public class LineBasedmappingWritable extends PositionListWritable{
- byte posInRead;
-
- public LineBasedmappingWritable() {
- super();
- this.posInRead = -1;
- }
-
- public LineBasedmappingWritable(int count, byte [] data, int offset, byte posInRead) {
- super(count, data, offset);
- this.posInRead = posInRead;
- }
-
- public void set(byte posInRead, PositionListWritable right) {
- super.set(right);
- this.posInRead = posInRead;
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- super.readFields(in);
- this.posInRead = in.readByte();
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- super.write(out);
- out.writeByte(this.posInRead);
- }
-
- public int getPosInInvertedIndex() {
- return this.posInRead;
- }
-}
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/velvetgraphbuilding/DeepGraphBuildingMapper.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/velvetgraphbuilding/DeepGraphBuildingMapper.java
new file mode 100644
index 0000000..c3c252a
--- /dev/null
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/velvetgraphbuilding/DeepGraphBuildingMapper.java
@@ -0,0 +1,76 @@
+package edu.uci.ics.genomix.hadoop.velvetgraphbuilding;
+
+import java.io.IOException;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.PositionListWritable;
+import edu.uci.ics.genomix.type.PositionWritable;
+
+@SuppressWarnings("deprecation")
+public class DeepGraphBuildingMapper extends MapReduceBase implements
+ Mapper<KmerBytesWritable, PositionListWritable, PositionWritable, PositionListAndKmerWritable> {
+
+ public PositionWritable VertexID;
+ public PositionWritable tempVertex;
+ public PositionListWritable listPosZeroInRead;
+ public PositionListWritable listPosNonZeroInRead;
+ public PositionListWritable tempPosList;
+ public PositionListAndKmerWritable outputListAndKmer;
+ @Override
+ public void configure(JobConf job) {
+ VertexID = new PositionWritable();
+ tempVertex = new PositionWritable();
+ listPosZeroInRead = new PositionListWritable();
+ listPosNonZeroInRead = new PositionListWritable();
+ tempPosList = new PositionListWritable();
+ outputListAndKmer = new PositionListAndKmerWritable();
+ }
+ @Override
+ public void map(KmerBytesWritable key, PositionListWritable value, OutputCollector<PositionWritable, PositionListAndKmerWritable> output,
+ Reporter reporter) throws IOException {
+ listPosZeroInRead.reset();
+ listPosNonZeroInRead.reset();
+ outputListAndKmer.reset();
+ for(int i = 0; i < value.getCountOfPosition(); i++) {
+ VertexID.set(value.getPosition(i));
+ if(VertexID.getPosInRead() == 0) {
+ listPosZeroInRead.append(VertexID);
+ }
+ else {
+ listPosNonZeroInRead.append(VertexID);
+ }
+ }
+ for(int i = 0; i < listPosZeroInRead.getCountOfPosition(); i++) {
+ VertexID.set(listPosZeroInRead.getPosition(i));
+ tempPosList.reset();
+ for (int j = 0; j < listPosNonZeroInRead.getCountOfPosition(); j++) {
+ tempVertex.set(listPosNonZeroInRead.getPosition(j));
+ if(tempVertex.getReadID() != VertexID.getReadID()) {
+ int tempReadID = tempVertex.getReadID();
+ byte tempPosInRead = (byte) (tempVertex.getPosInRead() - 1);
+ tempVertex.set(tempReadID, tempPosInRead);
+ tempPosList.append(tempVertex);
+ }
+ }
+ outputListAndKmer.set(tempPosList, key);
+ output.collect(VertexID, outputListAndKmer);
+ }
+ for(int i = 0; i < listPosNonZeroInRead.getCountOfPosition(); i++) {
+ VertexID.set(listPosNonZeroInRead.getPosition(i));
+ tempPosList.reset();
+ for (int j = 0; j < listPosZeroInRead.getCountOfPosition(); j++) {
+ tempVertex.set(listPosZeroInRead.getPosition(j));
+ if(tempVertex.getReadID() != VertexID.getReadID()) {
+ tempPosList.append(tempVertex);
+ }
+ }
+ outputListAndKmer.set(tempPosList, key);
+ output.collect(VertexID, outputListAndKmer);
+ }
+ }
+}
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/velvetgraphbuilding/DeepGraphBuildingReducer.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/velvetgraphbuilding/DeepGraphBuildingReducer.java
new file mode 100644
index 0000000..ee12661
--- /dev/null
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/velvetgraphbuilding/DeepGraphBuildingReducer.java
@@ -0,0 +1,139 @@
+package edu.uci.ics.genomix.hadoop.velvetgraphbuilding;
+
+import java.io.IOException;
+import java.util.Iterator;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.lib.MultipleOutputs;
+import edu.uci.ics.genomix.type.NodeWritable;
+import edu.uci.ics.genomix.type.PositionListWritable;
+import edu.uci.ics.genomix.type.PositionWritable;
+
+@SuppressWarnings("deprecation")
+public class DeepGraphBuildingReducer extends MapReduceBase implements
+ Reducer<PositionWritable, PositionListAndKmerWritable, NodeWritable, NullWritable> {
+ public class NodeToMergeState {
+ public static final byte NOT_UPDATE = 0;
+ public static final byte ASSIGNED_BY_RIGHTNODE = 1;
+ private byte state;
+
+ public NodeToMergeState() {
+ state = NOT_UPDATE;
+ }
+
+ public void setToNotUpdate() {
+ state = NOT_UPDATE;
+ }
+
+ public void setToAssignedByRightNode() {
+ state = ASSIGNED_BY_RIGHTNODE;
+ }
+
+ public String getState() {
+ switch (state) {
+ case NOT_UPDATE:
+ return "NOT_UPDATE";
+ case ASSIGNED_BY_RIGHTNODE:
+ return "ASSIGNED_BY_RIGHTNODE";
+ }
+ return "ERROR_STATE";
+ }
+ }
+
+ private PositionListAndKmerWritable nodeListAndKmer = new PositionListAndKmerWritable();
+ private PositionListAndKmerWritable nodeSuccListAndKmer = new PositionListAndKmerWritable();
+ private NodeWritable startNodeInRead = new NodeWritable();
+ private NodeWritable nodeToMerge = new NodeWritable();
+ private NodeWritable nodeToBeMerged = new NodeWritable();
+ private PositionListWritable incomingList = new PositionListWritable();
+ private PositionListWritable outgoingList = new PositionListWritable();
+ private NullWritable nullWritable = NullWritable.get();
+ private NodeToMergeState state = new NodeToMergeState();
+ private int KMER_SIZE;
+ public void configure(JobConf job) {
+ KMER_SIZE = job.getInt("sizeKmer", 0);
+ }
+ @Override
+ public void reduce(PositionWritable key, Iterator<PositionListAndKmerWritable> values,
+ OutputCollector<NodeWritable, NullWritable> output, Reporter reporter) throws IOException {
+ int readID = key.getReadID();
+ byte posInRead = 0;
+ assembleNodeInitialization(readID, posInRead, values);
+ posInRead = (byte) (posInRead + 2);
+ //----LOOP
+ while (values.hasNext()) {
+ assembleNodeFromValues(readID, posInRead, values);
+ posInRead = (byte) (posInRead + 1);
+ if (nodeToMerge.isPathNode() && nodeToBeMerged.isPathNode()) {
+ nodeToMerge.mergeNext(nodeToBeMerged, KMER_SIZE);
+ state.setToNotUpdate();
+ }
+ else {
+ state.setToAssignedByRightNode();
+ output.collect(nodeToMerge, nullWritable);
+ }
+ }
+ }
+
+ public void assembleNodeFromValues(int readID, byte posInRead, Iterator<PositionListAndKmerWritable> values)
+ throws IOException {
+ if (values.hasNext()) {
+ nodeListAndKmer.set(values.next());
+ } else {
+ throw new IOException("the size of values emerge bug!");
+ }
+ if (state.getState().equals("ASSIGNED_BY_RIGHTNODE")) {
+ nodeToMerge.set(nodeToBeMerged);
+ }
+ incomingList.reset();
+ incomingList.append(readID, (byte) (posInRead - 1));
+ if (nodeSuccListAndKmer.getVertexIDList() != null)
+ outgoingList.set(nodeSuccListAndKmer.getVertexIDList());
+ outgoingList.append(readID, (byte) (posInRead + 1));
+ nodeToBeMerged.setNodeID(readID, (byte) posInRead);
+ nodeToBeMerged.setIncomingList(incomingList);
+ nodeToBeMerged.setOutgoingList(outgoingList);
+ nodeToBeMerged.setKmer(nodeListAndKmer.getKmer());
+ }
+
+ public void assembleNodeInitialization(int readID, byte posInRead, Iterator<PositionListAndKmerWritable> values)
+ throws IOException {
+ if (values.hasNext()) {
+ nodeListAndKmer.set(values.next());
+ } else {
+ throw new IOException("the size of values emerge bug!");
+ }
+ if (values.hasNext()) {
+ nodeSuccListAndKmer.set(values.next());
+ }
+ incomingList.reset();
+ incomingList.set(nodeSuccListAndKmer.getVertexIDList());
+ outgoingList.reset();
+ if (nodeSuccListAndKmer.getVertexIDList() != null)
+ outgoingList.set(nodeSuccListAndKmer.getVertexIDList());
+ outgoingList.append(readID, (byte) (posInRead + 1));
+ startNodeInRead.setNodeID(readID, posInRead);
+ startNodeInRead.setIncomingList(incomingList);
+ startNodeInRead.setOutgoingList(outgoingList);
+ startNodeInRead.setKmer(nodeListAndKmer.getKmer());
+ //---------
+ nodeListAndKmer.set(nodeSuccListAndKmer);
+ incomingList.reset();
+ incomingList.append(readID, posInRead);
+ if (values.hasNext()) {
+ nodeSuccListAndKmer.set(values.next());
+ if (nodeSuccListAndKmer.getVertexIDList() != null)
+ outgoingList.set(nodeSuccListAndKmer.getVertexIDList());
+ }
+ outgoingList.append(readID, (byte) (posInRead + 2));
+ nodeToMerge.setNodeID(readID, (byte) posInRead);
+ nodeToMerge.setIncomingList(incomingList);
+ nodeToMerge.setOutgoingList(outgoingList);
+ nodeToMerge.setKmer(nodeListAndKmer.getKmer());
+ state.setToNotUpdate();
+ }
+}
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/velvetgraphbuilding/GraphBuildingDriver.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/velvetgraphbuilding/GraphBuildingDriver.java
new file mode 100644
index 0000000..acbc3f1
--- /dev/null
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/velvetgraphbuilding/GraphBuildingDriver.java
@@ -0,0 +1,112 @@
+package edu.uci.ics.genomix.hadoop.velvetgraphbuilding;
+
+import java.io.IOException;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Partitioner;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.hadoop.mapred.TextOutputFormat;
+import org.kohsuke.args4j.CmdLineParser;
+import org.kohsuke.args4j.Option;
+
+import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.NodeWritable;
+import edu.uci.ics.genomix.type.PositionListWritable;
+import edu.uci.ics.genomix.type.PositionWritable;
+
+@SuppressWarnings("deprecation")
+public class GraphBuildingDriver {
+
+ private static class Options {
+ @Option(name = "-inputpath", usage = "the input path", required = true)
+ public String inputPath;
+
+ @Option(name = "-outputpath", usage = "the output path", required = true)
+ public String outputPath;
+
+ @Option(name = "-num-reducers", usage = "the number of reducers", required = true)
+ public int numReducers;
+
+ @Option(name = "-kmer-size", usage = "the size of kmer", required = true)
+ public int sizeKmer;
+
+ }
+
+ public void run(String inputPath, String outputPath, int numReducers, int sizeKmer, String defaultConfPath)
+ throws IOException {
+
+ JobConf conf = new JobConf(GraphBuildingDriver.class);
+ conf.setInt("sizeKmer", sizeKmer);
+ if (defaultConfPath != null) {
+ conf.addResource(new Path(defaultConfPath));
+ }
+
+ conf.setJobName("graph building");
+ conf.setMapperClass(GraphInvertedIndexBuildingMapper.class);
+ conf.setReducerClass(GraphInvertedIndexBuildingReducer.class);
+
+ conf.setMapOutputKeyClass(KmerBytesWritable.class);
+ conf.setMapOutputValueClass(PositionWritable.class);
+
+ conf.setInputFormat(TextInputFormat.class);
+ conf.setOutputFormat(SequenceFileOutputFormat.class);
+
+ conf.setOutputKeyClass(KmerBytesWritable.class);
+ conf.setOutputValueClass(PositionListWritable.class);
+
+ FileInputFormat.setInputPaths(conf, new Path(inputPath));
+ FileOutputFormat.setOutputPath(conf, new Path(inputPath + "-step1"));
+ conf.setNumReduceTasks(numReducers);
+
+ FileSystem dfs = FileSystem.get(conf);
+ dfs.delete(new Path(inputPath + "-step1"), true);
+ JobClient.runJob(conf);
+
+ //-------------
+ conf = new JobConf(GraphBuildingDriver.class);
+ if (defaultConfPath != null) {
+ conf.addResource(new Path(defaultConfPath));
+ }
+ conf.setJobName("deep build");
+
+ conf.setMapperClass(DeepGraphBuildingMapper.class);
+ conf.setReducerClass(DeepGraphBuildingReducer.class);
+
+ conf.setMapOutputKeyClass(PositionWritable.class);
+ conf.setMapOutputValueClass(PositionListAndKmerWritable.class);
+
+ conf.setPartitionerClass(ReadIDPartitioner.class);
+
+ conf.setOutputKeyComparatorClass(PositionWritable.Comparator.class);
+ conf.setOutputValueGroupingComparator(PositionWritable.FirstComparator.class);
+
+ conf.setInputFormat(SequenceFileInputFormat.class);
+ conf.setOutputFormat(TextOutputFormat.class);
+
+ conf.setOutputKeyClass(NodeWritable.class);
+ conf.setOutputValueClass(NullWritable.class);
+
+ FileInputFormat.setInputPaths(conf, new Path(inputPath + "-step1"));
+ FileOutputFormat.setOutputPath(conf, new Path(outputPath));
+ conf.setNumReduceTasks(1);
+ dfs.delete(new Path(outputPath), true);
+ JobClient.runJob(conf);
+ }
+
+ public static void main(String[] args) throws Exception {
+ Options options = new Options();
+ CmdLineParser parser = new CmdLineParser(options);
+ parser.parseArgument(args);
+ GraphBuildingDriver driver = new GraphBuildingDriver();
+ driver.run(options.inputPath, options.outputPath, options.numReducers, options.sizeKmer, null);
+ }
+}
\ No newline at end of file
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/valvetgraphbuilding/GraphInvertedIndexBuildingMapper.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/velvetgraphbuilding/GraphInvertedIndexBuildingMapper.java
similarity index 83%
rename from genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/valvetgraphbuilding/GraphInvertedIndexBuildingMapper.java
rename to genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/velvetgraphbuilding/GraphInvertedIndexBuildingMapper.java
index 7b65158..d5924c8 100644
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/valvetgraphbuilding/GraphInvertedIndexBuildingMapper.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/velvetgraphbuilding/GraphInvertedIndexBuildingMapper.java
@@ -1,4 +1,4 @@
-package edu.uci.ics.genomix.hadoop.valvetgraphbuilding;
+package edu.uci.ics.genomix.hadoop.velvetgraphbuilding;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
@@ -33,17 +33,16 @@
/** first kmer */
byte[] array = geneLine.getBytes();
outputKmer.setByRead(array, 0);
+ System.out.println(key.get());
outputVertexID.set((int)key.get(), (byte)0);
output.collect(outputKmer, outputVertexID);
/** middle kmer */
- for (int i = KMER_SIZE; i < array.length - 1; i++) {
+ int i = 0;
+ for (i = KMER_SIZE; i < array.length; i++) {
outputKmer.shiftKmerWithNextChar(array[i]);
+ System.out.println((int)key.get());
outputVertexID.set((int)key.get(), (byte)(i - KMER_SIZE + 1));
output.collect(outputKmer, outputVertexID);
}
- /** last kmer */
- outputKmer.shiftKmerWithNextChar(array[array.length - 1]);
- outputVertexID.set((int)key.get(), (byte)(array.length - 1 + 1));
- output.collect(outputKmer, outputVertexID);
}
}
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/valvetgraphbuilding/GraphInvertedIndexBuildingReducer.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/velvetgraphbuilding/GraphInvertedIndexBuildingReducer.java
similarity index 92%
rename from genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/valvetgraphbuilding/GraphInvertedIndexBuildingReducer.java
rename to genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/velvetgraphbuilding/GraphInvertedIndexBuildingReducer.java
index 72827f2..d2b6476 100644
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/valvetgraphbuilding/GraphInvertedIndexBuildingReducer.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/velvetgraphbuilding/GraphInvertedIndexBuildingReducer.java
@@ -1,4 +1,4 @@
-package edu.uci.ics.genomix.hadoop.valvetgraphbuilding;
+package edu.uci.ics.genomix.hadoop.velvetgraphbuilding;
import java.io.IOException;
import java.util.Iterator;
@@ -18,6 +18,7 @@
@Override
public void reduce(KmerBytesWritable key, Iterator<PositionWritable> values,
OutputCollector<KmerBytesWritable, PositionListWritable> output, Reporter reporter) throws IOException {
+ outputlist.reset();
while (values.hasNext()) {
outputlist.append(values.next());
}
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/velvetgraphbuilding/PositionListAndKmerWritable.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/velvetgraphbuilding/PositionListAndKmerWritable.java
new file mode 100644
index 0000000..fff3faf
--- /dev/null
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/velvetgraphbuilding/PositionListAndKmerWritable.java
@@ -0,0 +1,81 @@
+package edu.uci.ics.genomix.hadoop.velvetgraphbuilding;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import org.apache.hadoop.io.WritableComparable;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.PositionListWritable;
+
+public class PositionListAndKmerWritable implements WritableComparable<PositionListAndKmerWritable> {
+
+ private PositionListWritable vertexIDList;
+ private KmerBytesWritable kmer;
+ private int countOfKmer;
+
+ public PositionListAndKmerWritable(){
+ countOfKmer = 0;
+ vertexIDList = new PositionListWritable();
+ kmer = new KmerBytesWritable();
+ }
+
+ public PositionListAndKmerWritable(int kmerSize) {
+ countOfKmer = 0;
+ vertexIDList = new PositionListWritable();
+ kmer = new KmerBytesWritable(kmerSize);
+ }
+
+ public int getCount() {
+ return countOfKmer;
+ }
+
+ public void setCount(int count) {
+ this.countOfKmer = count;
+ }
+
+ public void setvertexIDList(PositionListWritable posList) {
+ vertexIDList.set(posList);
+ }
+
+ public void reset() {
+ vertexIDList.reset();
+ countOfKmer = 0;
+ }
+
+ public PositionListWritable getVertexIDList() {
+ return vertexIDList;
+ }
+
+ public KmerBytesWritable getKmer() {
+ return kmer;
+ }
+
+ public void set(PositionListAndKmerWritable right) {
+ this.countOfKmer = right.countOfKmer;
+ this.vertexIDList.set(right.vertexIDList);
+ this.kmer.set(right.kmer);
+ }
+
+ public void set(PositionListWritable list, KmerBytesWritable kmer) {
+ this.vertexIDList.set(list);
+ this.kmer.set(kmer);
+ }
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ this.countOfKmer = in.readInt();
+ this.vertexIDList.readFields(in);
+ this.kmer.readFields(in);
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeInt(this.countOfKmer);
+ this.vertexIDList.write(out);
+ this.kmer.write(out);
+ }
+
+ @Override
+ public int compareTo(PositionListAndKmerWritable o) {
+ return 0;
+ }
+}
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/velvetgraphbuilding/ReadIDPartitioner.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/velvetgraphbuilding/ReadIDPartitioner.java
new file mode 100644
index 0000000..362ebf3
--- /dev/null
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/velvetgraphbuilding/ReadIDPartitioner.java
@@ -0,0 +1,19 @@
+package edu.uci.ics.genomix.hadoop.velvetgraphbuilding;
+
+//import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Partitioner;
+import edu.uci.ics.genomix.type.PositionListWritable;
+import edu.uci.ics.genomix.type.PositionWritable;
+
+public class ReadIDPartitioner implements Partitioner<PositionWritable, PositionListWritable>{
+
+ @Override
+ public int getPartition(PositionWritable key, PositionListWritable value, int numPartitions){
+ return (key.getReadID() & Integer.MAX_VALUE) % numPartitions;
+ }
+
+ @Override
+ public void configure(JobConf arg0) {
+ }
+}
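In the second ("deep build") job configured above, ReadIDPartitioner routes keys by readID, PositionWritable.Comparator sorts them by the full (readID, posInRead) key, and PositionWritable.FirstComparator groups values by readID alone, the standard secondary-sort arrangement, so a single reduce() call sees every position of one read in posInRead order. A small sketch of the grouping idea, assuming PositionWritable serializes as a 4-byte readID followed by a 1-byte posInRead (class and helper names invented for illustration, not part of the patch):

import java.nio.ByteBuffer;

public class SecondarySortSketch {
    // assumed serialized form of PositionWritable: 4-byte readID, then 1-byte posInRead
    static byte[] key(int readID, byte posInRead) {
        return ByteBuffer.allocate(5).putInt(readID).put(posInRead).array();
    }

    // FirstComparator-style comparison: look only at the leading readID
    static int compareFirst(byte[] a, byte[] b) {
        return Integer.compare(ByteBuffer.wrap(a).getInt(), ByteBuffer.wrap(b).getInt());
    }

    public static void main(String[] args) {
        byte[] k1 = key(7, (byte) 0);
        byte[] k2 = key(7, (byte) 3);
        byte[] k3 = key(8, (byte) 0);
        System.out.println(compareFirst(k1, k2)); // 0: same read, grouped into one reduce() call
        System.out.println(compareFirst(k1, k3)); // negative: different reads, separate groups
    }
}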