add the byteswritable
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_genomix@2880 123451ca-8445-de46-9d55-352943316053
diff --git a/genomix/genomix-hadoop/actual/result2/.part-00000.crc b/genomix/genomix-hadoop/actual/result2/.part-00000.crc
index 7a34e25..8871afc 100755
--- a/genomix/genomix-hadoop/actual/result2/.part-00000.crc
+++ b/genomix/genomix-hadoop/actual/result2/.part-00000.crc
Binary files differ
diff --git a/genomix/genomix-hadoop/actual/result2/part-00000 b/genomix/genomix-hadoop/actual/result2/part-00000
index 1f33880..9d64c60 100755
--- a/genomix/genomix-hadoop/actual/result2/part-00000
+++ b/genomix/genomix-hadoop/actual/result2/part-00000
@@ -1,7 +1,7 @@
-1 33 1
-3 1 1
-4 -103 2
-12 18 1
-16 18 1
-19 16 1
-49 17 1
+01 33 1
+03 1 1
+04 -103 2
+0c 18 1
+10 18 1
+13 16 1
+31 17 1
diff --git a/genomix/genomix-hadoop/expected/result2 b/genomix/genomix-hadoop/expected/result2
index bc7bc64..aa56fbf 100755
--- a/genomix/genomix-hadoop/expected/result2
+++ b/genomix/genomix-hadoop/expected/result2
@@ -1,7 +1,7 @@
-1 33 1
-3 1 1
-4 -103 2
-12 18 1
-16 18 1
-19 16 1
-49 17 1
+01 33 1
+03 1 1
+04 -103 2
+0c 18 1
+10 18 1
+13 16 1
+31 17 1
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/graphbuilding/GenomixCombiner.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/graphbuilding/GenomixCombiner.java
index dadbb9a..74ef455 100755
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/graphbuilding/GenomixCombiner.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/graphbuilding/GenomixCombiner.java
@@ -17,6 +17,7 @@
import java.io.IOException;
import java.util.Iterator;
+import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.VLongWritable;
@@ -29,12 +30,12 @@
* This class implement the combiner operator of Mapreduce model
*/
public class GenomixCombiner extends MapReduceBase implements
- Reducer<VLongWritable, ValueWritable, VLongWritable, ValueWritable> {
+ Reducer<BytesWritable, ValueWritable, BytesWritable, ValueWritable> {
public ValueWritable vaWriter = new ValueWritable();
@Override
- public void reduce(VLongWritable key, Iterator<ValueWritable> values,
- OutputCollector<VLongWritable, ValueWritable> output, Reporter reporter) throws IOException {
+ public void reduce(BytesWritable key, Iterator<ValueWritable> values,
+ OutputCollector<BytesWritable, ValueWritable> output, Reporter reporter) throws IOException {
byte groupByAdjList = 0;
int count = 0;
byte bytCount = 0;
@@ -43,8 +44,8 @@
groupByAdjList = (byte) (groupByAdjList | values.next().getFirst());
count = count + 1;
}
- if (count >= 128)
- bytCount = (byte) 128;
+ if (count >= 127)
+ bytCount = (byte) 127;
else
bytCount = (byte) count;
vaWriter.set(groupByAdjList, bytCount);
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/graphbuilding/GenomixDriver.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/graphbuilding/GenomixDriver.java
index 67c4e09..ad62ee9 100755
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/graphbuilding/GenomixDriver.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/graphbuilding/GenomixDriver.java
@@ -18,6 +18,7 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
@@ -65,12 +66,12 @@
conf.setReducerClass(GenomixReducer.class);
conf.setCombinerClass(GenomixCombiner.class);
- conf.setMapOutputKeyClass(VLongWritable.class);
+ conf.setMapOutputKeyClass(BytesWritable.class);
conf.setMapOutputValueClass(ValueWritable.class);
conf.setInputFormat(TextInputFormat.class);
conf.setOutputFormat(TextOutputFormat.class);
- conf.setOutputKeyClass(VLongWritable.class);
+ conf.setOutputKeyClass(BytesWritable.class);
conf.setOutputValueClass(ValueWritable.class);
FileInputFormat.setInputPaths(conf, new Path(inputPath));
FileOutputFormat.setOutputPath(conf, new Path(outputPath));
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/graphbuilding/GenomixMapper.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/graphbuilding/GenomixMapper.java
index 367370e..3eb5e12 100755
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/graphbuilding/GenomixMapper.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/graphbuilding/GenomixMapper.java
@@ -19,6 +19,7 @@
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
@@ -32,17 +33,53 @@
/**
* This class implement mapper operator of mapreduce model
*/
-public class GenomixMapper extends MapReduceBase implements Mapper<LongWritable, Text, VLongWritable, ValueWritable> {
+public class GenomixMapper extends MapReduceBase implements Mapper<LongWritable, Text, BytesWritable, ValueWritable> {
+
+ public class CurrenByte {
+ public byte curByte;
+ public byte preMarker;
+ }
public static int KMER_SIZE;
public ValueWritable outputAdjList = new ValueWritable();
- public VLongWritable outputKmer = new VLongWritable();
+ public BytesWritable outputKmer = new BytesWritable();
@Override
public void configure(JobConf job) {
KMER_SIZE = Integer.parseInt(job.get("sizeKmer"));
}
+ public CurrenByte shift(byte curByte, byte newKmer) {
+ CurrenByte currentByte = new CurrenByte();
+ byte preMarker = (byte) 0xC0;
+ preMarker = (byte) (preMarker & curByte);
+ curByte = (byte) (curByte << 2);
+ curByte = (byte) (curByte | newKmer);
+ preMarker = (byte) ((preMarker & 0xff) >> 6);
+ currentByte.curByte = curByte;
+ currentByte.preMarker = preMarker;
+ return currentByte;
+ }
+
+ public CurrenByte lastByteShift(byte curByte, byte newKmer, int kmerSize) {
+ CurrenByte currentByte = new CurrenByte();
+ int restBits = (kmerSize * 2) % 8;
+ if (restBits == 0)
+ restBits = 8;
+ byte preMarker = (byte) 0x03;
+ preMarker = (byte) (preMarker << restBits - 2);
+ preMarker = (byte) (preMarker & curByte);
+ preMarker = (byte) ((preMarker & 0xff) >> restBits - 2);
+ byte reset = 3;
+ reset = (byte) ~(reset << restBits - 2);
+ curByte = (byte) (curByte & reset);
+ curByte = (byte) (curByte << 2);
+ curByte = (byte) (curByte | newKmer);
+ currentByte.curByte = curByte;
+ currentByte.preMarker = preMarker;
+ return currentByte;
+ }
+
/*succeed node
A 00000001 1
G 00000010 2
@@ -54,7 +91,7 @@
C 01000000 64
T 10000000 128*/
@Override
- public void map(LongWritable key, Text value, OutputCollector<VLongWritable, ValueWritable> output,
+ public void map(LongWritable key, Text value, OutputCollector<BytesWritable, ValueWritable> output,
Reporter reporter) throws IOException {
/* A 00
G 01
@@ -66,70 +103,134 @@
boolean isValid = geneMatcher.matches();
int i = 0;
if (isValid == true) {
- long kmerValue = 0;
- long PreMarker = -1;
+ byte[] kmerValue = new byte[KMER_SIZE * 2 / 8 + 1];
+ for (int k = 0; k < kmerValue.length; k++)
+ kmerValue[i] = 0x00;
+ CurrenByte currentByte = new CurrenByte();
+ byte preMarker = (byte) -1;
byte count = 0;
- //Get the next kmer by shiftint one letter every time
+ //Get the next kmer by shifting one letter every time
for (i = 0; i < geneLine.length(); i++) {
byte kmerAdjList = 0;
+ byte initial;
if (i >= KMER_SIZE) {
- outputKmer.set(kmerValue);
- switch ((int) PreMarker) {
+ outputKmer.set(kmerValue, 0, KMER_SIZE * 2 / 8 + 1);
+ switch ((int) preMarker) {
case -1:
kmerAdjList = (byte) (kmerAdjList + 0);
break;
case 0:
kmerAdjList = (byte) (kmerAdjList + 16);
break;
- case 16:
+ case 1:
kmerAdjList = (byte) (kmerAdjList + 32);
break;
- case 32:
+ case 2:
kmerAdjList = (byte) (kmerAdjList + 64);
break;
- case 48:
+ case 3:
kmerAdjList = (byte) (kmerAdjList + 128);
break;
}
- //Update the premarker
- PreMarker = 3;
- PreMarker = PreMarker << (KMER_SIZE - 1) * 2;
- PreMarker = PreMarker & kmerValue;
- //Reset the top two bits
- long reset = 3;
- kmerValue = kmerValue << 2;
- reset = ~(reset << KMER_SIZE * 2);
- kmerValue = kmerValue & reset;
}
switch (geneLine.charAt(i)) {
case 'A':
kmerAdjList = (byte) (kmerAdjList + 1);
- kmerValue = kmerValue + 0;
+
+ initial = (byte) 0x00;
+ if (kmerValue.length == 1) {
+ currentByte = lastByteShift(kmerValue[kmerValue.length - 1], initial, KMER_SIZE);
+ preMarker = currentByte.preMarker;
+ kmerValue[kmerValue.length - 1] = currentByte.curByte;
+ } else {
+ currentByte = shift(kmerValue[0], initial);
+ preMarker = currentByte.preMarker;
+ kmerValue[0] = currentByte.curByte;
+ for (int j = 1; j < kmerValue.length - 1; j++) {
+ currentByte = shift(kmerValue[j], preMarker);
+ preMarker = currentByte.preMarker;
+ kmerValue[j] = currentByte.curByte;
+ }
+ currentByte = lastByteShift(kmerValue[kmerValue.length - 1], preMarker, KMER_SIZE);
+ preMarker = currentByte.preMarker;
+ kmerValue[kmerValue.length - 1] = currentByte.curByte;
+ }
+
break;
case 'G':
kmerAdjList = (byte) (kmerAdjList + 2);
- kmerValue = kmerValue + 1;
+
+ initial = (byte) 0x01;
+ if (kmerValue.length == 1) {
+ currentByte = lastByteShift(kmerValue[kmerValue.length - 1], initial, KMER_SIZE);
+ preMarker = currentByte.preMarker;
+ kmerValue[kmerValue.length - 1] = currentByte.curByte;
+ } else {
+ currentByte = shift(kmerValue[0], initial);
+ preMarker = currentByte.preMarker;
+ kmerValue[0] = currentByte.curByte;
+ for (int j = 1; j < kmerValue.length - 1; j++) {
+ currentByte = shift(kmerValue[j], preMarker);
+ preMarker = currentByte.preMarker;
+ kmerValue[j] = currentByte.curByte;
+ }
+ currentByte = lastByteShift(kmerValue[kmerValue.length - 1], preMarker, KMER_SIZE);
+ preMarker = currentByte.preMarker;
+ kmerValue[kmerValue.length - 1] = currentByte.curByte;
+ }
break;
case 'C':
kmerAdjList = (byte) (kmerAdjList + 4);
- kmerValue = kmerValue + 2;
+
+ initial = (byte) 0x02;
+ if (kmerValue.length == 1) {
+ currentByte = lastByteShift(kmerValue[kmerValue.length - 1], initial, KMER_SIZE);
+ preMarker = currentByte.preMarker;
+ kmerValue[kmerValue.length - 1] = currentByte.curByte;
+ } else {
+ currentByte = shift(kmerValue[0], initial);
+ preMarker = currentByte.preMarker;
+ kmerValue[0] = currentByte.curByte;
+ for (int j = 1; j < kmerValue.length - 1; j++) {
+ currentByte = shift(kmerValue[j], preMarker);
+ preMarker = currentByte.preMarker;
+ kmerValue[j] = currentByte.curByte;
+ }
+ currentByte = lastByteShift(kmerValue[kmerValue.length - 1], preMarker, KMER_SIZE);
+ preMarker = currentByte.preMarker;
+ kmerValue[kmerValue.length - 1] = currentByte.curByte;
+ }
break;
case 'T':
kmerAdjList = (byte) (kmerAdjList + 8);
- kmerValue = kmerValue + 3;
+ initial = (byte) 0x03;
+ if (kmerValue.length == 1) {
+ currentByte = lastByteShift(kmerValue[kmerValue.length - 1], initial, KMER_SIZE);
+ preMarker = currentByte.preMarker;
+ kmerValue[kmerValue.length - 1] = currentByte.curByte;
+ } else {
+ currentByte = shift(kmerValue[0], initial);
+ preMarker = currentByte.preMarker;
+ kmerValue[0] = currentByte.curByte;
+ for (int j = 1; j < kmerValue.length - 1; j++) {
+ currentByte = shift(kmerValue[j], preMarker);
+ preMarker = currentByte.preMarker;
+ kmerValue[j] = currentByte.curByte;
+ }
+ }
break;
}
if (i >= KMER_SIZE) {
outputAdjList.set(kmerAdjList, count);
output.collect(outputKmer, outputAdjList);
}
- if (i < KMER_SIZE - 1)
- kmerValue = (kmerValue << 2);
+ if (i < KMER_SIZE)
+ preMarker = (byte) -1;
}
// arrive the last letter of this gene line
if (i == geneLine.length()) {
byte kmerAdjList = 0;
- switch ((int) PreMarker) {
+ switch ((int) preMarker) {
case 0:
kmerAdjList = (byte) (kmerAdjList + 16);
break;
@@ -144,7 +245,7 @@
break;
}
outputAdjList.set(kmerAdjList, count);
- outputKmer.set(kmerValue);
+ outputKmer.set(kmerValue, 0, KMER_SIZE * 2 / 8 + 1);
output.collect(outputKmer, outputAdjList);
}
}
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/graphbuilding/GenomixReducer.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/graphbuilding/GenomixReducer.java
index 7358ae0..ebf0df0 100755
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/graphbuilding/GenomixReducer.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/graphbuilding/GenomixReducer.java
@@ -1,4 +1,5 @@
package edu.uci.ics.graphbuilding;
+
/*
* Copyright 2009-2012 by The Regents of the University of California
* Licensed under the Apache License, Version 2.0 (the "License");
@@ -16,6 +17,7 @@
import java.io.IOException;
import java.util.Iterator;
+import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.VLongWritable;
@@ -23,29 +25,31 @@
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
+
/**
* This class implement reducer operator of mapreduce model
*/
public class GenomixReducer extends MapReduceBase implements
-Reducer<VLongWritable, ValueWritable, VLongWritable, ValueWritable> {
-ValueWritable valWriter = new ValueWritable();
-@Override
-public void reduce(VLongWritable key, Iterator<ValueWritable> values,
- OutputCollector<VLongWritable, ValueWritable> output, Reporter reporter) throws IOException {
-byte groupByAdjList = 0;
-int count = 0;
-byte bytCount = 0;
-while (values.hasNext()) {
- //Merge By the all adjacent Nodes;
- ValueWritable geneValue = values.next();
- groupByAdjList = (byte) (groupByAdjList | geneValue.getFirst());
- count = count + (int)geneValue.getSecond();
-}
-if(count >= 128)
- bytCount = (byte)128;
-else
- bytCount = (byte)count;
-valWriter.set(groupByAdjList, bytCount);
-output.collect(key, valWriter);
-}
+ Reducer<BytesWritable, ValueWritable, BytesWritable, ValueWritable> {
+ ValueWritable valWriter = new ValueWritable();
+
+ @Override
+ public void reduce(BytesWritable key, Iterator<ValueWritable> values,
+ OutputCollector<BytesWritable, ValueWritable> output, Reporter reporter) throws IOException {
+ byte groupByAdjList = 0;
+ int count = 0;
+ byte bytCount = 0;
+ while (values.hasNext()) {
+ //Merge By the all adjacent Nodes;
+ ValueWritable geneValue = values.next();
+ groupByAdjList = (byte) (groupByAdjList | geneValue.getFirst());
+ count = count + (int) geneValue.getSecond();
+ }
+ if (count >= 127)
+ bytCount = (byte) 127;
+ else
+ bytCount = (byte) count;
+ valWriter.set(groupByAdjList, bytCount);
+ output.collect(key, valWriter);
+ }
}
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/graphbuilding/ValueWritable.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/graphbuilding/ValueWritable.java
index 87d9be6..2989ae6 100755
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/graphbuilding/ValueWritable.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/graphbuilding/ValueWritable.java
@@ -1,4 +1,5 @@
package edu.uci.ics.graphbuilding;
+
/*
* Copyright 2009-2012 by The Regents of the University of California
* Licensed under the Apache License, Version 2.0 (the "License");
@@ -18,6 +19,7 @@
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
+
/**
* This class override the writablecomparable class which contain int varable
*/
@@ -50,15 +52,18 @@
out.writeByte(first);
out.writeByte(second);
}
+
@Override
public void readFields(DataInput in) throws IOException {
first = in.readByte();
second = in.readByte();
}
+
@Override
public int hashCode() {
return (int) first + (int) second;
}
+
@Override
public boolean equals(Object o) {
if (o instanceof ValueWritable) {
@@ -67,10 +72,12 @@
}
return false;
}
+
@Override
public String toString() {
return Integer.toString(first) + "\t" + Integer.toString(second);
}
+
@Override
public int compareTo(ValueWritable tp) {
int cmp;
diff --git a/genomix/genomix-hadoop/src/test/java/edu/uci/ics/graphbuilding/GraphBuildingTest.java b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/graphbuilding/GraphBuildingTest.java
index 0095fe9..d04cbdb 100755
--- a/genomix/genomix-hadoop/src/test/java/edu/uci/ics/graphbuilding/GraphBuildingTest.java
+++ b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/graphbuilding/GraphBuildingTest.java
@@ -1,4 +1,5 @@
package edu.uci.ics.graphbuilding;
+
/*
* Copyright 2009-2012 by The Regents of the University of California
* Licensed under the Apache License, Version 2.0 (the "License");
@@ -28,6 +29,7 @@
import org.junit.Test;
import edu.uci.ics.utils.TestUtils;
+
/**
* This class test the correctness of graphbuilding program
*/