rewrite kmer representation

git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_genomix@2960 123451ca-8445-de46-9d55-352943316053
diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/FileScanDescriptor.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/FileScanDescriptor.java
index 6951eef..3a53ef2 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/FileScanDescriptor.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/FileScanDescriptor.java
@@ -10,6 +10,8 @@
 

 import edu.uci.ics.genomix.data.serde.ByteSerializerDeserializer;

 import edu.uci.ics.genomix.type.Kmer;

+import edu.uci.ics.genomix.type.Kmer.GENE_CODE;

+import edu.uci.ics.hyracks.api.comm.IFrameWriter;

 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;

 import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;

 import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;

@@ -73,8 +75,6 @@
 			private ByteBuffer outputBuffer;

 			private FrameTupleAppender outputAppender;

 

-			private byte[] filter = new byte[4];

-

 			@SuppressWarnings("resource")

 			@Override

 			public void initialize() {

@@ -84,8 +84,6 @@
 				outputAppender = new FrameTupleAppender(ctx.getFrameSize());

 				outputAppender.reset(outputBuffer, true);

 

-				Kmer.initializeFilter(k, filter);

-

 				try {// one try with multiple catch?

 					writer.open();

 					String s = pathSurfix + String.valueOf(temp);

@@ -103,7 +101,7 @@
 						while (read != null) {

 							read = readsfile.readLine();

 							// if(count % 4 == 1)

-							SplitReads(read.getBytes());

+							SplitReads(read.getBytes(),writer);

 							// read.getBytes();

 							read = readsfile.readLine();

 

@@ -127,63 +125,47 @@
 				}

 			}

 

-			private void SplitReads(byte[] array) {

+			private void SplitReads(byte[] array, IFrameWriter writer) {

+				/** first kmer */

+				byte[] kmer = Kmer.CompressKmer(k, array, 0);

+				byte pre = 0;

+				byte next = GENE_CODE.getAdjBit(array[k]);

+				InsertToFrame(kmer, pre, next, writer);

+

+				/** middle kmer */

+				for (int i = k; i < array.length - 1; i++) {

+					pre = Kmer.MoveKmer(k, kmer, array[i]);

+					next = GENE_CODE.getAdjBit(array[i + 1]);

+					InsertToFrame(kmer, pre, next, writer);

+

+				}

+				/** last kmer */

+				pre = Kmer.MoveKmer(k, kmer, array[array.length - 1]);

+				next = 0;

+				InsertToFrame(kmer, pre, next, writer);

+			}

+

+			private void InsertToFrame(byte[] kmer, byte pre, byte next,

+					IFrameWriter writer) {

 				try {

-					byte[] bytes = null;

+					byte adj = GENE_CODE.mergePreNextAdj(pre, next);

+					tupleBuilder.reset();

+					tupleBuilder.addField(kmer, 0, byteNum + 1);

+					tupleBuilder.addField(ByteSerializerDeserializer.INSTANCE,

+							adj);

 

-					byte pre = 0, next = 0;

-					byte r;

-

-					for (int i = 0; i < array.length - k + 1; i++) {

-						if (0 == i) {

-							bytes = Kmer.CompressKmer(k, array, i);

-						} else {

-							Kmer.MoveKmer(k, bytes, array[i + k - 1], filter);

-							/*

-							 * l <<= 2; l &= window; l |= ConvertSymbol(array[i

-							 * + k - 1]);

-							 */

-							pre = Kmer.GENE_CODE.getAdjBit(array[i - 1]);

-						}

-						if (i + k != array.length) {

-							next = Kmer.GENE_CODE.getAdjBit(array[i + k]);

-						}

-

-						r = 0;

-						r |= pre;

-						r <<= 4;

-						r |= next;

-

-						/*

-						 * System.out.print(l); System.out.print(' ');

-						 * System.out.print(r); System.out.println();

-						 */

-

-						tupleBuilder.reset();

-

-						// tupleBuilder.addField(Integer64SerializerDeserializer.INSTANCE,

-						// l);

-						tupleBuilder.addField(bytes, 0, byteNum + 1);

-						tupleBuilder.addField(

-								ByteSerializerDeserializer.INSTANCE, r);

-

-						// int[] a = tupleBuilder.getFieldEndOffsets();

-						// int b = tupleBuilder.getSize();

-						// byte[] c = tupleBuilder.getByteArray();

-

+					if (!outputAppender.append(

+							tupleBuilder.getFieldEndOffsets(),

+							tupleBuilder.getByteArray(), 0,

+							tupleBuilder.getSize())) {

+						FrameUtils.flushFrame(outputBuffer, writer);

+						outputAppender.reset(outputBuffer, true);

 						if (!outputAppender.append(

 								tupleBuilder.getFieldEndOffsets(),

 								tupleBuilder.getByteArray(), 0,

 								tupleBuilder.getSize())) {

-							FrameUtils.flushFrame(outputBuffer, writer);

-							outputAppender.reset(outputBuffer, true);

-							if (!outputAppender.append(

-									tupleBuilder.getFieldEndOffsets(),

-									tupleBuilder.getByteArray(), 0,

-									tupleBuilder.getSize())) {

-								throw new IllegalStateException(

-										"Failed to copy an record into a frame: the record size is too large.");

-							}

+							throw new IllegalStateException(

+									"Failed to copy an record into a frame: the record size is too large.");

 						}

 					}

 				} catch (Exception e) {

diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/KMerSequenceWriterFactory.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/KMerSequenceWriterFactory.java
index 1370494..e25ce40 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/KMerSequenceWriterFactory.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/KMerSequenceWriterFactory.java
@@ -17,6 +17,7 @@
 import edu.uci.ics.hyracks.hdfs.api.ITupleWriterFactory;
 import edu.uci.ics.hyracks.hdfs.dataflow.ConfFactory;
 
+@SuppressWarnings("deprecation")
 public class KMerSequenceWriterFactory implements ITupleWriterFactory {
 
 	private static final long serialVersionUID = 1L;
diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/KMerTextWriterFactory.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/KMerTextWriterFactory.java
index 9f19858..441b798 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/KMerTextWriterFactory.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/KMerTextWriterFactory.java
@@ -3,8 +3,6 @@
 import java.io.DataOutput;
 import java.io.IOException;
 
-import org.apache.hadoop.io.Text;
-
 import edu.uci.ics.genomix.type.Kmer;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
@@ -28,13 +26,16 @@
 		public void write(DataOutput output, ITupleReference tuple)
 				throws HyracksDataException {
 			try {
-				Text.writeString(output, Kmer.recoverKmerFrom(KMER,
+				output.writeChars(Kmer.recoverKmerFrom(KMER,
 						tuple.getFieldData(0), tuple.getFieldStart(0),
 						tuple.getFieldLength(0)));
-				Text.writeString(output, "\t");
-				Text.writeString(output, Kmer.recoverAdjacent(tuple
+				output.writeChar('\t');
+				output.writeChars(Kmer.recoverAdjacent(tuple
 						.getFieldData(1)[tuple.getFieldStart(1)]));
-				Text.writeString(output, "\n");
+				output.writeChar('\t');
+				output.writeInt((int)tuple
+						.getFieldData(2)[tuple.getFieldStart(2)]);
+				output.writeChar('\n');
 			} catch (IOException e) {
 				throw new HyracksDataException(e);
 			}
diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/KMerWriterFactory.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/KMerWriterFactory.java
deleted file mode 100644
index 13ee200..0000000
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/KMerWriterFactory.java
+++ /dev/null
@@ -1,38 +0,0 @@
-package edu.uci.ics.genomix.dataflow;

-

-import java.io.DataOutput;

-

-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;

-import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;

-import edu.uci.ics.hyracks.hdfs.api.ITupleWriter;

-import edu.uci.ics.hyracks.hdfs.api.ITupleWriterFactory;

-

-public class KMerWriterFactory implements ITupleWriterFactory {

-	private static final long serialVersionUID = 1L;

-

-	@Override

-	public ITupleWriter getTupleWriter() {

-		return new ITupleWriter() {

-			byte newLine = "\n".getBytes()[0];

-

-			@Override

-			public void write(DataOutput output, ITupleReference tuple)

-					throws HyracksDataException {

-				try {

-					for (int i = 0; i < 3; i++) {

-						byte[] data = tuple.getFieldData(i);

-						int start = tuple.getFieldStart(i);

-						int len = tuple.getFieldLength(i);

-						output.write(data, start, len);

-						output.writeChar(' ');

-					}

-					output.writeByte(newLine);

-				} catch (Exception e) {

-					throw new HyracksDataException(e);

-				}

-			}

-

-		};

-	}

-

-}

diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/ReadsKeyValueParserFactory.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/ReadsKeyValueParserFactory.java
index 0dfc69f..1b13760 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/ReadsKeyValueParserFactory.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/ReadsKeyValueParserFactory.java
@@ -9,6 +9,7 @@
 

 import edu.uci.ics.genomix.data.serde.ByteSerializerDeserializer;

 import edu.uci.ics.genomix.type.Kmer;

+import edu.uci.ics.genomix.type.Kmer.GENE_CODE;

 import edu.uci.ics.hyracks.api.comm.IFrameWriter;

 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;

 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;

@@ -24,12 +25,10 @@
 

 	private int k;

 	private int byteNum;

-	private byte[] filter = new byte[4];

 

 	public ReadsKeyValueParserFactory(int k) {

 		this.k = k;

 		byteNum = (byte) Math.ceil((double) k / 4.0);

-		Kmer.initializeFilter(k, filter);

 	}

 

 	@Override

@@ -63,46 +62,46 @@
 			}

 

 			private void SplitReads(byte[] array, IFrameWriter writer) {

+				/** first kmer */

+				byte[] kmer = Kmer.CompressKmer(k, array, 0);

+				byte pre = 0;

+				byte next = GENE_CODE.getAdjBit(array[k]);

+				InsertToFrame(kmer, pre, next, writer);

+

+				/** middle kmer */

+				for (int i = k; i < array.length - 1; i++) {

+					pre = Kmer.MoveKmer(k, kmer, array[i]);

+					next = GENE_CODE.getAdjBit(array[i + 1]);

+					InsertToFrame(kmer, pre, next, writer);

+

+				}

+				/** last kmer */

+				pre = Kmer.MoveKmer(k, kmer, array[array.length - 1]);

+				next = 0;

+				InsertToFrame(kmer, pre, next, writer);

+			}

+

+			private void InsertToFrame(byte[] kmer, byte pre, byte next,

+					IFrameWriter writer) {

 				try {

-					byte[] bytes = null;

+					byte adj = GENE_CODE.mergePreNextAdj(pre, next);

+					tupleBuilder.reset();

+					tupleBuilder.addField(kmer, 0, byteNum + 1);

+					tupleBuilder.addField(ByteSerializerDeserializer.INSTANCE,

+							adj);

 

-					byte pre = 0, next = 0;

-					byte r;

-

-					for (int i = 0; i < array.length - k + 1; i++) {

-						if (0 == i) {

-							bytes = Kmer.CompressKmer(k, array, i);

-						} else {

-							Kmer.MoveKmer(k, bytes, array[i + k - 1], filter);

-							pre = Kmer.GENE_CODE.getAdjBit(array[i - 1]);

-						}

-						if (i + k != array.length) {

-							next = Kmer.GENE_CODE.getAdjBit(array[i + k]);

-						}

-

-						r = 0;

-						r |= pre;

-						r <<= 4;

-						r |= next;

-

-						tupleBuilder.reset();

-						tupleBuilder.addField(bytes, 0, byteNum + 1);

-						tupleBuilder.addField(

-								ByteSerializerDeserializer.INSTANCE, r);

-

+					if (!outputAppender.append(

+							tupleBuilder.getFieldEndOffsets(),

+							tupleBuilder.getByteArray(), 0,

+							tupleBuilder.getSize())) {

+						FrameUtils.flushFrame(outputBuffer, writer);

+						outputAppender.reset(outputBuffer, true);

 						if (!outputAppender.append(

 								tupleBuilder.getFieldEndOffsets(),

 								tupleBuilder.getByteArray(), 0,

 								tupleBuilder.getSize())) {

-							FrameUtils.flushFrame(outputBuffer, writer);

-							outputAppender.reset(outputBuffer, true);

-							if (!outputAppender.append(

-									tupleBuilder.getFieldEndOffsets(),

-									tupleBuilder.getByteArray(), 0,

-									tupleBuilder.getSize())) {

-								throw new IllegalStateException(

-										"Failed to copy an record into a frame: the record size is too large.");

-							}

+							throw new IllegalStateException(

+									"Failed to copy an record into a frame: the record size is too large.");

 						}

 					}

 				} catch (Exception e) {

diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/type/Kmer.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/type/Kmer.java
index 0281ebc..19d754d 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/type/Kmer.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/type/Kmer.java
@@ -1,10 +1,5 @@
 package edu.uci.ics.genomix.type;
 
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.hadoop.io.Writable;
 
 public class Kmer {
 
@@ -73,6 +68,10 @@
 			return r;
 		}
 		
+		public static byte mergePreNextAdj(byte pre, byte next){
+			return (byte) (pre << 4 | next & 0x0f);
+		}
+		
 		public static String getSymbolFromBitMap(byte code) {
 			int left = (code >> 4) & 0x0F;
 			int right = code & 0x0F;
@@ -94,14 +93,16 @@
 
 	public static String recoverKmerFrom(int k, byte[] keyData, int keyStart,
 			int keyLength) {
-		byte kmer = keyData[keyStart];
-
-		String sblder = String.valueOf((int) kmer) + " ";
-		for (int i = keyStart + 1; i < keyStart + keyLength; i++) {
-			byte genecode = keyData[i];
-			sblder += String.valueOf((int) genecode) + " ";
+		String strKmer = new String();
+		int byteId = keyStart + keyLength-1;
+		byte currentbyte = keyData[byteId];
+		for(int geneCount = 0; geneCount < k ; geneCount++){
+			if (geneCount % 4 == 0 && geneCount > 0){
+				currentbyte = keyData[--byteId];
+			}
+			strKmer += (char)GENE_SYMBOL[(currentbyte >> ((geneCount%4)*2)) & 0x03];
 		}
-		return sblder;
+		return strKmer;
 	}
 
 	public static String recoverAdjacent(byte number) {
@@ -110,71 +111,77 @@
 		return String.valueOf(incoming) + '|' + String.valueOf(outgoing);
 	}
 
-
-	public static void initializeFilter(int k, byte[] filter) {
-		filter[0] = (byte) 0xC0;
-		filter[1] = (byte) 0xFC;
-		filter[2] = 0;
-		filter[3] = 3;
-		final int byteNum = (byte) Math.ceil((double) k / 4.0);
-
-		int r = byteNum * 8 - 2 * k;
-		r = 8 - r;
-		for (int i = 0; i < r; i++) {
-			filter[2] <<= 1;
-			filter[2] |= 1;
-		}
-		for (int i = 0; i < r - 1; i++) {
-			filter[3] <<= 1;
-		}
-	}
-
+	/**
+	 * Compress Kmer into bytes array
+	 * AATAG will compress as [0 0 0 G][A T A A]
+	 * @param kmer 
+	 * @param input array
+	 * @param start position
+	 * @return initialed kmer array
+	 */
 	public static byte[] CompressKmer(int k, byte[] array, int start) {
 		final int byteNum = (byte) Math.ceil((double) k / 4.0);
-		byte[] bytes = new byte[byteNum + 1];
-		bytes[0] = (byte) k;
+		byte[] bytes = new byte[byteNum ];
 
 		byte l = 0;
-		int count = 0;
-		int bcount = 0;
-
-		for (int i = start; i < start + k; i++) {
-			l = (byte) ((l << 2) & 0xFC);
-			l |= GENE_CODE.getCodeFromSymbol(array[i]);
-			count += 2;
-			if (count % 8 == 0 && byteNum - bcount > 1) {
-				bytes[byteNum - bcount] = l;
-				bcount += 1;
-				count = 0;
+		int bytecount = 0;
+		int bcount = byteNum-1;
+		for (int i = start; i<start +k; i++){
+			byte code = GENE_CODE.getCodeFromSymbol(array[i]);
+			l |= (byte) (code << bytecount);
+			bytecount +=2;
+			if (bytecount == 8){
+				bytes[bcount--] = l;
 				l = 0;
-			}
-			if (byteNum - bcount <= 1) {
-				break;
+				bytecount= 0;
 			}
 		}
-		bytes[1] = l;
+		if (bcount >= 0){
+			bytes[0]=l;
+		}
 		return bytes;
 	}
 
-	public static void MoveKmer(int k, byte[] bytes, byte c, byte filter[]) {
-		int i = (byte) Math.ceil((double) k / 4.0);
-		;
-		bytes[i] <<= 2;
-		bytes[i] &= filter[1];
-		i -= 1;
-		while (i > 1) {
-			byte f = (byte) (bytes[i] & filter[0]);
-			f >>= 6;
-			f &= 3;
-			bytes[i + 1] |= f;
-			bytes[i] <<= 2;
-			bytes[i] &= filter[1];
-			i -= 1;
+	/**
+	 * Shift Kmer to accept new input
+	 * @param kmer
+	 * @param bytes Kmer Array
+	 * @param c Input new gene character
+	 * @return the shiftout gene, in gene code format
+	 */
+	public static byte MoveKmer(int k, byte[] kmer, byte c) {
+		int byteNum = (byte) Math.ceil((double) k / 4.0);
+		byte output = (byte) (kmer[byteNum-1] & 0x03);
+		for(int i = byteNum-1; i >0; i--){
+			byte in = (byte) (kmer[i-1] & 0x03);
+			kmer[i] = (byte) ((kmer[i] >>> 2) | (in << 6));
 		}
-		bytes[2] |= (byte) (bytes[1] & filter[3]);
-		bytes[1] <<= 2;
-		bytes[1] &= filter[2];
-		bytes[1] |= GENE_CODE.getCodeFromSymbol(c);
+		
+		int pos = ((k-1) % 4) *2;
+		byte code = (byte) (GENE_CODE.getCodeFromSymbol(c) << pos);
+		kmer[0] = (byte) ((kmer[0] >>> 2) | code);
+		return output;
 	}
 
+	public static void main(String [] argv){
+		byte[] array = {'A','A','T','A','G','C','A','G'};
+		int k = 5;
+		byte[] kmer = CompressKmer(k, array, 0);
+		for (byte b : kmer){
+			System.out.print((int)b);
+			System.out.print(' ');
+		}
+		System.out.println();
+		System.out.println(recoverKmerFrom(k,kmer,0,kmer.length));
+		
+		byte out = MoveKmer(k, kmer, array[k]);
+		
+		System.out.println((int)out);
+		for (byte b : kmer){
+			System.out.print(Integer.toBinaryString(b));
+			System.out.print(' ');
+		}
+		System.out.println();
+		System.out.println(recoverKmerFrom(k,kmer,0,kmer.length));
+	}
 }
diff --git a/genomix/genomix-core/src/test/java/edu/uci/ics/genomix/example/jobrun/JobRunTestCase.java b/genomix/genomix-core/src/test/java/edu/uci/ics/genomix/example/jobrun/JobRunTestCase.java
index 788ebca..07c2e1d 100644
--- a/genomix/genomix-core/src/test/java/edu/uci/ics/genomix/example/jobrun/JobRunTestCase.java
+++ b/genomix/genomix-core/src/test/java/edu/uci/ics/genomix/example/jobrun/JobRunTestCase.java
@@ -120,7 +120,7 @@
 	public void TestExternalGroupby() throws Exception {
 		cleanUpReEntry();
 		conf.set(GenomixJob.GROUPBY_TYPE, "external");
-		conf.set(GenomixJob.OUTPUT_FORMAT, "binary");
+		conf.set(GenomixJob.OUTPUT_FORMAT, "text");
 		driver.runJob(new GenomixJob(conf), Plan.BUILD_DEBRUJIN_GRAPH, true);
 		Assert.assertEquals(true, checkResults());
 	}