merge with jianfeng-reverse
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionWritable.java
index 10821b0..ab0f6e9 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionWritable.java
@@ -119,8 +119,17 @@
int thisValue = Marshal.getInt(b1, s1);
int thatValue = Marshal.getInt(b2, s2);
int diff = thisValue - thatValue;
+ int src = 0;
+ int des = 0;
if (diff == 0) {
- return b1[s1 + INTBYTES] - b2[s2 + INTBYTES];
+ if(b1[s1 + INTBYTES] < 0) {
+ src = - b1[s1 + INTBYTES];}
+ if(b2[s2 + INTBYTES] < 0){
+ des = - b2[s2 + INTBYTES];}
+ if(src == des) {
+ return b1[s1 + INTBYTES] - b2[s2 + INTBYTES];
+ }
+ return src - des;
}
return diff;
}
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/velvetgraphbuilding/DeepGraphBuildingMapper.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/velvetgraphbuilding/DeepGraphBuildingMapper.java
index ccbabd7..013dd72 100644
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/velvetgraphbuilding/DeepGraphBuildingMapper.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/velvetgraphbuilding/DeepGraphBuildingMapper.java
@@ -13,68 +13,71 @@
@SuppressWarnings("deprecation")
public class DeepGraphBuildingMapper extends MapReduceBase implements
Mapper<KmerBytesWritable, PositionListWritable, PositionWritable, PositionListAndKmerWritable> {
-
- public PositionWritable VertexID;
- public PositionWritable tempVertex;
- public PositionListWritable listPosZeroInRead;
- public PositionListWritable listPosNonZeroInRead;
- public PositionListWritable tempPosList;
- public PositionListAndKmerWritable outputListAndKmer;
+
+ private PositionWritable positionEntry;
+ private PositionWritable tempVertex;
+ private PositionListWritable listPosZeroInRead;
+ private PositionListWritable listPosNonZeroInRead;
+ private PositionListWritable tempPosList;
+ private PositionListAndKmerWritable outputListAndKmer;
+ private static int LAST_POSID;
+ private static int KMER_SIZE;
+ private static int READ_LENGTH;
+
@Override
public void configure(JobConf job) {
- VertexID = new PositionWritable();
+ KMER_SIZE = Integer.parseInt(job.get("sizeKmer"));
+ READ_LENGTH = Integer.parseInt(job.get("readLength"));
+ positionEntry = new PositionWritable();
tempVertex = new PositionWritable();
listPosZeroInRead = new PositionListWritable();
listPosNonZeroInRead = new PositionListWritable();
tempPosList = new PositionListWritable();
outputListAndKmer = new PositionListAndKmerWritable();
+ LAST_POSID = READ_LENGTH - KMER_SIZE + 1;
}
+
+ private boolean isStart(byte posInRead) {
+ return posInRead == 1 || posInRead == -LAST_POSID;
+ }
+
@Override
- public void map(KmerBytesWritable key, PositionListWritable value, OutputCollector<PositionWritable, PositionListAndKmerWritable> output,
- Reporter reporter) throws IOException {
- if(key.toString().equals("AGAAG")) {
- int y = 4;
- int x = y;
- }
+ public void map(KmerBytesWritable key, PositionListWritable value,
+ OutputCollector<PositionWritable, PositionListAndKmerWritable> output, Reporter reporter)
+ throws IOException {
+ outputListAndKmer.reset();
+ int tupleCount = value.getCountOfPosition();
+ scanPosition(tupleCount, value);
+ scanAgainAndOutput(listPosZeroInRead, listPosNonZeroInRead, key, output);
+ scanAgainAndOutput(listPosNonZeroInRead, listPosZeroInRead, key, output);
+ }
+
+ public void scanPosition(int tupleCount, PositionListWritable value) {
listPosZeroInRead.reset();
listPosNonZeroInRead.reset();
- outputListAndKmer.reset();
- System.out.println(value.getLength());
- for(int i = 0; i < value.getCountOfPosition(); i++) {
- VertexID.set(value.getPosition(i));
- if(VertexID.getPosInRead() == 0) {
- listPosZeroInRead.append(VertexID);
- }
- else {
- listPosNonZeroInRead.append(VertexID);
+ for (int i = 0; i < tupleCount; i++) {
+ positionEntry.set(value.getPosition(i));
+ if (isStart(positionEntry.getPosInRead())) {
+ listPosZeroInRead.append(positionEntry);
+ } else {
+ listPosNonZeroInRead.append(positionEntry);
}
}
- for(int i = 0; i < listPosZeroInRead.getCountOfPosition(); i++) {
- VertexID.set(listPosZeroInRead.getPosition(i));
+ }
+
+ public void scanAgainAndOutput(PositionListWritable outputListInRead, PositionListWritable attriListInRead,
+ KmerBytesWritable kmer, OutputCollector<PositionWritable, PositionListAndKmerWritable> output) throws IOException {
+ for (int i = 0; i < outputListInRead.getCountOfPosition(); i++) {
+ positionEntry.set(outputListInRead.getPosition(i));
tempPosList.reset();
- for (int j = 0; j < listPosNonZeroInRead.getCountOfPosition(); j++) {
- tempVertex.set(listPosNonZeroInRead.getPosition(i));
- if(tempVertex.getReadID() != VertexID.getReadID()) {
- int tempReadID = tempVertex.getReadID();
- byte tempPosInRead = (byte) (tempVertex.getPosInRead() - 1);
- tempVertex.set(tempReadID, tempPosInRead);
+ for (int j = 0; j < attriListInRead.getCountOfPosition(); j++) {
+ tempVertex.set(attriListInRead.getPosition(i));
+ if (tempVertex.getReadID() != positionEntry.getReadID()) {
tempPosList.append(tempVertex);
}
}
- outputListAndKmer.set(tempPosList, key);
- output.collect(VertexID, outputListAndKmer);
- }
- for(int i = 0; i < listPosNonZeroInRead.getCountOfPosition(); i++) {
- VertexID.set(listPosNonZeroInRead.getPosition(i));
- tempPosList.reset();
- for (int j = 0; j < listPosZeroInRead.getCountOfPosition(); j++) {
- tempVertex.set(listPosNonZeroInRead.getPosition(i));
- if(tempVertex.getReadID() != VertexID.getReadID()) {
- tempPosList.append(tempVertex);
- }
- }
- outputListAndKmer.set(tempPosList, key);
- output.collect(VertexID, outputListAndKmer);
+ outputListAndKmer.set(tempPosList, kmer);
+ output.collect(positionEntry, outputListAndKmer);
}
}
}
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/velvetgraphbuilding/DeepGraphBuildingReducer.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/velvetgraphbuilding/DeepGraphBuildingReducer.java
index ee12661..ee2c4b5 100644
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/velvetgraphbuilding/DeepGraphBuildingReducer.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/velvetgraphbuilding/DeepGraphBuildingReducer.java
@@ -44,96 +44,160 @@
}
}
- private PositionListAndKmerWritable nodeListAndKmer = new PositionListAndKmerWritable();
- private PositionListAndKmerWritable nodeSuccListAndKmer = new PositionListAndKmerWritable();
- private NodeWritable startNodeInRead = new NodeWritable();
- private NodeWritable nodeToMerge = new NodeWritable();
- private NodeWritable nodeToBeMerged = new NodeWritable();
+ private PositionListAndKmerWritable curNodePosiListAndKmer = new PositionListAndKmerWritable();
+ private PositionListAndKmerWritable curNodeNegaListAndKmer = new PositionListAndKmerWritable();
+ private PositionListAndKmerWritable nextNodePosiListAndKmer = new PositionListAndKmerWritable();
+ private PositionListAndKmerWritable nextNodeNegaListAndKmer = new PositionListAndKmerWritable();
+
+ private NodeWritable curNode = new NodeWritable();
+ private NodeWritable nextNode = new NodeWritable();
+ private NodeWritable nextNextNode = new NodeWritable();
private PositionListWritable incomingList = new PositionListWritable();
private PositionListWritable outgoingList = new PositionListWritable();
private NullWritable nullWritable = NullWritable.get();
- private nodeToMergeState state = new nodeToMergeState();
private int KMER_SIZE;
+ private int LAST_POSID;
+ private int READ_LENGTH;
+
public void configure(JobConf job) {
- KMER_SIZE = job.getInt("sizeKmer", 0);
+ KMER_SIZE = Integer.parseInt(job.get("sizeKmer"));
+ READ_LENGTH = Integer.parseInt(job.get("readLength"));
+ LAST_POSID = READ_LENGTH - KMER_SIZE + 1;
}
+
@Override
public void reduce(PositionWritable key, Iterator<PositionListAndKmerWritable> values,
OutputCollector<NodeWritable, NullWritable> output, Reporter reporter) throws IOException {
int readID = key.getReadID();
byte posInRead = 0;
- assembleNodeInitialization(readID, posInRead, values);
- posInRead = (byte) (posInRead + 2);
- //----LOOP
- while (values.hasNext()) {
- assembleNodeFromValues(readID, posInRead, values);
- posInRead = (byte) (posInRead + 1);
- if (nodeToMerge.isPathNode() == true && nodeToBeMerged.isPathNode() == true) {
- nodeToMerge.mergeNext(nodeToBeMerged, KMER_SIZE);
- state.setToNotUpdate();
+ assembleFirstTwoNodesInRead(readID, posInRead, curNode, nextNode, values);
+ assembleNodeFromValues(readID, posInRead, curNode, nextNode, values);
+ posInRead ++;
+ while(values.hasNext()){
+ assembleNodeFromValues(readID, posInRead, curNode, nextNextNode, values);
+
+ if (curNode.inDegree() > 1 || curNode.outDegree() > 0 || nextNode.inDegree() > 0
+ || nextNode.outDegree() > 0 || nextNextNode.inDegree() > 0
+ || nextNextNode.outDegree() > 0) {
+ connect(curNode, nextNextNode);
+ output.collect(curNode, nullWritable);
+ curNode.set(nextNode);
+ nextNode.set(nextNode);
+ continue;
+ }
+ curNode.mergeForwadNext(nextNode, KMER_SIZE);
+ nextNode.set(nextNextNode);
+ }
+ }
+
+ public void assembleNodeFromValues(int readID, byte posInRead, NodeWritable curNode, NodeWritable nextNode, Iterator<PositionListAndKmerWritable> values)
+ throws IOException {
+ curNodePosiListAndKmer.set(nextNodePosiListAndKmer);
+ curNodeNegaListAndKmer.set(nextNodeNegaListAndKmer);
+ if (values.hasNext()) {
+ nextNodePosiListAndKmer.set(values.next());
+ if(values.hasNext()) {
+ nextNodeNegaListAndKmer.set(values.next());
}
else {
- state.setToAssignedByRightNode();
- output.collect(nodeToMerge, nullWritable);
+ throw new IOException("lose the paired kmer");
+ }
+ }
+
+ outgoingList.reset();
+ outgoingList.set(nextNodePosiListAndKmer.getVertexIDList());
+ setForwardOutgoingList(curNode, outgoingList);
+
+ nextNode.setNodeID(readID, (byte)(posInRead + 1));
+ nextNode.setKmer(nextNodePosiListAndKmer.getKmer());
+
+ outgoingList.reset();
+ outgoingList.set(curNodeNegaListAndKmer.getVertexIDList());
+ setReverseOutgoingList(nextNode, outgoingList);
+
+ if (nextNode.getNodeID().getPosInRead() == LAST_POSID) {
+ incomingList.reset();
+ incomingList.set(nextNodeNegaListAndKmer.getVertexIDList());
+ setReverseIncomingList(nextNode, incomingList);
+ }
+ }
+
+ public void assembleFirstTwoNodesInRead(int readID, byte posInRead, NodeWritable curNode, NodeWritable nextNode, Iterator<PositionListAndKmerWritable> values)
+ throws IOException {
+ nextNodePosiListAndKmer.set(values.next());
+ if (values.hasNext()) {
+ nextNodeNegaListAndKmer.set(values.next());
+ } else {
+ throw new IOException("lose the paired kmer");
+ }
+
+ if (curNode.getNodeID().getPosInRead() == LAST_POSID) {
+ incomingList.reset();
+ incomingList.set(nextNodeNegaListAndKmer.getVertexIDList());
+ setReverseIncomingList(curNode, incomingList);
+ }
+ incomingList.reset();
+ incomingList.set(nextNodePosiListAndKmer.getVertexIDList());
+
+ curNode.setNodeID(readID, posInRead);
+ curNode.setKmer(curNodePosiListAndKmer.getKmer());
+ setForwardIncomingList(curNode, incomingList);
+ }
+
+ private void setForwardOutgoingList(NodeWritable node, PositionListWritable posList) {
+ for (PositionWritable pos : posList) {
+ if (pos.getPosInRead() > 0) {
+ node.getFFList().append(pos);
+ } else {
+ node.getFRList().append(pos.getReadID(), (byte) -pos.getPosInRead());
}
}
}
- public void assembleNodeFromValues(int readID, byte posInRead, Iterator<PositionListAndKmerWritable> values)
- throws IOException {
- if (values.hasNext()) {
- nodeListAndKmer.set(values.next());
- } else {
- throw new IOException("the size of values emerge bug!");
+ private void setForwardIncomingList(NodeWritable node, PositionListWritable posList) {
+ for (PositionWritable pos : posList) {
+ if (pos.getPosInRead() > 0) {
+ if (pos.getPosInRead() > 1) {
+ node.getRRList().append(pos.getReadID(), (byte) (pos.getPosInRead() - 1));
+ } else {
+ throw new IllegalArgumentException("position id is invalid");
+ }
+ } else {
+ if (pos.getPosInRead() > -LAST_POSID) {
+ node.getRFList().append(pos.getReadID(), (byte) -(pos.getPosInRead() - 1));
+ }
+ }
}
- if (state.getState().equals("ASSIGNED_BY_RIGHTNODE")) {
- nodeToMerge.set(nodeToBeMerged);
- }
- incomingList.reset();
- incomingList.append(readID, (byte) (posInRead - 1));
- if (nodeSuccListAndKmer.getVertexIDList() != null)
- outgoingList.set(nodeSuccListAndKmer.getVertexIDList());
- outgoingList.append(readID, (byte) (posInRead + 1));
- nodeToBeMerged.setNodeID(readID, (byte) posInRead);
- nodeToBeMerged.setIncomingList(incomingList);
- nodeToBeMerged.setOutgoingList(outgoingList);
- nodeToBeMerged.setKmer(nodeListAndKmer.getKmer());
}
- public void assembleNodeInitialization(int readID, byte posInRead, Iterator<PositionListAndKmerWritable> values)
- throws IOException {
- if (values.hasNext()) {
- nodeListAndKmer.set(values.next());
- } else {
- throw new IOException("the size of values emerge bug!");
+ private void setReverseOutgoingList(NodeWritable node, PositionListWritable posList) {
+ for (PositionWritable pos : posList) {
+ if (pos.getPosInRead() > 0) {
+ node.getRFList().append(pos);
+ } else {
+ node.getRRList().append(pos.getReadID(), (byte) -pos.getPosInRead());
+ }
}
- if (values.hasNext()) {
- nodeSuccListAndKmer.set(values.next());
+ }
+
+ private void setReverseIncomingList(NodeWritable node, PositionListWritable posList) {
+ for (PositionWritable pos : posList) {
+ if (pos.getPosInRead() > 0) {
+ if (pos.getPosInRead() > 1) {
+ node.getFRList().append(pos.getReadID(), (byte) (pos.getPosInRead() - 1));
+ } else {
+ throw new IllegalArgumentException("Invalid position");
+ }
+ } else {
+ if (pos.getPosInRead() > -LAST_POSID) {
+ node.getFFList().append(pos.getReadID(), (byte) -(pos.getPosInRead() - 1));
+ }
+ }
}
- incomingList.reset();
- incomingList.set(nodeSuccListAndKmer.getVertexIDList());
- outgoingList.reset();
- if (nodeSuccListAndKmer.getVertexIDList() != null)
- outgoingList.set(nodeSuccListAndKmer.getVertexIDList());
- outgoingList.append(readID, (byte) (posInRead + 1));
- startNodeInRead.setNodeID(readID, posInRead);
- startNodeInRead.setIncomingList(incomingList);
- startNodeInRead.setOutgoingList(outgoingList);
- startNodeInRead.setKmer(nodeListAndKmer.getKmer());
- //---------
- nodeListAndKmer.set(nodeSuccListAndKmer);
- incomingList.reset();
- incomingList.append(readID, posInRead);
- if (values.hasNext()) {
- nodeSuccListAndKmer.set(values.next());
- if (nodeSuccListAndKmer.getVertexIDList() != null)
- outgoingList.set(nodeSuccListAndKmer.getVertexIDList());
- }
- outgoingList.append(readID, (byte) (posInRead + 2));
- nodeToMerge.setNodeID(readID, (byte) posInRead);
- nodeToMerge.setIncomingList(incomingList);
- nodeToMerge.setOutgoingList(outgoingList);
- nodeToMerge.setKmer(nodeListAndKmer.getKmer());
- state.setToNotUpdate();
+ }
+
+ private void connect(NodeWritable curNode, NodeWritable nextNode) {
+ curNode.getFFList().append(nextNode.getNodeID());
+ nextNode.getRRList().append(curNode.getNodeID());
}
}
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/velvetgraphbuilding/GraphInvertedIndexBuildingMapper.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/velvetgraphbuilding/GraphInvertedIndexBuildingMapper.java
index 9592a59..e78e921 100644
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/velvetgraphbuilding/GraphInvertedIndexBuildingMapper.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/velvetgraphbuilding/GraphInvertedIndexBuildingMapper.java
@@ -39,13 +39,25 @@
readID = Integer.parseInt(rawLine[0]);
String geneLine = rawLine[1];
byte[] array = geneLine.getBytes();
+ if (KMER_SIZE >= array.length) {
+ throw new IOException("short read");
+ }
outputKmer.setByRead(array, 0);
- outputVertexID.set(readID, (byte) 0);
+ outputVertexID.set(readID, (byte) 1);
output.collect(outputKmer, outputVertexID);
/** middle kmer */
for (int i = KMER_SIZE; i < array.length; i++) {
outputKmer.shiftKmerWithNextChar(array[i]);
- outputVertexID.set(readID, (byte) (i - KMER_SIZE + 1));
+ outputVertexID.set(readID, (byte) (i - KMER_SIZE + 2));
+ output.collect(outputKmer, outputVertexID);
+ }
+ /** reverse first kmer */
+ outputKmer.setByReadReverse(array, 0);
+ outputVertexID.set(readID, (byte) -1);
+ /** reverse middle kmer */
+ for (int i = KMER_SIZE; i < array.length; i++) {
+ outputKmer.shiftKmerWithPreCode(GeneCode.getPairedCodeFromSymbol(array[i]));
+ outputVertexID.set(readID, (byte) (-2 + KMER_SIZE - i));
output.collect(outputKmer, outputVertexID);
}
}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/MapReadToNodeOperator.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/MapReadToNodeOperator.java
index 98d21af..6c26feb 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/MapReadToNodeOperator.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/MapReadToNodeOperator.java
@@ -80,7 +80,7 @@
nextNodeEntry = new NodeReference(kmerSize);
nextNextNodeEntry = new NodeReference(0);
cachePositionList = new PositionListWritable();
- LAST_POSITION_ID = (inputRecDesc.getFieldCount() - InputInfoFieldStart) / 2;
+ LAST_POSITION_ID = (inputRecDesc.getFieldCount() - InputInfoFieldStart) / 2; //?????????正负
}
@Override