New version of genomix with HDFS read and write
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_genomix@2855 123451ca-8445-de46-9d55-352943316053
diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/KMerWriterFactory.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/KMerWriterFactory.java
new file mode 100644
index 0000000..31e6b85
--- /dev/null
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/KMerWriterFactory.java
@@ -0,0 +1,37 @@
+package edu.uci.ics.genomix.dataflow;
+
+import java.io.DataOutput;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.hdfs.api.ITupleWriter;
+import edu.uci.ics.hyracks.hdfs.api.ITupleWriterFactory;
+
+public class KMerWriterFactory implements ITupleWriterFactory {
+ private static final long serialVersionUID = 1L;
+
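+ /**
+ * Returns a writer that renders each incoming tuple as one line of the HDFS
+ * output file: the tuple's fields separated by spaces and ended with '\n'.
+ */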
+ @Override
+ public ITupleWriter getTupleWriter() {
+ return new ITupleWriter() {
+ byte newLine = '\n';
+
+ @Override
+ public void write(DataOutput output, ITupleReference tuple) throws HyracksDataException {
+ try {
+ // Write every field of the tuple, separated by single space bytes.
+ for (int i = 0; i < tuple.getFieldCount(); i++) {
+ byte[] data = tuple.getFieldData(i);
+ int start = tuple.getFieldStart(i);
+ int len = tuple.getFieldLength(i);
+ output.write(data, start, len);
+ output.writeByte(' ');
+ }
+ output.writeByte(newLine);
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ };
+ }
+
+}
diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/ReadsKeyValueParserFactory.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/ReadsKeyValueParserFactory.java
new file mode 100644
index 0000000..2d7ee70
--- /dev/null
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/ReadsKeyValueParserFactory.java
@@ -0,0 +1,232 @@
+package edu.uci.ics.genomix.dataflow;
+
+import java.nio.ByteBuffer;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+
+import edu.uci.ics.genomix.data.serde.ByteSerializerDeserializer;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.hdfs.api.IKeyValueParser;
+import edu.uci.ics.hyracks.hdfs.api.IKeyValueParserFactory;
+
+public class ReadsKeyValueParserFactory implements IKeyValueParserFactory<LongWritable, Text> {
+ private static final long serialVersionUID = 1L;
+
+ private final int k;
+ private final int byteNum;
+
+ public ReadsKeyValueParserFactory(int k) {
+ this.k = k;
+ // Number of bytes needed to hold k bases at two bits per base.
+ this.byteNum = (int) Math.ceil((double) k / 4.0);
+ }
+ @Override
+ public IKeyValueParser<LongWritable, Text> createKeyValueParser(final IHyracksTaskContext ctx) {
+
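+ // Reusable tuple builder, output frame, and appender shared by the parser
+ // returned below; each emitted tuple is (packed k-mer, neighbor bitmap).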
+ final ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(2);
+ final ByteBuffer outputBuffer = ctx.allocateFrame();
+ final FrameTupleAppender outputAppender = new FrameTupleAppender(ctx.getFrameSize());
+ outputAppender.reset(outputBuffer, true);
+
+ return new IKeyValueParser<LongWritable, Text>() {
+
+ @Override
+ public void parse(LongWritable key, Text value, IFrameWriter writer) throws HyracksDataException {
+ String geneLine = value.toString(); // one read per input line
+ // Only split lines made up entirely of the bases A, C, G, and T.
+ Pattern genePattern = Pattern.compile("[AGCT]+");
+ Matcher geneMatcher = genePattern.matcher(geneLine);
+ if (geneMatcher.matches()) {
+ SplitReads(geneLine.getBytes(), writer);
+ }
+ }
+
+ @Override
+ public void flush(IFrameWriter writer) throws HyracksDataException {
+ FrameUtils.flushFrame(outputBuffer, writer);
+ }
+
+ /**
+ * Packs the k bases beginning at array[start] into a byte array, two bits
+ * per base (A=00, C=01, G=10, T=11). Byte 0 stores the value of k; the
+ * packed bases follow in bytes 1 through byteNum.
+ */
+ private byte[] CompressKmer(byte[] array, int start) {
+
+ byte[] bytes = new byte[byteNum+1];
+ bytes[0] = (byte) k;
+
+ byte l = 0;
+ int count = 0;
+ int bcount = 0;
+
+ for (int i = start; i < start + k; i++) {
+ // Shift in the 2-bit code of the next base.
+ l <<= 2;
+ l |= ConvertSymbol(array[i]);
+ count += 2;
+ if (count % 8 == 0) {
+ // Four bases fill a byte; store it and start the next one.
+ bcount += 1;
+ bytes[bcount] = l;
+ count = 0;
+ }
+ }
+ if (count != 0) {
+ // Store the trailing partial byte when k is not a multiple of four.
+ bytes[bcount + 1] = l;
+ }
+ return bytes;
+ }
+
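+ /**
+ * Maps a base character to a one-hot flag (A=1, C=2, G=4, T=8), used to mark
+ * the base adjacent to the current k-mer; any other character maps to 0.
+ */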
+ private byte GetBitmap(byte t) {
+ byte r = 0;
+ switch (t) {
+ case 'A':
+ case 'a':
+ r = 1;
+ break;
+ case 'C':
+ case 'c':
+ r = 2;
+ break;
+ case 'G':
+ case 'g':
+ r = 4;
+ break;
+ case 'T':
+ case 't':
+ r = 8;
+ break;
+ }
+ return r;
+ }
+
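+ /**
+ * Maps a base character to its 2-bit code (A=0, C=1, G=2, T=3); any other
+ * character maps to 0.
+ */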
+ private byte ConvertSymbol(byte t) {
+ byte r = 0;
+ switch (t) {
+ case 'A':
+ case 'a':
+ r = 0;
+ break;
+ case 'C':
+ case 'c':
+ r = 1;
+ break;
+ case 'G':
+ case 'g':
+ r = 2;
+ break;
+ case 'T':
+ case 't':
+ r = 3;
+ break;
+ }
+ return r;
+ }
+
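+ /**
+ * Slides the packed k-mer by one base: each packed byte is shifted left two
+ * bits, bits that overflow one byte are carried into the next, and the 2-bit
+ * code of the new base c takes the freed positions.
+ */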
+ void MoveKmer(byte[] bytes, byte c){
+ byte filter0 = (byte) 0xC0; // selects the top two bits of a byte
+ byte filter1 = (byte) 0xFC; // clears the bottom two bits of a byte
+ byte filter2 = 0; // keeps only the bits actually used in the last byte
+
+ // Bits of the last byte that carry bases: 8 minus the unused padding bits.
+ int r = 8 - (byteNum * 8 - 2 * k);
+ for (int i = 0; i < r; i++) {
+ filter2 <<= 1;
+ filter2 |= 1;
+ }
+
+ // Shift the last byte first, keeping only its used bits.
+ int i = byteNum;
+ bytes[i] <<= 2;
+ bytes[i] &= filter2;
+ i -= 1;
+ while (i > 0) {
+ // Carry the top two bits of this byte into the byte above it.
+ byte f = (byte) (bytes[i] & filter0);
+ f = (byte) ((f >> 6) & 0x03); // mask off sign extension from the shift
+ bytes[i + 1] |= f;
+ bytes[i] <<= 2;
+ bytes[i] &= filter1;
+ i -= 1; // move on to the next lower byte
+ }
+ bytes[i + 1] |= ConvertSymbol(c); // slot the new base's code into the freed bits
+ }
+
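+ /**
+ * Slides a window of length k across the read: the first k-mer is packed with
+ * CompressKmer and each later one is derived with MoveKmer. Every k-mer is
+ * paired with a byte whose high nibble flags the preceding base and whose low
+ * nibble flags the following base, and the (k-mer, bitmap) tuple is appended
+ * to the output frame; full frames are flushed to the writer.
+ */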
+ private void SplitReads(byte[] array, IFrameWriter writer) {
+ try {
+ byte[] bytes = null; // packed form of the current k-mer
+
+ byte pre = 0, next = 0; // one-hot flags for the neighboring bases
+ byte r;
+
+ for (int i = 0; i < array.length - k + 1; i++) {
+ if (0 == i) {
+ // First window: pack the initial k-mer from scratch.
+ bytes = CompressKmer(array, i);
+ } else {
+ // Later windows: slide the packed k-mer by one base.
+ MoveKmer(bytes, array[i + k - 1]);
+ pre = GetBitmap(array[i - 1]);
+ }
+ if (i + k != array.length) {
+ next = GetBitmap(array[i + k]);
+ } else {
+ next = 0; // the last k-mer has no following base
+ }
+
+ // High nibble flags the preceding base, low nibble the following base.
+ r = (byte) ((pre << 4) | next);
+
+
+ tupleBuilder.reset();
+ tupleBuilder.addField(bytes, 0, byteNum + 1); // field 0: the packed k-mer
+ tupleBuilder.addField(ByteSerializerDeserializer.INSTANCE, r); // field 1: neighbor bitmap
+
+ if (!outputAppender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
+ tupleBuilder.getSize())) {
+ FrameUtils.flushFrame(outputBuffer, writer);
+ outputAppender.reset(outputBuffer, true);
+ if (!outputAppender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(),
+ 0, tupleBuilder.getSize())) {
+ throw new IllegalStateException(
+ "Failed to copy an record into a frame: the record size is too large.");
+ }
+ }
+ }
+ } catch (Exception e) {
+ throw new IllegalStateException(e);
+ }
+ }
+ };
+ }
+
+}
\ No newline at end of file