Rewrite k-mer representation: drop the length-prefix byte and the shift-filter masks, pack four 2-bit genes per byte
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_genomix@2960 123451ca-8445-de46-9d55-352943316053
diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/FileScanDescriptor.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/FileScanDescriptor.java
index 6951eef..3a53ef2 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/FileScanDescriptor.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/FileScanDescriptor.java
@@ -10,6 +10,8 @@
import edu.uci.ics.genomix.data.serde.ByteSerializerDeserializer;
import edu.uci.ics.genomix.type.Kmer;
+import edu.uci.ics.genomix.type.Kmer.GENE_CODE;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
@@ -73,8 +75,6 @@
private ByteBuffer outputBuffer;
private FrameTupleAppender outputAppender;
- private byte[] filter = new byte[4];
-
@SuppressWarnings("resource")
@Override
public void initialize() {
@@ -84,8 +84,6 @@
outputAppender = new FrameTupleAppender(ctx.getFrameSize());
outputAppender.reset(outputBuffer, true);
- Kmer.initializeFilter(k, filter);
-
try {// one try with multiple catch?
writer.open();
String s = pathSurfix + String.valueOf(temp);
@@ -103,7 +101,7 @@
while (read != null) {
read = readsfile.readLine();
// if(count % 4 == 1)
- SplitReads(read.getBytes());
+ SplitReads(read.getBytes(),writer);
// read.getBytes();
read = readsfile.readLine();
@@ -127,63 +125,47 @@
}
}
- private void SplitReads(byte[] array) {
+ /**
+  * Splits one read into its overlapping k-mers and appends each of them,
+  * together with its adjacency byte, to the output frame.
+  *
+  * The adjacency byte is built by InsertToFrame from two values: pre (the
+  * symbol just before the k-mer in the read) and next (the symbol just
+  * after it). Both are produced with GENE_CODE.getAdjBit so that they share
+  * one encoding; 0 means the k-mer has no neighbour on that side.
+  *
+  * @param array  symbols of one read
+  * @param writer frame writer handed through to InsertToFrame
+  */
+ private void SplitReads(byte[] array, IFrameWriter writer) {
+     // A read shorter than k contains no k-mer at all.
+     if (array.length < k) {
+         return;
+     }
+
+     /** first kmer */
+     byte[] kmer = Kmer.CompressKmer(k, array, 0);
+     byte pre = 0;
+     byte next = 0;
+     // Only look at array[k] when the read really continues past the first k-mer.
+     if (array.length > k) {
+         next = GENE_CODE.getAdjBit(array[k]);
+     }
+     InsertToFrame(kmer, pre, next, writer);
+
+     /** remaining kmers: slide the window one symbol at a time */
+     for (int i = k; i < array.length; i++) {
+         // MoveKmer is documented to return the shifted-out gene in gene-code
+         // format, which is not the encoding used for next; take the
+         // predecessor from the read itself instead (array[i - k] is the
+         // symbol that just left the window).
+         Kmer.MoveKmer(k, kmer, array[i]);
+         pre = GENE_CODE.getAdjBit(array[i - k]);
+         next = 0;
+         if (i + 1 < array.length) {
+             next = GENE_CODE.getAdjBit(array[i + 1]);
+         }
+         InsertToFrame(kmer, pre, next, writer);
+     }
+ }
+
+ private void InsertToFrame(byte[] kmer, byte pre, byte next,
+ IFrameWriter writer) {
try {
- byte[] bytes = null;
+ byte adj = GENE_CODE.mergePreNextAdj(pre, next);
+ tupleBuilder.reset();
+ tupleBuilder.addField(kmer, 0, byteNum + 1);
+ tupleBuilder.addField(ByteSerializerDeserializer.INSTANCE,
+ adj);
- byte pre = 0, next = 0;
- byte r;
-
- for (int i = 0; i < array.length - k + 1; i++) {
- if (0 == i) {
- bytes = Kmer.CompressKmer(k, array, i);
- } else {
- Kmer.MoveKmer(k, bytes, array[i + k - 1], filter);
- /*
- * l <<= 2; l &= window; l |= ConvertSymbol(array[i
- * + k - 1]);
- */
- pre = Kmer.GENE_CODE.getAdjBit(array[i - 1]);
- }
- if (i + k != array.length) {
- next = Kmer.GENE_CODE.getAdjBit(array[i + k]);
- }
-
- r = 0;
- r |= pre;
- r <<= 4;
- r |= next;
-
- /*
- * System.out.print(l); System.out.print(' ');
- * System.out.print(r); System.out.println();
- */
-
- tupleBuilder.reset();
-
- // tupleBuilder.addField(Integer64SerializerDeserializer.INSTANCE,
- // l);
- tupleBuilder.addField(bytes, 0, byteNum + 1);
- tupleBuilder.addField(
- ByteSerializerDeserializer.INSTANCE, r);
-
- // int[] a = tupleBuilder.getFieldEndOffsets();
- // int b = tupleBuilder.getSize();
- // byte[] c = tupleBuilder.getByteArray();
-
+ if (!outputAppender.append(
+ tupleBuilder.getFieldEndOffsets(),
+ tupleBuilder.getByteArray(), 0,
+ tupleBuilder.getSize())) {
+ FrameUtils.flushFrame(outputBuffer, writer);
+ outputAppender.reset(outputBuffer, true);
if (!outputAppender.append(
tupleBuilder.getFieldEndOffsets(),
tupleBuilder.getByteArray(), 0,
tupleBuilder.getSize())) {
- FrameUtils.flushFrame(outputBuffer, writer);
- outputAppender.reset(outputBuffer, true);
- if (!outputAppender.append(
- tupleBuilder.getFieldEndOffsets(),
- tupleBuilder.getByteArray(), 0,
- tupleBuilder.getSize())) {
- throw new IllegalStateException(
- "Failed to copy an record into a frame: the record size is too large.");
- }
+ throw new IllegalStateException(
+ "Failed to copy an record into a frame: the record size is too large.");
}
}
} catch (Exception e) {
diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/KMerSequenceWriterFactory.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/KMerSequenceWriterFactory.java
index 1370494..e25ce40 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/KMerSequenceWriterFactory.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/KMerSequenceWriterFactory.java
@@ -17,6 +17,7 @@
import edu.uci.ics.hyracks.hdfs.api.ITupleWriterFactory;
import edu.uci.ics.hyracks.hdfs.dataflow.ConfFactory;
+@SuppressWarnings("deprecation")
public class KMerSequenceWriterFactory implements ITupleWriterFactory {
private static final long serialVersionUID = 1L;
diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/KMerTextWriterFactory.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/KMerTextWriterFactory.java
index 9f19858..441b798 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/KMerTextWriterFactory.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/KMerTextWriterFactory.java
@@ -3,8 +3,6 @@
import java.io.DataOutput;
import java.io.IOException;
-import org.apache.hadoop.io.Text;
-
import edu.uci.ics.genomix.type.Kmer;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
@@ -28,13 +26,16 @@
public void write(DataOutput output, ITupleReference tuple)
throws HyracksDataException {
try {
- Text.writeString(output, Kmer.recoverKmerFrom(KMER,
+ output.writeChars(Kmer.recoverKmerFrom(KMER,
tuple.getFieldData(0), tuple.getFieldStart(0),
tuple.getFieldLength(0)));
- Text.writeString(output, "\t");
- Text.writeString(output, Kmer.recoverAdjacent(tuple
+ output.writeChar('\t');
+ output.writeChars(Kmer.recoverAdjacent(tuple
.getFieldData(1)[tuple.getFieldStart(1)]));
- Text.writeString(output, "\n");
+ output.writeChar('\t');
+ output.writeInt((int)tuple
+ .getFieldData(2)[tuple.getFieldStart(2)]);
+ output.writeChar('\n');
} catch (IOException e) {
throw new HyracksDataException(e);
}
diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/KMerWriterFactory.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/KMerWriterFactory.java
deleted file mode 100644
index 13ee200..0000000
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/KMerWriterFactory.java
+++ /dev/null
@@ -1,38 +0,0 @@
-package edu.uci.ics.genomix.dataflow;
-
-import java.io.DataOutput;
-
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
-import edu.uci.ics.hyracks.hdfs.api.ITupleWriter;
-import edu.uci.ics.hyracks.hdfs.api.ITupleWriterFactory;
-
-public class KMerWriterFactory implements ITupleWriterFactory {
- private static final long serialVersionUID = 1L;
-
- @Override
- public ITupleWriter getTupleWriter() {
- return new ITupleWriter() {
- byte newLine = "\n".getBytes()[0];
-
- @Override
- public void write(DataOutput output, ITupleReference tuple)
- throws HyracksDataException {
- try {
- for (int i = 0; i < 3; i++) {
- byte[] data = tuple.getFieldData(i);
- int start = tuple.getFieldStart(i);
- int len = tuple.getFieldLength(i);
- output.write(data, start, len);
- output.writeChar(' ');
- }
- output.writeByte(newLine);
- } catch (Exception e) {
- throw new HyracksDataException(e);
- }
- }
-
- };
- }
-
-}
diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/ReadsKeyValueParserFactory.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/ReadsKeyValueParserFactory.java
index 0dfc69f..1b13760 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/ReadsKeyValueParserFactory.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/ReadsKeyValueParserFactory.java
@@ -9,6 +9,7 @@
import edu.uci.ics.genomix.data.serde.ByteSerializerDeserializer;
import edu.uci.ics.genomix.type.Kmer;
+import edu.uci.ics.genomix.type.Kmer.GENE_CODE;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
@@ -24,12 +25,10 @@
private int k;
private int byteNum;
- private byte[] filter = new byte[4];
public ReadsKeyValueParserFactory(int k) {
this.k = k;
byteNum = (byte) Math.ceil((double) k / 4.0);
- Kmer.initializeFilter(k, filter);
}
@Override
@@ -63,46 +62,46 @@
}
+ /**
+  * Splits one read into its overlapping k-mers and appends each of them,
+  * together with its adjacency byte, to the output frame.
+  *
+  * pre is the symbol just before the k-mer in the read and next the symbol
+  * just after it; both are produced with GENE_CODE.getAdjBit so that they
+  * share one encoding, and 0 means there is no neighbour on that side.
+  *
+  * @param array  symbols of one read
+  * @param writer frame writer handed through to InsertToFrame
+  */
private void SplitReads(byte[] array, IFrameWriter writer) {
+     // A read shorter than k contains no k-mer at all.
+     if (array.length < k) {
+         return;
+     }
+
+     /** first kmer */
+     byte[] kmer = Kmer.CompressKmer(k, array, 0);
+     byte pre = 0;
+     byte next = 0;
+     // Only look at array[k] when the read really continues past the first k-mer.
+     if (array.length > k) {
+         next = GENE_CODE.getAdjBit(array[k]);
+     }
+     InsertToFrame(kmer, pre, next, writer);
+
+     /** remaining kmers: slide the window one symbol at a time */
+     for (int i = k; i < array.length; i++) {
+         // MoveKmer is documented to return the shifted-out gene in gene-code
+         // format, which is not the encoding used for next; take the
+         // predecessor from the read itself instead (array[i - k] is the
+         // symbol that just left the window).
+         Kmer.MoveKmer(k, kmer, array[i]);
+         pre = GENE_CODE.getAdjBit(array[i - k]);
+         next = 0;
+         if (i + 1 < array.length) {
+             next = GENE_CODE.getAdjBit(array[i + 1]);
+         }
+         InsertToFrame(kmer, pre, next, writer);
+     }
+ }
+
+ private void InsertToFrame(byte[] kmer, byte pre, byte next,
+ IFrameWriter writer) {
try {
- byte[] bytes = null;
+ byte adj = GENE_CODE.mergePreNextAdj(pre, next);
+ tupleBuilder.reset();
+ tupleBuilder.addField(kmer, 0, byteNum + 1);
+ tupleBuilder.addField(ByteSerializerDeserializer.INSTANCE,
+ adj);
- byte pre = 0, next = 0;
- byte r;
-
- for (int i = 0; i < array.length - k + 1; i++) {
- if (0 == i) {
- bytes = Kmer.CompressKmer(k, array, i);
- } else {
- Kmer.MoveKmer(k, bytes, array[i + k - 1], filter);
- pre = Kmer.GENE_CODE.getAdjBit(array[i - 1]);
- }
- if (i + k != array.length) {
- next = Kmer.GENE_CODE.getAdjBit(array[i + k]);
- }
-
- r = 0;
- r |= pre;
- r <<= 4;
- r |= next;
-
- tupleBuilder.reset();
- tupleBuilder.addField(bytes, 0, byteNum + 1);
- tupleBuilder.addField(
- ByteSerializerDeserializer.INSTANCE, r);
-
+ if (!outputAppender.append(
+ tupleBuilder.getFieldEndOffsets(),
+ tupleBuilder.getByteArray(), 0,
+ tupleBuilder.getSize())) {
+ FrameUtils.flushFrame(outputBuffer, writer);
+ outputAppender.reset(outputBuffer, true);
if (!outputAppender.append(
tupleBuilder.getFieldEndOffsets(),
tupleBuilder.getByteArray(), 0,
tupleBuilder.getSize())) {
- FrameUtils.flushFrame(outputBuffer, writer);
- outputAppender.reset(outputBuffer, true);
- if (!outputAppender.append(
- tupleBuilder.getFieldEndOffsets(),
- tupleBuilder.getByteArray(), 0,
- tupleBuilder.getSize())) {
- throw new IllegalStateException(
- "Failed to copy an record into a frame: the record size is too large.");
- }
+ throw new IllegalStateException(
+ "Failed to copy an record into a frame: the record size is too large.");
}
}
} catch (Exception e) {
diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/type/Kmer.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/type/Kmer.java
index 0281ebc..19d754d 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/type/Kmer.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/type/Kmer.java
@@ -1,10 +1,5 @@
package edu.uci.ics.genomix.type;
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.hadoop.io.Writable;
public class Kmer {
@@ -73,6 +68,10 @@
return r;
}
+ /**
+  * Packs two adjacency values into a single byte.
+  *
+  * @param pre  value stored in the high four bits
+  * @param next value whose low four bits are stored in the low four bits
+  * @return the combined byte
+  */
+ public static byte mergePreNextAdj(byte pre, byte next) {
+     final int highNibble = pre << 4;
+     final int lowNibble = next & 0x0f;
+     return (byte) (highNibble | lowNibble);
+ }
+
public static String getSymbolFromBitMap(byte code) {
int left = (code >> 4) & 0x0F;
int right = code & 0x0F;
@@ -94,14 +93,16 @@
+ /**
+  * Decodes a packed k-mer back into its symbol string.
+  *
+  * Decoding starts at the last byte of the key and walks toward its first
+  * byte; inside each byte the gene in the two lowest bits comes first.
+  * Every 2-bit value is translated through GENE_SYMBOL.
+  *
+  * @param k         number of genes to decode
+  * @param keyData   buffer holding the packed k-mer
+  * @param keyStart  offset of the first k-mer byte in keyData
+  * @param keyLength number of bytes the k-mer occupies
+  * @return the k decoded symbols, first gene first
+  */
public static String recoverKmerFrom(int k, byte[] keyData, int keyStart,
int keyLength) {
- byte kmer = keyData[keyStart];
-
- String sblder = String.valueOf((int) kmer) + " ";
- for (int i = keyStart + 1; i < keyStart + keyLength; i++) {
- byte genecode = keyData[i];
- sblder += String.valueOf((int) genecode) + " ";
+ // StringBuilder avoids re-copying the partial result on every gene.
+ StringBuilder strKmer = new StringBuilder();
+ int byteId = keyStart + keyLength - 1;
+ byte currentbyte = keyData[byteId];
+ for (int geneCount = 0; geneCount < k; geneCount++) {
+     // Four genes per byte: step to the preceding byte after every fourth gene.
+     if (geneCount % 4 == 0 && geneCount > 0) {
+         currentbyte = keyData[--byteId];
+     }
+     // The 0x03 mask also discards the sign extension of the byte.
+     strKmer.append((char) GENE_SYMBOL[(currentbyte >> ((geneCount % 4) * 2)) & 0x03]);
}
- return sblder;
+ return strKmer.toString();
}
public static String recoverAdjacent(byte number) {
@@ -110,71 +111,77 @@
return String.valueOf(incoming) + '|' + String.valueOf(outgoing);
}
-
- public static void initializeFilter(int k, byte[] filter) {
- filter[0] = (byte) 0xC0;
- filter[1] = (byte) 0xFC;
- filter[2] = 0;
- filter[3] = 3;
- final int byteNum = (byte) Math.ceil((double) k / 4.0);
-
- int r = byteNum * 8 - 2 * k;
- r = 8 - r;
- for (int i = 0; i < r; i++) {
- filter[2] <<= 1;
- filter[2] |= 1;
- }
- for (int i = 0; i < r - 1; i++) {
- filter[3] <<= 1;
- }
- }
-
+ /**
+ * Compress the k symbols starting at array[start] into a packed byte array.
+ * Genes are packed four per byte, the earliest gene in the lowest two bits,
+ * and the first four genes go into the LAST byte of the result.
+ * AATAG will compress as [0 0 0 G][A T A A]
+ * (assumes GENE_CODE.getCodeFromSymbol returns a 2-bit code - TODO confirm)
+ * @param k number of symbols in the kmer
+ * @param array input symbol array
+ * @param start index in array of the first symbol of the kmer
+ * @return newly allocated kmer array of ceil(k / 4) bytes
+ */
public static byte[] CompressKmer(int k, byte[] array, int start) {
final int byteNum = (byte) Math.ceil((double) k / 4.0);
- byte[] bytes = new byte[byteNum + 1];
- bytes[0] = (byte) k;
+ byte[] bytes = new byte[byteNum ];
byte l = 0;
- int count = 0;
- int bcount = 0;
-
- for (int i = start; i < start + k; i++) {
- l = (byte) ((l << 2) & 0xFC);
- l |= GENE_CODE.getCodeFromSymbol(array[i]);
- count += 2;
- if (count % 8 == 0 && byteNum - bcount > 1) {
- bytes[byteNum - bcount] = l;
- bcount += 1;
- count = 0;
+ // bit offset (0, 2, 4, 6) of the next gene inside the byte being assembled in l
+ int bytecount = 0;
+ // index of the byte being filled; starts at the last byte and moves toward bytes[0]
+ int bcount = byteNum-1;
+ for (int i = start; i<start +k; i++){
+ byte code = GENE_CODE.getCodeFromSymbol(array[i]);
+ l |= (byte) (code << bytecount);
+ bytecount +=2;
+ // a full byte (four genes) has been assembled: store it and start a new one
+ if (bytecount == 8){
+ bytes[bcount--] = l;
l = 0;
- }
- if (byteNum - bcount <= 1) {
- break;
+ bytecount= 0;
}
}
- bytes[1] = l;
+ // k is not a multiple of 4: the leftover genes form a partial byte kept at index 0
+ if (bcount >= 0){
+ bytes[0]=l;
+ }
return bytes;
}
- public static void MoveKmer(int k, byte[] bytes, byte c, byte filter[]) {
- int i = (byte) Math.ceil((double) k / 4.0);
- ;
- bytes[i] <<= 2;
- bytes[i] &= filter[1];
- i -= 1;
- while (i > 1) {
- byte f = (byte) (bytes[i] & filter[0]);
- f >>= 6;
- f &= 3;
- bytes[i + 1] |= f;
- bytes[i] <<= 2;
- bytes[i] &= filter[1];
- i -= 1;
+ /**
+  * Shifts the packed k-mer by one gene in place: the first gene (the two
+  * lowest bits of the last byte) is dropped, every remaining gene moves one
+  * slot forward, and c becomes the new last gene.
+  *
+  * @param k    number of genes in the k-mer
+  * @param kmer packed k-mer array as produced by CompressKmer; modified in place
+  * @param c    symbol of the gene to append
+  * @return the shifted-out gene, in gene code format
+  */
+ public static byte MoveKmer(int k, byte[] kmer, byte c) {
+     int byteNum = (byte) Math.ceil((double) k / 4.0);
+     byte output = (byte) (kmer[byteNum - 1] & 0x03);
+     for (int i = byteNum - 1; i > 0; i--) {
+         // Lowest gene of the preceding byte becomes the highest gene of this one.
+         byte in = (byte) (kmer[i - 1] & 0x03);
+         // Mask to 8 bits before shifting: a byte is sign-extended to int, so
+         // without the mask a set high bit would leak 1s into the top two bits.
+         kmer[i] = (byte) (((kmer[i] & 0xFF) >>> 2) | (in << 6));
}
- bytes[2] |= (byte) (bytes[1] & filter[3]);
- bytes[1] <<= 2;
- bytes[1] &= filter[2];
- bytes[1] |= GENE_CODE.getCodeFromSymbol(c);
+
+ // Bit offset of the last gene inside byte 0 (byte 0 may be only partly used).
+ int pos = ((k - 1) % 4) * 2;
+ byte code = (byte) (GENE_CODE.getCodeFromSymbol(c) << pos);
+ // Same 8-bit mask as above; matters when k is a multiple of 4 and byte 0 is full.
+ kmer[0] = (byte) (((kmer[0] & 0xFF) >>> 2) | code);
+ return output;
}
+ /**
+  * Ad-hoc console check: compresses the first 5 symbols of a fixed sample,
+  * prints the packed bytes and the decoded k-mer, then shifts in the next
+  * symbol with MoveKmer and prints the shifted-out value, the packed bytes
+  * in binary and the decoded k-mer again.
+  */
+ public static void main(String[] argv) {
+     final int k = 5;
+     final byte[] array = { 'A', 'A', 'T', 'A', 'G', 'C', 'A', 'G' };
+     final byte[] kmer = CompressKmer(k, array, 0);
+
+     // Packed bytes as signed decimals, each followed by a space.
+     StringBuilder decimalDump = new StringBuilder();
+     for (int idx = 0; idx < kmer.length; idx++) {
+         decimalDump.append((int) kmer[idx]).append(' ');
+     }
+     System.out.println(decimalDump);
+     System.out.println(recoverKmerFrom(k, kmer, 0, kmer.length));
+
+     final byte out = MoveKmer(k, kmer, array[k]);
+     System.out.println((int) out);
+
+     // Packed bytes after the shift, in binary, each followed by a space.
+     StringBuilder binaryDump = new StringBuilder();
+     for (int idx = 0; idx < kmer.length; idx++) {
+         binaryDump.append(Integer.toBinaryString(kmer[idx])).append(' ');
+     }
+     System.out.println(binaryDump);
+     System.out.println(recoverKmerFrom(k, kmer, 0, kmer.length));
+ }
}
diff --git a/genomix/genomix-core/src/test/java/edu/uci/ics/genomix/example/jobrun/JobRunTestCase.java b/genomix/genomix-core/src/test/java/edu/uci/ics/genomix/example/jobrun/JobRunTestCase.java
index 788ebca..07c2e1d 100644
--- a/genomix/genomix-core/src/test/java/edu/uci/ics/genomix/example/jobrun/JobRunTestCase.java
+++ b/genomix/genomix-core/src/test/java/edu/uci/ics/genomix/example/jobrun/JobRunTestCase.java
@@ -120,7 +120,7 @@
public void TestExternalGroupby() throws Exception {
cleanUpReEntry();
conf.set(GenomixJob.GROUPBY_TYPE, "external");
- conf.set(GenomixJob.OUTPUT_FORMAT, "binary");
+ conf.set(GenomixJob.OUTPUT_FORMAT, "text");
driver.runJob(new GenomixJob(conf), Plan.BUILD_DEBRUJIN_GRAPH, true);
Assert.assertEquals(true, checkResults());
}