complete the dataflow of hyracks
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/contrail/graph/ReadsKeyValueParserFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/contrail/graph/ReadsKeyValueParserFactory.java
index b1b22ac..7280dff 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/contrail/graph/ReadsKeyValueParserFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/contrail/graph/ReadsKeyValueParserFactory.java
@@ -26,13 +26,11 @@
import edu.uci.ics.genomix.hyracks.data.primitive.PositionReference;
import edu.uci.ics.genomix.type.GeneCode;
-import edu.uci.ics.genomix.type.IntermediateNodeWritable;
import edu.uci.ics.genomix.type.KmerBytesWritable;
import edu.uci.ics.genomix.type.KmerBytesWritableFactory;
import edu.uci.ics.genomix.type.KmerListWritable;
import edu.uci.ics.genomix.type.NodeWritable;
import edu.uci.ics.genomix.type.PositionWritable;
-import edu.uci.ics.genomix.type.ReadIDWritable;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/ReadsKeyValueParserFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/ReadsKeyValueParserFactory.java
index 3650553..1b123f1 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/ReadsKeyValueParserFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/ReadsKeyValueParserFactory.java
@@ -24,9 +24,10 @@
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
-import edu.uci.ics.genomix.oldtype.IntermediateNodeWritable;
import edu.uci.ics.genomix.type.KmerBytesWritable;
import edu.uci.ics.genomix.type.KmerListWritable;
+import edu.uci.ics.genomix.type.NodeWritable;
+import edu.uci.ics.genomix.type.PositionListWritable;
import edu.uci.ics.genomix.type.PositionWritable;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
@@ -44,23 +45,32 @@
private static final Log LOG = LogFactory.getLog(ReadsKeyValueParserFactory.class);
public static final int OutputKmerField = 0;
- public static final int OutputNodeIdField = 1;
+ public static final int outputNodeIdListField = 1;
public static final int OutputForwardForwardField = 2;
- public static final int OutputForwardReverseField = 3;
- public static final int OutputReverseForwardField = 4;
- public static final int OutputReverseReverseField = 5;
+ public static final int OutputFFListCountField = 3;
+ public static final int OutputForwardReverseField = 4;
+ public static final int OutputFRListCountField = 5;
+ public static final int OutputReverseForwardField = 6;
+ public static final int OutputRFListCountField = 7;
+ public static final int OutputReverseReverseField = 8;
+ public static final int OutputRRListCountField = 9;
private final int readLength;
private final int kmerSize;
public static final RecordDescriptor readKmerOutputRec = new RecordDescriptor(new ISerializerDeserializer[] { null,
- null });
+ null, null, null, null, null, null, null, null});
- public ReadsKeyValueParserFactory(int readlength, int k, boolean bGenerateReversed) {
+ public ReadsKeyValueParserFactory(int readlength, int k) {
this.readLength = readlength;
this.kmerSize = k;
}
-
+
+ public static enum KmerDir {
+ FORWARD,
+ REVERSE,
+ }
+
@Override
public IKeyValueParser<LongWritable, Text> createKeyValueParser(final IHyracksTaskContext ctx) {
final ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(2);
@@ -70,12 +80,24 @@
return new IKeyValueParser<LongWritable, Text>() {
- private KmerBytesWritable kmer = new KmerBytesWritable(kmerSize);
- private KmerBytesWritable nextKmer = new KmerBytesWritable(kmerSize);
private PositionWritable nodeId = new PositionWritable();
- private KmerListWritable kmerList = new KmerListWritable(kmerSize);
- private IntermediateNodeWritable interMediateNode = new IntermediateNodeWritable();
- private byte mateId = 0;
+ private PositionListWritable nodeIdList = new PositionListWritable();
+ private KmerListWritable edgeListForPreKmer = new KmerListWritable(kmerSize);;
+ private KmerListWritable edgeListForNextKmer = new KmerListWritable(kmerSize);;
+ private NodeWritable outputNode = new NodeWritable(kmerSize);
+
+ private KmerBytesWritable preForwardKmer = new KmerBytesWritable(kmerSize);
+ private KmerBytesWritable preReverseKmer = new KmerBytesWritable(kmerSize);
+ private KmerBytesWritable curForwardKmer = new KmerBytesWritable(kmerSize);
+ private KmerBytesWritable curReverseKmer = new KmerBytesWritable(kmerSize);
+ private KmerBytesWritable nextForwardKmer = new KmerBytesWritable(kmerSize);
+ private KmerBytesWritable nextReverseKmer = new KmerBytesWritable(kmerSize);
+
+ private KmerDir preKmerDir = KmerDir.FORWARD;
+ private KmerDir curKmerDir = KmerDir.FORWARD;
+ private KmerDir nextKmerDir = KmerDir.FORWARD;
+
+ byte mateId = (byte) 0;
@Override
public void parse(LongWritable key, Text value, IFrameWriter writer) throws HyracksDataException {
@@ -108,50 +130,167 @@
if (kmerSize >= array.length) {
return;
}
- kmer.setByRead(array, 0);
- nextKmer.set(kmer);
- nextKmer.shiftKmerWithNextChar(array[kmerSize]);
- kmerList.append(nextKmer);
- nextKmer.toString();
- kmerList.toString();
-// nodeId.set(mateId, readID, 1);
-// interMediateNode.setNodeId(nodeId);
-// interMediateNode.setFFList(kmerList);
- InsertToFrame(kmer, kmerList, writer);
+ outputNode.reset(kmerSize);
+ curForwardKmer.setByRead(array, 0);
+ curReverseKmer.setByReadReverse(array, 0);
+ curKmerDir = curForwardKmer.compareTo(curReverseKmer) <= 0 ? KmerDir.FORWARD : KmerDir.REVERSE;
+ setNextKmer(array[kmerSize]);
+ setnodeId(mateId, readID, 1);
+ setEdgeListForNextKmer();
+ writeToFrame(writer);
/** middle kmer */
int i = kmerSize;
for (; i < array.length - 1; i++) {
- kmer.shiftKmerWithNextChar(array[i]);
- nextKmer.set(kmer);
- nextKmer.shiftKmerWithNextChar(array[i+1]);
- kmerList.append(nextKmer);
-// nodeId.set(mateId, readID, i - kmerSize + 2);
-// interMediateNode.setNodeId(nodeId);
-// interMediateNode.setFFList(kmerList);
- InsertToFrame(kmer, kmerList, writer);
+ outputNode.reset(kmerSize);
+ setPreKmerByOldCurKmer();
+ setCurKmerByOldNextKmer();
+ setNextKmer(array[i]);
+ setnodeId(mateId, readID, i - kmerSize + 1);
+ setEdgeListForPreKmer();
+ setEdgeListForNextKmer();
+ writeToFrame(writer);
}
-//
-// /** last kmer */
-// kmer.shiftKmerWithNextChar(array[i]);
-// nodeId.set(mateId, readID, i - kmerSize + 2);
-// interMediateNode.setNodeId(nodeId);
-// InsertToFrame(kmer, interMediateNode, writer);
+
+ /** last kmer */
+ outputNode.reset(kmerSize);
+ setPreKmerByOldCurKmer();
+ setCurKmerByOldNextKmer();
+ setnodeId(mateId, readID, array.length - kmerSize + 1);
+ setEdgeListForPreKmer();
+ writeToFrame(writer);
}
- //IntermediateNodeWritable node
- private void InsertToFrame(KmerBytesWritable kmer, KmerListWritable kmerList, IFrameWriter writer) {
+
+ public void setnodeId(byte mateId, long readID, int posId){
+ nodeId.set(mateId, readID, posId);
+ nodeIdList.reset();
+ nodeIdList.append(nodeId);
+ outputNode.setNodeIdList(nodeIdList);
+ }
+
+ public void setNextKmer(byte nextChar){
+ nextForwardKmer.set(curForwardKmer);
+ nextForwardKmer.shiftKmerWithNextChar(nextChar);
+ nextReverseKmer.setByReadReverse(nextForwardKmer.toString().getBytes(), nextForwardKmer.getOffset());
+ nextKmerDir = nextForwardKmer.compareTo(nextReverseKmer) <= 0 ? KmerDir.FORWARD : KmerDir.REVERSE;
+ }
+
+ public void setPreKmerByOldCurKmer(){
+ preKmerDir = curKmerDir;
+ preForwardKmer.set(curForwardKmer);
+ preReverseKmer.set(curReverseKmer);
+ }
+
+ public void setCurKmerByOldNextKmer(){
+ curKmerDir = nextKmerDir;
+ curForwardKmer.set(nextForwardKmer);
+ curReverseKmer.set(nextReverseKmer);
+ }
+
+ public void writeToFrame(IFrameWriter writer) {
+ switch(curKmerDir){
+ case FORWARD:
+ InsertToFrame(curForwardKmer, outputNode, writer);
+ break;
+ case REVERSE:
+ InsertToFrame(curReverseKmer, outputNode, writer);
+ break;
+ }
+ }
+ public void setEdgeListForPreKmer(){
+ switch(curKmerDir){
+ case FORWARD:
+ switch(preKmerDir){
+ case FORWARD:
+ edgeListForPreKmer.reset(kmerSize);
+ edgeListForPreKmer.append(preForwardKmer);
+ outputNode.setRRList(edgeListForPreKmer);
+ break;
+ case REVERSE:
+ edgeListForPreKmer.reset(kmerSize);
+ edgeListForPreKmer.append(preReverseKmer);
+ outputNode.setRFList(edgeListForPreKmer);
+ break;
+ }
+ break;
+ case REVERSE:
+ switch(preKmerDir){
+ case FORWARD:
+ edgeListForPreKmer.reset(kmerSize);
+ edgeListForPreKmer.append(preForwardKmer);
+ outputNode.setFRList(edgeListForPreKmer);
+ break;
+ case REVERSE:
+ edgeListForPreKmer.reset(kmerSize);
+ edgeListForPreKmer.append(preReverseKmer);
+ outputNode.setFFList(edgeListForPreKmer);
+ break;
+ }
+ break;
+ }
+ }
+
+ public void setEdgeListForNextKmer(){
+ switch(curKmerDir){
+ case FORWARD:
+ switch(nextKmerDir){
+ case FORWARD:
+ edgeListForNextKmer.reset(kmerSize);
+ edgeListForNextKmer.append(nextForwardKmer);
+ outputNode.setFFList(edgeListForNextKmer);
+ break;
+ case REVERSE:
+ edgeListForNextKmer.reset(kmerSize);
+ edgeListForNextKmer.append(nextReverseKmer);
+ outputNode.setFRList(edgeListForNextKmer);
+ break;
+ }
+ break;
+ case REVERSE:
+ switch(nextKmerDir){
+ case FORWARD:
+ edgeListForNextKmer.reset(kmerSize);
+ edgeListForNextKmer.append(nextForwardKmer);
+ outputNode.setRFList(edgeListForNextKmer);
+ break;
+ case REVERSE:
+ edgeListForNextKmer.reset(kmerSize);
+ edgeListForNextKmer.append(nextReverseKmer);
+ outputNode.setRRList(edgeListForNextKmer);
+ break;
+ }
+ break;
+ }
+ }
+
+ private void InsertToFrame(KmerBytesWritable kmer, NodeWritable node, IFrameWriter writer) {
try {
-// if (Math.abs(node.getNodeId().getPosId()) > 32768) {
-// throw new IllegalArgumentException("Position id is beyond 32768 at " + node.getNodeId().getReadId());
-// }
tupleBuilder.reset();
tupleBuilder.addField(kmer.getBytes(), kmer.getOffset(), kmer.getLength());
- tupleBuilder.addField(kmerList.getByteArray(), kmer.getOffset(), kmer.getLength());
- //tupleBuilder.addField(node.getNodeId().getByteArray(), node.getNodeId().getStartOffset(), node.getNodeId().getLength());
+
+ //tupleBuilder.addField(node.getnodeId().getByteArray(), node.getnodeId().getStartOffset(), node.getnodeId().getLength());
// tupleBuilder.addField(node.getFFList().getByteArray(), node.getFFList().getStartOffset(), node.getFFList().getLength());
// tupleBuilder.addField(node.getFRList().getByteArray(), node.getFRList().getStartOffset(), node.getFRList().getLength());
// tupleBuilder.addField(node.getRFList().getByteArray(), node.getRFList().getStartOffset(), node.getRFList().getLength());
// tupleBuilder.addField(node.getRRList().getByteArray(), node.getRRList().getStartOffset(), node.getRRList().getLength());
+
+ tupleBuilder.addField(node.getNodeIdList().getByteArray(), node.getNodeIdList().getStartOffset(), node.getNodeIdList().getLength());
+
+ tupleBuilder.addField(node.getFFList().getByteArray(), node.getFFList().getStartOffset(), node.getFFList().getLength());
+ tupleBuilder.getDataOutput().writeInt(node.getFFList().getCountOfPosition());
+ tupleBuilder.addFieldEndOffset();
+
+ tupleBuilder.addField(node.getFRList().getByteArray(), node.getFRList().getStartOffset(), node.getFRList().getLength());
+ tupleBuilder.getDataOutput().writeInt(node.getFRList().getCountOfPosition());
+ tupleBuilder.addFieldEndOffset();
+
+ tupleBuilder.addField(node.getRFList().getByteArray(), node.getRFList().getStartOffset(), node.getRFList().getLength());
+ tupleBuilder.getDataOutput().writeInt(node.getRFList().getCountOfPosition());
+ tupleBuilder.addFieldEndOffset();
+
+ tupleBuilder.addField(node.getRRList().getByteArray(), node.getRRList().getStartOffset(), node.getRRList().getLength());
+ tupleBuilder.getDataOutput().writeInt(node.getRRList().getCountOfPosition());
+ tupleBuilder.addFieldEndOffset();
if (!outputAppender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
tupleBuilder.getSize())) {