add the byteswritable

git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_genomix@2880 123451ca-8445-de46-9d55-352943316053
diff --git a/genomix/genomix-hadoop/actual/result2/.part-00000.crc b/genomix/genomix-hadoop/actual/result2/.part-00000.crc
index 7a34e25..8871afc 100755
--- a/genomix/genomix-hadoop/actual/result2/.part-00000.crc
+++ b/genomix/genomix-hadoop/actual/result2/.part-00000.crc
Binary files differ
diff --git a/genomix/genomix-hadoop/actual/result2/part-00000 b/genomix/genomix-hadoop/actual/result2/part-00000
index 1f33880..9d64c60 100755
--- a/genomix/genomix-hadoop/actual/result2/part-00000
+++ b/genomix/genomix-hadoop/actual/result2/part-00000
@@ -1,7 +1,7 @@
-1	33	1
-3	1	1
-4	-103	2
-12	18	1
-16	18	1
-19	16	1
-49	17	1
+01	33	1
+03	1	1
+04	-103	2
+0c	18	1
+10	18	1
+13	16	1
+31	17	1
diff --git a/genomix/genomix-hadoop/expected/result2 b/genomix/genomix-hadoop/expected/result2
index bc7bc64..aa56fbf 100755
--- a/genomix/genomix-hadoop/expected/result2
+++ b/genomix/genomix-hadoop/expected/result2
@@ -1,7 +1,7 @@
-1 33 1
-3 1 1
-4 -103 2
-12 18 1
-16 18 1
-19 16 1
-49 17 1
+01 33 1
+03 1 1
+04 -103 2
+0c 18 1
+10 18 1
+13 16 1
+31 17 1
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/graphbuilding/GenomixCombiner.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/graphbuilding/GenomixCombiner.java
index dadbb9a..74ef455 100755
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/graphbuilding/GenomixCombiner.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/graphbuilding/GenomixCombiner.java
@@ -17,6 +17,7 @@
 import java.io.IOException;
 import java.util.Iterator;
 
+import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.VLongWritable;
@@ -29,12 +30,12 @@
  * This class implement the combiner operator of Mapreduce model
  */
 public class GenomixCombiner extends MapReduceBase implements
-        Reducer<VLongWritable, ValueWritable, VLongWritable, ValueWritable> {
+        Reducer<BytesWritable, ValueWritable, BytesWritable, ValueWritable> {
     public ValueWritable vaWriter = new ValueWritable();
 
     @Override
-    public void reduce(VLongWritable key, Iterator<ValueWritable> values,
-            OutputCollector<VLongWritable, ValueWritable> output, Reporter reporter) throws IOException {
+    public void reduce(BytesWritable key, Iterator<ValueWritable> values,
+            OutputCollector<BytesWritable, ValueWritable> output, Reporter reporter) throws IOException {
         byte groupByAdjList = 0;
         int count = 0;
         byte bytCount = 0;
@@ -43,8 +44,8 @@
             groupByAdjList = (byte) (groupByAdjList | values.next().getFirst());
             count = count + 1;
         }
-        if (count >= 128)
-            bytCount = (byte) 128;
+        if (count >= 127)
+            bytCount = (byte) 127;
         else
             bytCount = (byte) count;
         vaWriter.set(groupByAdjList, bytCount);
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/graphbuilding/GenomixDriver.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/graphbuilding/GenomixDriver.java
index 67c4e09..ad62ee9 100755
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/graphbuilding/GenomixDriver.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/graphbuilding/GenomixDriver.java
@@ -18,6 +18,7 @@
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
@@ -65,12 +66,12 @@
         conf.setReducerClass(GenomixReducer.class);
         conf.setCombinerClass(GenomixCombiner.class);
 
-        conf.setMapOutputKeyClass(VLongWritable.class);
+        conf.setMapOutputKeyClass(BytesWritable.class);
         conf.setMapOutputValueClass(ValueWritable.class);
 
         conf.setInputFormat(TextInputFormat.class);
         conf.setOutputFormat(TextOutputFormat.class);
-        conf.setOutputKeyClass(VLongWritable.class);
+        conf.setOutputKeyClass(BytesWritable.class);
         conf.setOutputValueClass(ValueWritable.class);
         FileInputFormat.setInputPaths(conf, new Path(inputPath));
         FileOutputFormat.setOutputPath(conf, new Path(outputPath));
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/graphbuilding/GenomixMapper.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/graphbuilding/GenomixMapper.java
index 367370e..3eb5e12 100755
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/graphbuilding/GenomixMapper.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/graphbuilding/GenomixMapper.java
@@ -19,6 +19,7 @@
 import java.util.regex.Pattern;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
@@ -32,17 +33,53 @@
 /**
  * This class implement mapper operator of mapreduce model
  */
-public class GenomixMapper extends MapReduceBase implements Mapper<LongWritable, Text, VLongWritable, ValueWritable> {
+public class GenomixMapper extends MapReduceBase implements Mapper<LongWritable, Text, BytesWritable, ValueWritable> {
+
+    public class CurrenByte {
+        public byte curByte;
+        public byte preMarker;
+    }
 
     public static int KMER_SIZE;
     public ValueWritable outputAdjList = new ValueWritable();
-    public VLongWritable outputKmer = new VLongWritable();
+    public BytesWritable outputKmer = new BytesWritable();
 
     @Override
     public void configure(JobConf job) {
         KMER_SIZE = Integer.parseInt(job.get("sizeKmer"));
     }
 
+    public CurrenByte shift(byte curByte, byte newKmer) {
+        CurrenByte currentByte = new CurrenByte();
+        byte preMarker = (byte) 0xC0;
+        preMarker = (byte) (preMarker & curByte);
+        curByte = (byte) (curByte << 2);
+        curByte = (byte) (curByte | newKmer);
+        preMarker = (byte) ((preMarker & 0xff) >> 6);
+        currentByte.curByte = curByte;
+        currentByte.preMarker = preMarker;
+        return currentByte;
+    }
+
+    public CurrenByte lastByteShift(byte curByte, byte newKmer, int kmerSize) {
+        CurrenByte currentByte = new CurrenByte();
+        int restBits = (kmerSize * 2) % 8;
+        if (restBits == 0)
+            restBits = 8;
+        byte preMarker = (byte) 0x03;
+        preMarker = (byte) (preMarker << restBits - 2);
+        preMarker = (byte) (preMarker & curByte);
+        preMarker = (byte) ((preMarker & 0xff) >> restBits - 2);
+        byte reset = 3;
+        reset = (byte) ~(reset << restBits - 2);
+        curByte = (byte) (curByte & reset);
+        curByte = (byte) (curByte << 2);
+        curByte = (byte) (curByte | newKmer);
+        currentByte.curByte = curByte;
+        currentByte.preMarker = preMarker;
+        return currentByte;
+    }
+
     /*succeed node
       A 00000001 1
       G 00000010 2
@@ -54,7 +91,7 @@
       C 01000000 64
       T 10000000 128*/
     @Override
-    public void map(LongWritable key, Text value, OutputCollector<VLongWritable, ValueWritable> output,
+    public void map(LongWritable key, Text value, OutputCollector<BytesWritable, ValueWritable> output,
             Reporter reporter) throws IOException {
         /* A 00
            G 01
@@ -66,70 +103,134 @@
         boolean isValid = geneMatcher.matches();
         int i = 0;
         if (isValid == true) {
-            long kmerValue = 0;
-            long PreMarker = -1;
+            byte[] kmerValue = new byte[KMER_SIZE * 2 / 8 + 1];
+            for (int k = 0; k < kmerValue.length; k++)
+                kmerValue[i] = 0x00;
+            CurrenByte currentByte = new CurrenByte();
+            byte preMarker = (byte) -1;
             byte count = 0;
-            //Get the next kmer by shiftint one letter every time
+            //Get the next kmer by shifting one letter every time
             for (i = 0; i < geneLine.length(); i++) {
                 byte kmerAdjList = 0;
+                byte initial;
                 if (i >= KMER_SIZE) {
-                    outputKmer.set(kmerValue);
-                    switch ((int) PreMarker) {
+                    outputKmer.set(kmerValue, 0, KMER_SIZE * 2 / 8 + 1);
+                    switch ((int) preMarker) {
                         case -1:
                             kmerAdjList = (byte) (kmerAdjList + 0);
                             break;
                         case 0:
                             kmerAdjList = (byte) (kmerAdjList + 16);
                             break;
-                        case 16:
+                        case 1:
                             kmerAdjList = (byte) (kmerAdjList + 32);
                             break;
-                        case 32:
+                        case 2:
                             kmerAdjList = (byte) (kmerAdjList + 64);
                             break;
-                        case 48:
+                        case 3:
                             kmerAdjList = (byte) (kmerAdjList + 128);
                             break;
                     }
-                    //Update the premarker
-                    PreMarker = 3;
-                    PreMarker = PreMarker << (KMER_SIZE - 1) * 2;
-                    PreMarker = PreMarker & kmerValue;
-                    //Reset the top two bits
-                    long reset = 3;
-                    kmerValue = kmerValue << 2;
-                    reset = ~(reset << KMER_SIZE * 2);
-                    kmerValue = kmerValue & reset;
                 }
                 switch (geneLine.charAt(i)) {
                     case 'A':
                         kmerAdjList = (byte) (kmerAdjList + 1);
-                        kmerValue = kmerValue + 0;
+
+                        initial = (byte) 0x00;
+                        if (kmerValue.length == 1) {
+                            currentByte = lastByteShift(kmerValue[kmerValue.length - 1], initial, KMER_SIZE);
+                            preMarker = currentByte.preMarker;
+                            kmerValue[kmerValue.length - 1] = currentByte.curByte;
+                        } else {
+                            currentByte = shift(kmerValue[0], initial);
+                            preMarker = currentByte.preMarker;
+                            kmerValue[0] = currentByte.curByte;
+                            for (int j = 1; j < kmerValue.length - 1; j++) {
+                                currentByte = shift(kmerValue[j], preMarker);
+                                preMarker = currentByte.preMarker;
+                                kmerValue[j] = currentByte.curByte;
+                            }
+                            currentByte = lastByteShift(kmerValue[kmerValue.length - 1], preMarker, KMER_SIZE);
+                            preMarker = currentByte.preMarker;
+                            kmerValue[kmerValue.length - 1] = currentByte.curByte;
+                        }
+
                         break;
                     case 'G':
                         kmerAdjList = (byte) (kmerAdjList + 2);
-                        kmerValue = kmerValue + 1;
+
+                        initial = (byte) 0x01;
+                        if (kmerValue.length == 1) {
+                            currentByte = lastByteShift(kmerValue[kmerValue.length - 1], initial, KMER_SIZE);
+                            preMarker = currentByte.preMarker;
+                            kmerValue[kmerValue.length - 1] = currentByte.curByte;
+                        } else {
+                            currentByte = shift(kmerValue[0], initial);
+                            preMarker = currentByte.preMarker;
+                            kmerValue[0] = currentByte.curByte;
+                            for (int j = 1; j < kmerValue.length - 1; j++) {
+                                currentByte = shift(kmerValue[j], preMarker);
+                                preMarker = currentByte.preMarker;
+                                kmerValue[j] = currentByte.curByte;
+                            }
+                            currentByte = lastByteShift(kmerValue[kmerValue.length - 1], preMarker, KMER_SIZE);
+                            preMarker = currentByte.preMarker;
+                            kmerValue[kmerValue.length - 1] = currentByte.curByte;
+                        }
                         break;
                     case 'C':
                         kmerAdjList = (byte) (kmerAdjList + 4);
-                        kmerValue = kmerValue + 2;
+
+                        initial = (byte) 0x02;
+                        if (kmerValue.length == 1) {
+                            currentByte = lastByteShift(kmerValue[kmerValue.length - 1], initial, KMER_SIZE);
+                            preMarker = currentByte.preMarker;
+                            kmerValue[kmerValue.length - 1] = currentByte.curByte;
+                        } else {
+                            currentByte = shift(kmerValue[0], initial);
+                            preMarker = currentByte.preMarker;
+                            kmerValue[0] = currentByte.curByte;
+                            for (int j = 1; j < kmerValue.length - 1; j++) {
+                                currentByte = shift(kmerValue[j], preMarker);
+                                preMarker = currentByte.preMarker;
+                                kmerValue[j] = currentByte.curByte;
+                            }
+                            currentByte = lastByteShift(kmerValue[kmerValue.length - 1], preMarker, KMER_SIZE);
+                            preMarker = currentByte.preMarker;
+                            kmerValue[kmerValue.length - 1] = currentByte.curByte;
+                        }
                         break;
                     case 'T':
                         kmerAdjList = (byte) (kmerAdjList + 8);
-                        kmerValue = kmerValue + 3;
+                        initial = (byte) 0x03;
+                        if (kmerValue.length == 1) {
+                            currentByte = lastByteShift(kmerValue[kmerValue.length - 1], initial, KMER_SIZE);
+                            preMarker = currentByte.preMarker;
+                            kmerValue[kmerValue.length - 1] = currentByte.curByte;
+                        } else {
+                            currentByte = shift(kmerValue[0], initial);
+                            preMarker = currentByte.preMarker;
+                            kmerValue[0] = currentByte.curByte;
+                            for (int j = 1; j < kmerValue.length - 1; j++) {
+                                currentByte = shift(kmerValue[j], preMarker);
+                                preMarker = currentByte.preMarker;
+                                kmerValue[j] = currentByte.curByte;
+                            }
+                        }
                         break;
                 }
                 if (i >= KMER_SIZE) {
                     outputAdjList.set(kmerAdjList, count);
                     output.collect(outputKmer, outputAdjList);
                 }
-                if (i < KMER_SIZE - 1)
-                    kmerValue = (kmerValue << 2);
+                if (i < KMER_SIZE)
+                    preMarker = (byte) -1;
             }
             // arrive the last letter of this gene line
             if (i == geneLine.length()) {
                 byte kmerAdjList = 0;
-                switch ((int) PreMarker) {
+                switch ((int) preMarker) {
                     case 0:
                         kmerAdjList = (byte) (kmerAdjList + 16);
                         break;
@@ -144,7 +245,7 @@
                         break;
                 }
                 outputAdjList.set(kmerAdjList, count);
-                outputKmer.set(kmerValue);
+                outputKmer.set(kmerValue, 0, KMER_SIZE * 2 / 8 + 1);
                 output.collect(outputKmer, outputAdjList);
             }
         }
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/graphbuilding/GenomixReducer.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/graphbuilding/GenomixReducer.java
index 7358ae0..ebf0df0 100755
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/graphbuilding/GenomixReducer.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/graphbuilding/GenomixReducer.java
@@ -1,4 +1,5 @@
 package edu.uci.ics.graphbuilding;
+
 /*
  * Copyright 2009-2012 by The Regents of the University of California
  * Licensed under the Apache License, Version 2.0 (the "License");
@@ -16,6 +17,7 @@
 import java.io.IOException;
 import java.util.Iterator;
 
+import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.VLongWritable;
@@ -23,29 +25,31 @@
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reducer;
 import org.apache.hadoop.mapred.Reporter;
+
 /**
  * This class implement reducer operator of mapreduce model
  */
 public class GenomixReducer extends MapReduceBase implements
-Reducer<VLongWritable, ValueWritable, VLongWritable, ValueWritable> {
-ValueWritable valWriter = new ValueWritable();
-@Override
-public void reduce(VLongWritable key, Iterator<ValueWritable> values,
-    OutputCollector<VLongWritable, ValueWritable> output, Reporter reporter) throws IOException {
-byte groupByAdjList = 0;
-int count = 0;
-byte bytCount = 0;
-while (values.hasNext()) {
-    //Merge By the all adjacent Nodes;
-    ValueWritable geneValue = values.next();
-    groupByAdjList = (byte) (groupByAdjList | geneValue.getFirst());
-    count = count + (int)geneValue.getSecond();
-}
-if(count >= 128)
-    bytCount = (byte)128;
-else
-    bytCount = (byte)count;
-valWriter.set(groupByAdjList, bytCount);
-output.collect(key, valWriter);
-}
+        Reducer<BytesWritable, ValueWritable, BytesWritable, ValueWritable> {
+    ValueWritable valWriter = new ValueWritable();
+
+    @Override
+    public void reduce(BytesWritable key, Iterator<ValueWritable> values,
+            OutputCollector<BytesWritable, ValueWritable> output, Reporter reporter) throws IOException {
+        byte groupByAdjList = 0;
+        int count = 0;
+        byte bytCount = 0;
+        while (values.hasNext()) {
+            //Merge By the all adjacent Nodes;
+            ValueWritable geneValue = values.next();
+            groupByAdjList = (byte) (groupByAdjList | geneValue.getFirst());
+            count = count + (int) geneValue.getSecond();
+        }
+        if (count >= 127)
+            bytCount = (byte) 127;
+        else
+            bytCount = (byte) count;
+        valWriter.set(groupByAdjList, bytCount);
+        output.collect(key, valWriter);
+    }
 }
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/graphbuilding/ValueWritable.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/graphbuilding/ValueWritable.java
index 87d9be6..2989ae6 100755
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/graphbuilding/ValueWritable.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/graphbuilding/ValueWritable.java
@@ -1,4 +1,5 @@
 package edu.uci.ics.graphbuilding;
+
 /*
  * Copyright 2009-2012 by The Regents of the University of California
  * Licensed under the Apache License, Version 2.0 (the "License");
@@ -18,6 +19,7 @@
 import java.io.IOException;
 
 import org.apache.hadoop.io.WritableComparable;
+
 /**
  * This class override the writablecomparable class which contain int varable
  */
@@ -50,15 +52,18 @@
         out.writeByte(first);
         out.writeByte(second);
     }
+
     @Override
     public void readFields(DataInput in) throws IOException {
         first = in.readByte();
         second = in.readByte();
     }
+
     @Override
     public int hashCode() {
         return (int) first + (int) second;
     }
+
     @Override
     public boolean equals(Object o) {
         if (o instanceof ValueWritable) {
@@ -67,10 +72,12 @@
         }
         return false;
     }
+
     @Override
     public String toString() {
         return Integer.toString(first) + "\t" + Integer.toString(second);
     }
+
     @Override
     public int compareTo(ValueWritable tp) {
         int cmp;
diff --git a/genomix/genomix-hadoop/src/test/java/edu/uci/ics/graphbuilding/GraphBuildingTest.java b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/graphbuilding/GraphBuildingTest.java
index 0095fe9..d04cbdb 100755
--- a/genomix/genomix-hadoop/src/test/java/edu/uci/ics/graphbuilding/GraphBuildingTest.java
+++ b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/graphbuilding/GraphBuildingTest.java
@@ -1,4 +1,5 @@
 package edu.uci.ics.graphbuilding;
+
 /*
  * Copyright 2009-2012 by The Regents of the University of California
  * Licensed under the Apache License, Version 2.0 (the "License");
@@ -28,6 +29,7 @@
 import org.junit.Test;
 
 import edu.uci.ics.utils.TestUtils;
+
 /**
  * This class test the correctness of graphbuilding program
  */