finished genomix-hyracks end to end
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
index 6b3be5f..a12c654 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
@@ -118,5 +118,17 @@
public int hashCode() {
return nodeID.hashCode();
}
+
+ @Override
+ public String toString(){
+ StringBuilder sbuilder = new StringBuilder();
+ sbuilder.append('(');
+ sbuilder.append(nodeID.toString());
+ sbuilder.append(',').append(countOfKmer).append(',');
+ sbuilder.append(incomingList.toString()).append(',');
+ sbuilder.append(incomingList.toString()).append(',');
+ sbuilder.append(kmer.toString()).append(')');
+ return sbuilder.toString();
+ }
}
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionListWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionListWritable.java
index af07ed5..458ae43 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionListWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionListWritable.java
@@ -145,4 +145,15 @@
out.write(storage, offset, valueCount * PositionWritable.LENGTH);
}
+ @Override
+ public String toString(){
+ StringBuilder sbuilder = new StringBuilder();
+ sbuilder.append('[');
+ for(PositionWritable pos : this){
+ sbuilder.append(pos.toString());
+ sbuilder.append(',');
+ }
+ sbuilder.replace(getLength()-1, getLength(), "]");
+ return sbuilder.toString();
+ }
}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/KMerTextWriterFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/KMerTextWriterFactory.java
index b9b7213..eac0045 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/KMerTextWriterFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/KMerTextWriterFactory.java
@@ -18,7 +18,6 @@
import java.io.DataOutput;
import java.io.IOException;
-import edu.uci.ics.genomix.type.GeneCode;
import edu.uci.ics.genomix.type.KmerBytesWritable;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/NodeSequenceWriterFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/NodeSequenceWriterFactory.java
index ee17228..272a21c 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/NodeSequenceWriterFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/NodeSequenceWriterFactory.java
@@ -1,11 +1,25 @@
package edu.uci.ics.genomix.hyracks.dataflow.io;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.SequenceFile.Writer;
import org.apache.hadoop.mapred.JobConf;
+import edu.uci.ics.genomix.data.Marshal;
+import edu.uci.ics.genomix.hyracks.dataflow.MapReadToNodeOperator;
+import edu.uci.ics.genomix.hyracks.job.GenomixJob;
+import edu.uci.ics.genomix.type.NodeWritable;
+import edu.uci.ics.genomix.type.PositionWritable;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
import edu.uci.ics.hyracks.hdfs.api.ITupleWriter;
import edu.uci.ics.hyracks.hdfs.api.ITupleWriterFactory;
+import edu.uci.ics.hyracks.hdfs.dataflow.ConfFactory;
public class NodeSequenceWriterFactory implements ITupleWriterFactory {
@@ -14,14 +28,76 @@
*/
private static final long serialVersionUID = 1L;
- public NodeSequenceWriterFactory(JobConf job) {
- // TODO Auto-generated constructor stub
+ public static final int InputNodeIDField = MapReadToNodeOperator.OutputNodeIDField;
+ public static final int InputCountOfKmerField = MapReadToNodeOperator.OutputCountOfKmerField;
+ public static final int InputIncomingField = MapReadToNodeOperator.OutputIncomingField;
+ public static final int InputOutgoingField = MapReadToNodeOperator.OutputOutgoingField;
+ public static final int InputKmerBytesField = MapReadToNodeOperator.OutputKmerBytesField;
+
+ private ConfFactory confFactory;
+ private final int kmerlength;
+
+ public NodeSequenceWriterFactory(JobConf conf) throws HyracksDataException {
+ this.confFactory = new ConfFactory(conf);
+ this.kmerlength = conf.getInt(GenomixJob.KMER_LENGTH, GenomixJob.DEFAULT_KMERLEN);
}
+ public class TupleWriter implements ITupleWriter {
+
+ public TupleWriter(ConfFactory confFactory) {
+ this.cf = confFactory;
+ }
+
+ ConfFactory cf;
+ Writer writer = null;
+ NodeWritable node = new NodeWritable(kmerlength);
+
+ @Override
+ public void open(DataOutput output) throws HyracksDataException {
+ try {
+ writer = SequenceFile.createWriter(cf.getConf(), (FSDataOutputStream) output, NodeWritable.class, null,
+ CompressionType.NONE, null);
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ @Override
+ public void write(DataOutput output, ITupleReference tuple) throws HyracksDataException {
+ node.getNodeID().setNewReference(tuple.getFieldData(InputNodeIDField),
+ tuple.getFieldStart(InputNodeIDField));
+ node.setCount(Marshal.getInt(tuple.getFieldData(InputCountOfKmerField),
+ tuple.getFieldStart(InputCountOfKmerField)));
+ node.getIncomingList().setNewReference(tuple.getFieldLength(InputIncomingField) / PositionWritable.LENGTH,
+ tuple.getFieldData(InputIncomingField), tuple.getFieldStart(InputIncomingField));
+ node.getOutgoingList().setNewReference(tuple.getFieldLength(InputOutgoingField) / PositionWritable.LENGTH,
+ tuple.getFieldData(InputOutgoingField), tuple.getFieldStart(InputOutgoingField));
+
+ node.getKmer().setNewReference(node.getCount() + kmerlength - 1, tuple.getFieldData(InputKmerBytesField),
+ tuple.getFieldStart(InputKmerBytesField));
+
+ try {
+ writer.append(node, null);
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ @Override
+ public void close(DataOutput output) throws HyracksDataException {
+ // TODO Auto-generated method stub
+
+ }
+
+ }
+
+ /**
+ * Input schema:
+ * (Position, LengthCount, InComingPosList, OutgoingPosList, Kmer)
+ */
@Override
public ITupleWriter getTupleWriter(IHyracksTaskContext ctx) throws HyracksDataException {
- // TODO Auto-generated method stub
- return null;
+ return new TupleWriter(confFactory);
}
}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/NodeTextWriterFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/NodeTextWriterFactory.java
index 7f64f28..2dfd334 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/NodeTextWriterFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/NodeTextWriterFactory.java
@@ -1,7 +1,15 @@
package edu.uci.ics.genomix.hyracks.dataflow.io;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.genomix.data.Marshal;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.NodeWritable;
+import edu.uci.ics.genomix.type.PositionWritable;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
import edu.uci.ics.hyracks.hdfs.api.ITupleWriter;
import edu.uci.ics.hyracks.hdfs.api.ITupleWriterFactory;
@@ -11,11 +19,55 @@
*
*/
private static final long serialVersionUID = 1L;
+ private final int initialKmerSize;
+
+ public NodeTextWriterFactory(int initialKmerSize) {
+ this.initialKmerSize = initialKmerSize;
+ }
@Override
public ITupleWriter getTupleWriter(IHyracksTaskContext ctx) throws HyracksDataException {
- // TODO Auto-generated method stub
- return null;
+ return new ITupleWriter() {
+ NodeWritable node = new NodeWritable(initialKmerSize);
+
+ @Override
+ public void open(DataOutput output) throws HyracksDataException {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void write(DataOutput output, ITupleReference tuple) throws HyracksDataException {
+ node.getNodeID().setNewReference(tuple.getFieldData(NodeSequenceWriterFactory.InputNodeIDField),
+ tuple.getFieldStart(NodeSequenceWriterFactory.InputNodeIDField));
+ node.setCount(Marshal.getInt(tuple.getFieldData(NodeSequenceWriterFactory.InputCountOfKmerField),
+ tuple.getFieldStart(NodeSequenceWriterFactory.InputCountOfKmerField)));
+ node.getIncomingList().setNewReference(
+ tuple.getFieldLength(NodeSequenceWriterFactory.InputIncomingField) / PositionWritable.LENGTH,
+ tuple.getFieldData(NodeSequenceWriterFactory.InputIncomingField),
+ tuple.getFieldStart(NodeSequenceWriterFactory.InputIncomingField));
+ node.getOutgoingList().setNewReference(
+ tuple.getFieldLength(NodeSequenceWriterFactory.InputOutgoingField) / PositionWritable.LENGTH,
+ tuple.getFieldData(NodeSequenceWriterFactory.InputOutgoingField),
+ tuple.getFieldStart(NodeSequenceWriterFactory.InputOutgoingField));
+
+ node.getKmer().setNewReference(node.getCount() + initialKmerSize - 1,
+ tuple.getFieldData(NodeSequenceWriterFactory.InputKmerBytesField),
+ tuple.getFieldStart(NodeSequenceWriterFactory.InputKmerBytesField));
+ try {
+ output.write(node.toString().getBytes());
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ @Override
+ public void close(DataOutput output) throws HyracksDataException {
+ // TODO Auto-generated method stub
+
+ }
+
+ };
}
}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenBrujinGraph.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenBrujinGraph.java
index 47fec00..117b7e0 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenBrujinGraph.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenBrujinGraph.java
@@ -240,7 +240,7 @@
switch (outputFormat) {
case TEXT:
kmerWriter = new KMerTextWriterFactory(kmerSize);
- nodeWriter = new NodeTextWriterFactory();
+ nodeWriter = new NodeTextWriterFactory(kmerSize);
break;
case BINARY:
default: