Merge branch 'genomix/velvet_graphbuilding' into genomix/fullstack_genomix
Conflicts:
genomix/genomix-data/.classpath
genomix/genomix-data/.project
genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/valvetgraphbuilding/DeepGraphBuildingMapper.java
genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/valvetgraphbuilding/DeepGraphBuildingReducer.java
genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/valvetgraphbuilding/GraphInvertedIndexBuildingMapper.java
diff --git a/genomix/genomix-data/.classpath b/genomix/genomix-data/.classpath
deleted file mode 100644
index daa9d4b..0000000
--- a/genomix/genomix-data/.classpath
+++ /dev/null
@@ -1,11 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<classpath>
- <classpathentry kind="src" output="target/classes" path="src/main/java"/>
- <classpathentry excluding="**" kind="src" output="target/classes" path="src/main/resources"/>
- <classpathentry kind="src" output="target/test-classes" path="src/test/java"/>
- <classpathentry excluding="**" kind="src" output="target/test-classes" path="src/test/resources"/>
- <classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.7"/>
- <classpathentry kind="con" path="org.eclipse.m2e.MAVEN2_CLASSPATH_CONTAINER"/>
- <classpathentry kind="con" path="org.maven.ide.eclipse.MAVEN2_CLASSPATH_CONTAINER"/>
- <classpathentry kind="output" path="target/classes"/>
-</classpath>
diff --git a/genomix/genomix-data/.project b/genomix/genomix-data/.project
deleted file mode 100644
index 4628890..0000000
--- a/genomix/genomix-data/.project
+++ /dev/null
@@ -1,29 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<projectDescription>
- <name>genomix-data</name>
- <comment></comment>
- <projects>
- </projects>
- <buildSpec>
- <buildCommand>
- <name>org.eclipse.jdt.core.javabuilder</name>
- <arguments>
- </arguments>
- </buildCommand>
- <buildCommand>
- <name>org.maven.ide.eclipse.maven2Builder</name>
- <arguments>
- </arguments>
- </buildCommand>
- <buildCommand>
- <name>org.eclipse.m2e.core.maven2Builder</name>
- <arguments>
- </arguments>
- </buildCommand>
- </buildSpec>
- <natures>
- <nature>org.maven.ide.eclipse.maven2Nature</nature>
- <nature>org.eclipse.jdt.core.javanature</nature>
- <nature>org.eclipse.m2e.core.maven2Nature</nature>
- </natures>
-</projectDescription>
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerUtil.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/data/KmerUtil.java
similarity index 93%
rename from genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerUtil.java
rename to genomix/genomix-data/src/main/java/edu/uci/ics/genomix/data/KmerUtil.java
index 68dec30..a030f0b 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerUtil.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/data/KmerUtil.java
@@ -13,7 +13,9 @@
* limitations under the License.
*/
-package edu.uci.ics.genomix.type;
+package edu.uci.ics.genomix.data;
+
+import edu.uci.ics.genomix.type.GeneCode;
public class KmerUtil {
public static final String empty = "";
@@ -34,7 +36,7 @@
public static String recoverKmerFrom(int k, byte[] keyData, int keyStart, int keyLength) {
StringBuilder strKmer = new StringBuilder();
int byteId = keyStart + keyLength - 1;
- if (byteId < 0) {
+ if (byteId < 0 || k < 1) {
return empty;
}
byte currentbyte = keyData[byteId];
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/data/Marshal.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/data/Marshal.java
new file mode 100644
index 0000000..219def6
--- /dev/null
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/data/Marshal.java
@@ -0,0 +1,15 @@
+package edu.uci.ics.genomix.data;
+
+public class Marshal {
+ public static int getInt(byte[] bytes, int offset) {
+ return ((bytes[offset] & 0xff) << 24) + ((bytes[offset + 1] & 0xff) << 16) + ((bytes[offset + 2] & 0xff) << 8)
+ + ((bytes[offset + 3] & 0xff) << 0);
+ }
+
+ public static void putInt(int val, byte[] bytes, int offset) {
+ bytes[offset] = (byte)((val >>> 24) & 0xFF);
+ bytes[offset + 1] = (byte)((val >>> 16) & 0xFF);
+ bytes[offset + 2] = (byte)((val >>> 8) & 0xFF);
+ bytes[offset + 3] = (byte)((val >>> 0) & 0xFF);
+ }
+}
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/GeneCode.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/GeneCode.java
index 5be5f83..03e2fd9 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/GeneCode.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/GeneCode.java
@@ -50,96 +50,10 @@
}
public static byte getSymbolFromCode(byte code) {
- if (code > 3) {
- return '!';
+ if (code > 3 || code < 0) {
+ throw new IllegalArgumentException("Not such gene code");
}
return GENE_SYMBOL[code];
}
- public static byte getAdjBit(byte t) {
- byte r = 0;
- switch (t) {
- case 'A':
- case 'a':
- r = 1 << A;
- break;
- case 'C':
- case 'c':
- r = 1 << C;
- break;
- case 'G':
- case 'g':
- r = 1 << G;
- break;
- case 'T':
- case 't':
- r = 1 << T;
- break;
- }
- return r;
- }
-
- /**
- * It works for path merge. Merge the kmer by his next, we need to make sure
- * the @{t} is a single neighbor.
- *
- * @param t
- * the neighbor code in BitMap
- * @return the genecode
- */
- public static byte getGeneCodeFromBitMap(byte t) {
- switch (t) {
- case 1 << A:
- return A;
- case 1 << C:
- return C;
- case 1 << G:
- return G;
- case 1 << T:
- return T;
- }
- return -1;
- }
-
- public static byte getBitMapFromGeneCode(byte t) {
- return (byte) (1 << t);
- }
-
- public static int countNumberOfBitSet(int i) {
- int c = 0;
- for (; i != 0; c++) {
- i &= i - 1;
- }
- return c;
- }
-
- public static int inDegree(byte bitmap) {
- return countNumberOfBitSet((bitmap >> 4) & 0x0f);
- }
-
- public static int outDegree(byte bitmap) {
- return countNumberOfBitSet(bitmap & 0x0f);
- }
-
- public static byte mergePreNextAdj(byte pre, byte next) {
- return (byte) (pre << 4 | (next & 0x0f));
- }
-
- public static String getSymbolFromBitMap(byte code) {
- int left = (code >> 4) & 0x0F;
- int right = code & 0x0F;
- StringBuilder str = new StringBuilder();
- for (int i = A; i <= T; i++) {
- if ((left & (1 << i)) != 0) {
- str.append((char) GENE_SYMBOL[i]);
- }
- }
- str.append('|');
- for (int i = A; i <= T; i++) {
- if ((right & (1 << i)) != 0) {
- str.append((char) GENE_SYMBOL[i]);
- }
- }
- return str.toString();
- }
}
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerBytesWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerBytesWritable.java
index fd4c252..8f9094f 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerBytesWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerBytesWritable.java
@@ -24,8 +24,10 @@
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
+import edu.uci.ics.genomix.data.KmerUtil;
+
/**
- * Fix kmer length byteswritable
+ * Variable kmer length byteswritable
* It was used to generate the graph in which phase the kmer length doesn't change.
* Thus the size of bytes doesn't change either.
*/
@@ -38,25 +40,15 @@
protected int size;
protected byte[] bytes;
+ protected int offset;
protected int kmerlength;
- @Deprecated
public KmerBytesWritable() {
- this(0, EMPTY_BYTES);
+ this(0, EMPTY_BYTES, 0);
}
- public KmerBytesWritable(int k, byte[] storage) {
- this.kmerlength = k;
- if (k > 0) {
- this.size = KmerUtil.getByteNumFromK(kmerlength);
- this.bytes = storage;
- if (this.bytes.length < size) {
- throw new ArrayIndexOutOfBoundsException("Storage is smaller than required space for kmerlength:k");
- }
- } else {
- this.bytes = storage;
- this.size = 0;
- }
+ public KmerBytesWritable(int k, byte[] storage, int offset) {
+ setNewReference(k, storage, offset);
}
/**
@@ -73,28 +65,92 @@
} else {
this.bytes = EMPTY_BYTES;
}
+ this.offset = 0;
}
public KmerBytesWritable(KmerBytesWritable right) {
- if (right != null) {
- this.kmerlength = right.kmerlength;
- this.size = right.size;
- this.bytes = new byte[right.size];
- set(right);
- }else{
- this.kmerlength = 0;
- this.size = 0;
- this.bytes = EMPTY_BYTES;
+ this(right.kmerlength);
+ set(right);
+ }
+
+ public void set(KmerBytesWritable newData) {
+ if (newData == null) {
+ this.set(0, EMPTY_BYTES, 0);
+ } else {
+ this.set(newData.kmerlength, newData.bytes, 0);
+ }
+ }
+
+ public void set(byte[] newData, int offset) {
+ if (kmerlength > 0) {
+ System.arraycopy(newData, offset, bytes, offset, size);
+ }
+ }
+
+ public void set(int k, byte[] newData, int offset) {
+ reset(k);
+ if (k > 0) {
+ System.arraycopy(newData, offset, bytes, this.offset, size);
+ }
+ }
+
+ /**
+ * Reset array by kmerlength
+ *
+ * @param k
+ */
+ public void reset(int k) {
+ this.kmerlength = k;
+ setSize(KmerUtil.getByteNumFromK(k));
+ clearLeadBit();
+ }
+
+ public void setNewReference(byte[] newData, int offset) {
+ this.bytes = newData;
+ this.offset = offset;
+ if (newData.length - offset < size) {
+ throw new IllegalArgumentException("Not given enough space");
+ }
+ }
+
+ public void setNewReference(int k, byte[] newData, int offset) {
+ this.kmerlength = k;
+ this.size = KmerUtil.getByteNumFromK(k);
+ setNewReference(newData, offset);
+ }
+
+ protected void setSize(int size) {
+ if (size > getCapacity()) {
+ setCapacity((size * 3 / 2));
+ }
+ this.size = size;
+ }
+
+ protected int getCapacity() {
+ return bytes.length;
+ }
+
+ protected void setCapacity(int new_cap) {
+ if (new_cap != getCapacity()) {
+ byte[] new_data = new byte[new_cap];
+ if (new_cap < size) {
+ size = new_cap;
+ }
+ if (size != 0) {
+ System.arraycopy(bytes, offset, new_data, 0, size);
+ }
+ bytes = new_data;
+ offset = 0;
}
}
public byte getGeneCodeAtPosition(int pos) {
if (pos >= kmerlength) {
- return -1;
+ throw new IllegalArgumentException("gene position out of bound");
}
int posByte = pos / 4;
int shift = (pos % 4) << 1;
- return (byte) ((bytes[size - 1 - posByte] >> shift) & 0x3);
+ return (byte) ((bytes[offset + size - 1 - posByte] >> shift) & 0x3);
}
public int getKmerLength() {
@@ -106,6 +162,10 @@
return bytes;
}
+ public int getOffset() {
+ return offset;
+ }
+
@Override
public int getLength() {
return size;
@@ -128,16 +188,21 @@
l |= (byte) (code << bytecount);
bytecount += 2;
if (bytecount == 8) {
- bytes[bcount--] = l;
+ bytes[offset + bcount--] = l;
l = 0;
bytecount = 0;
}
}
if (bcount >= 0) {
- bytes[0] = l;
+ bytes[offset] = l;
}
}
+ public void setByRead(int k, byte[] array, int start) {
+ reset(k);
+ setByRead(array, start);
+ }
+
/**
* Compress Reversed Kmer into bytes array AATAG will compress as
* [0x000A,0xATAG]
@@ -156,16 +221,21 @@
l |= (byte) (code << bytecount);
bytecount += 2;
if (bytecount == 8) {
- bytes[bcount--] = l;
+ bytes[offset + bcount--] = l;
l = 0;
bytecount = 0;
}
}
if (bcount >= 0) {
- bytes[0] = l;
+ bytes[offset] = l;
}
}
+ public void setByReadReverse(int k, byte[] array, int start) {
+ reset(k);
+ setByReadReverse(array, start);
+ }
+
/**
* Shift Kmer to accept new char input
*
@@ -185,14 +255,14 @@
* @return the shift out gene, in gene code format
*/
public byte shiftKmerWithNextCode(byte c) {
- byte output = (byte) (bytes[size - 1] & 0x03);
+ byte output = (byte) (bytes[offset + size - 1] & 0x03);
for (int i = size - 1; i > 0; i--) {
- byte in = (byte) (bytes[i - 1] & 0x03);
- bytes[i] = (byte) (((bytes[i] >>> 2) & 0x3f) | (in << 6));
+ byte in = (byte) (bytes[offset + i - 1] & 0x03);
+ bytes[offset + i] = (byte) (((bytes[offset + i] >>> 2) & 0x3f) | (in << 6));
}
int pos = ((kmerlength - 1) % 4) << 1;
byte code = (byte) (c << pos);
- bytes[0] = (byte) (((bytes[0] >>> 2) & 0x3f) | code);
+ bytes[offset] = (byte) (((bytes[offset] >>> 2) & 0x3f) | code);
clearLeadBit();
return output;
}
@@ -217,34 +287,44 @@
*/
public byte shiftKmerWithPreCode(byte c) {
int pos = ((kmerlength - 1) % 4) << 1;
- byte output = (byte) ((bytes[0] >> pos) & 0x03);
+ byte output = (byte) ((bytes[offset] >> pos) & 0x03);
for (int i = 0; i < size - 1; i++) {
- byte in = (byte) ((bytes[i + 1] >> 6) & 0x03);
- bytes[i] = (byte) ((bytes[i] << 2) | in);
+ byte in = (byte) ((bytes[offset + i + 1] >> 6) & 0x03);
+ bytes[offset + i] = (byte) ((bytes[offset + i] << 2) | in);
}
- bytes[size - 1] = (byte) ((bytes[size - 1] << 2) | c);
+ bytes[offset + size - 1] = (byte) ((bytes[offset + size - 1] << 2) | c);
clearLeadBit();
return output;
}
+ /**
+ * Merge kmer with next neighbor in gene-code format.
+ * The k of new kmer will increase by 1
+ * e.g. AAGCT merge with A => AAGCTA
+ *
+ * @param k
+ * :input k of kmer
+ * @param nextCode
+ * : next neighbor in gene-code format
+ * @return the merged Kmer, this K of this Kmer is k+1
+ */
+ public void mergeKmerWithNextCode(byte nextCode) {
+ this.kmerlength += 1;
+ setSize(KmerUtil.getByteNumFromK(kmerlength));
+ if (kmerlength % 4 == 1) {
+ for (int i = getLength() - 1; i > 0; i--) {
+ bytes[offset + i] = bytes[offset + i - 1];
+ }
+ bytes[offset] = (byte) (nextCode & 0x3);
+ } else {
+ bytes[offset] = (byte) (bytes[offset] | ((nextCode & 0x3) << (((kmerlength-1) % 4) << 1)));
+ }
+ clearLeadBit();
+ }
+
protected void clearLeadBit() {
if (kmerlength % 4 != 0) {
- bytes[0] &= (1 << ((kmerlength % 4) << 1)) - 1;
- }
- }
-
- public void set(KmerBytesWritable newData) {
- if (kmerlength != newData.kmerlength){
- throw new IllegalArgumentException("kmerSize is different, try to use VKmerBytesWritable instead");
- }
- if (kmerlength > 0 ){
- set(newData.bytes, 0, newData.size);
- }
- }
-
- public void set(byte[] newData, int offset, int length) {
- if (kmerlength > 0){
- System.arraycopy(newData, offset, bytes, 0, size);
+ bytes[offset] &= (1 << ((kmerlength % 4) << 1)) - 1;
}
}
@@ -259,8 +339,9 @@
if (this.kmerlength > 0) {
if (this.bytes.length < this.size) {
this.bytes = new byte[this.size];
+ this.offset = 0;
}
- in.readFully(bytes, 0, size);
+ in.readFully(bytes, offset, size);
}
}
@@ -268,7 +349,7 @@
public void write(DataOutput out) throws IOException {
out.writeInt(kmerlength);
if (kmerlength > 0) {
- out.write(bytes, 0, size);
+ out.write(bytes, offset, size);
}
}
@@ -286,7 +367,7 @@
@Override
public String toString() {
- return KmerUtil.recoverKmerFrom(this.kmerlength, this.getBytes(), 0, this.getLength());
+ return KmerUtil.recoverKmerFrom(this.kmerlength, this.getBytes(), offset, this.getLength());
}
public static class Comparator extends WritableComparator {
@@ -309,5 +390,4 @@
static { // register this comparator
WritableComparator.define(KmerBytesWritable.class, new Comparator());
}
-
}
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/VKmerBytesWritableFactory.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerBytesWritableFactory.java
similarity index 88%
rename from genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/VKmerBytesWritableFactory.java
rename to genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerBytesWritableFactory.java
index c00967f..9d458d2 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/VKmerBytesWritableFactory.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerBytesWritableFactory.java
@@ -15,11 +15,11 @@
package edu.uci.ics.genomix.type;
-public class VKmerBytesWritableFactory {
- private VKmerBytesWritable kmer;
+public class KmerBytesWritableFactory {
+ private KmerBytesWritable kmer;
- public VKmerBytesWritableFactory(int k) {
- kmer = new VKmerBytesWritable(k);
+ public KmerBytesWritableFactory(int k) {
+ kmer = new KmerBytesWritable(k);
}
/**
@@ -30,8 +30,9 @@
* @param array
* @param start
*/
- public VKmerBytesWritable getKmerByRead(int k, byte[] array, int start) {
- kmer.setByRead(k, array, start);
+ public KmerBytesWritable getKmerByRead(int k, byte[] array, int start) {
+ kmer.reset(k);
+ kmer.setByRead(array, start);
return kmer;
}
@@ -42,8 +43,9 @@
* @param array
* @param start
*/
- public VKmerBytesWritable getKmerByReadReverse(int k, byte[] array, int start) {
- kmer.setByReadReverse(k, array, start);
+ public KmerBytesWritable getKmerByReadReverse(int k, byte[] array, int start) {
+ kmer.reset(k);
+ kmer.setByReadReverse(array, start);
return kmer;
}
@@ -57,7 +59,7 @@
* @param kmerChain
* @return LastKmer bytes array
*/
- public VKmerBytesWritable getLastKmerFromChain(int lastK, final KmerBytesWritable kmerChain) {
+ public KmerBytesWritable getLastKmerFromChain(int lastK, final KmerBytesWritable kmerChain) {
if (lastK > kmerChain.getKmerLength()) {
return null;
}
@@ -93,7 +95,7 @@
* @param kmerChain
* @return FirstKmer bytes array
*/
- public VKmerBytesWritable getFirstKmerFromChain(int firstK, final KmerBytesWritable kmerChain) {
+ public KmerBytesWritable getFirstKmerFromChain(int firstK, final KmerBytesWritable kmerChain) {
if (firstK > kmerChain.getKmerLength()) {
return null;
}
@@ -117,7 +119,7 @@
return kmer;
}
- public VKmerBytesWritable getSubKmerFromChain(int startK, int kSize, final KmerBytesWritable kmerChain) {
+ public KmerBytesWritable getSubKmerFromChain(int startK, int kSize, final KmerBytesWritable kmerChain) {
if (startK + kSize > kmerChain.getKmerLength()) {
return null;
}
@@ -157,7 +159,7 @@
* : next neighbor in gene-code format
* @return the merged Kmer, this K of this Kmer is k+1
*/
- public VKmerBytesWritable mergeKmerWithNextCode(final KmerBytesWritable kmer, byte nextCode) {
+ public KmerBytesWritable mergeKmerWithNextCode(final KmerBytesWritable kmer, byte nextCode) {
this.kmer.reset(kmer.getKmerLength() + 1);
for (int i = 1; i <= kmer.getLength(); i++) {
this.kmer.getBytes()[this.kmer.getLength() - i] = kmer.getBytes()[kmer.getLength() - i];
@@ -184,7 +186,7 @@
* : next neighbor in gene-code format
* @return the merged Kmer,this K of this Kmer is k+1
*/
- public VKmerBytesWritable mergeKmerWithPreCode(final KmerBytesWritable kmer, byte preCode) {
+ public KmerBytesWritable mergeKmerWithPreCode(final KmerBytesWritable kmer, byte preCode) {
this.kmer.reset(kmer.getKmerLength() + 1);
int byteInMergedKmer = 0;
if (kmer.getKmerLength() % 4 == 0) {
@@ -213,7 +215,7 @@
* : bytes array of next kmer
* @return merged kmer, the new k is @preK + @nextK
*/
- public VKmerBytesWritable mergeTwoKmer(final KmerBytesWritable preKmer, final KmerBytesWritable nextKmer) {
+ public KmerBytesWritable mergeTwoKmer(final KmerBytesWritable preKmer, final KmerBytesWritable nextKmer) {
kmer.reset(preKmer.getKmerLength() + nextKmer.getKmerLength());
int i = 1;
for (; i <= preKmer.getLength(); i++) {
@@ -253,7 +255,7 @@
* : input genecode
* @return new created kmer that shifted by afterCode, the K will not change
*/
- public VKmerBytesWritable shiftKmerWithNextCode(final KmerBytesWritable kmer, byte afterCode) {
+ public KmerBytesWritable shiftKmerWithNextCode(final KmerBytesWritable kmer, byte afterCode) {
this.kmer.set(kmer);
this.kmer.shiftKmerWithNextCode(afterCode);
return this.kmer;
@@ -271,7 +273,7 @@
* : input genecode
* @return new created kmer that shifted by preCode, the K will not change
*/
- public VKmerBytesWritable shiftKmerWithPreCode(final KmerBytesWritable kmer, byte preCode) {
+ public KmerBytesWritable shiftKmerWithPreCode(final KmerBytesWritable kmer, byte preCode) {
this.kmer.set(kmer);
this.kmer.shiftKmerWithPreCode(preCode);
return this.kmer;
@@ -282,7 +284,7 @@
*
* @param kmer
*/
- public VKmerBytesWritable reverse(final KmerBytesWritable kmer) {
+ public KmerBytesWritable reverse(final KmerBytesWritable kmer) {
this.kmer.reset(kmer.getKmerLength());
int curPosAtKmer = ((kmer.getKmerLength() - 1) % 4) << 1;
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerCountValue.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerCountValue.java
deleted file mode 100644
index fab7001..0000000
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerCountValue.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Copyright 2009-2012 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.genomix.type;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.hadoop.io.Writable;
-
-public class KmerCountValue implements Writable {
- private byte adjBitMap;
- private byte count;
-
- public KmerCountValue(byte bitmap, byte count) {
- set(bitmap, count);
- }
-
- public KmerCountValue() {
- adjBitMap = 0;
- count = 0;
- }
-
- @Override
- public void readFields(DataInput arg0) throws IOException {
- adjBitMap = arg0.readByte();
- count = arg0.readByte();
- }
-
- @Override
- public void write(DataOutput arg0) throws IOException {
- arg0.writeByte(adjBitMap);
- arg0.writeByte(count);
- }
-
- @Override
- public String toString() {
- return GeneCode.getSymbolFromBitMap(adjBitMap) + '\t' + String.valueOf(count);
- }
-
- public void set(byte bitmap, byte count) {
- this.adjBitMap = bitmap;
- this.count = count;
- }
-
- public byte getAdjBitMap() {
- return adjBitMap;
- }
-
- public void setAdjBitMap(byte adjBitMap) {
- this.adjBitMap = adjBitMap;
- }
-
- public byte getCount() {
- return count;
- }
-}
\ No newline at end of file
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
new file mode 100644
index 0000000..332b23b
--- /dev/null
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
@@ -0,0 +1,133 @@
+package edu.uci.ics.genomix.type;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.WritableComparable;
+
+public class NodeWritable implements WritableComparable<NodeWritable> {
+ private PositionWritable nodeID;
+ private int countOfKmer;
+ private PositionListWritable incomingList;
+ private PositionListWritable outgoingList;
+ private KmerBytesWritable kmer;
+
+ public NodeWritable(){
+ nodeID = new PositionWritable();
+ countOfKmer = 0;
+ incomingList = new PositionListWritable();
+ outgoingList = new PositionListWritable();
+ kmer = new KmerBytesWritable();
+ }
+
+ public NodeWritable(int kmerSize) {
+ nodeID = new PositionWritable();
+ countOfKmer = 0;
+ incomingList = new PositionListWritable();
+ outgoingList = new PositionListWritable();
+ kmer = new KmerBytesWritable(kmerSize);
+ }
+
+ public int getCount() {
+ return countOfKmer;
+ }
+
+ public void setCount(int count) {
+ this.countOfKmer = count;
+ }
+
+ public void setNodeID(PositionWritable ref) {
+ this.setNodeID(ref.getReadID(), ref.getPosInRead());
+ }
+
+ public void setNodeID(int readID, byte posInRead) {
+ nodeID.set(readID, posInRead);
+ }
+
+ public void setIncomingList(PositionListWritable incoming) {
+ incomingList.set(incoming);
+ }
+
+ public void setOutgoingList(PositionListWritable outgoing) {
+ outgoingList.set(outgoing);
+ }
+
+ public void reset() {
+ nodeID.set(0, (byte) 0);
+ incomingList.reset();
+ outgoingList.reset();
+ countOfKmer = 0;
+ }
+
+ public PositionListWritable getIncomingList() {
+ return incomingList;
+ }
+
+ public PositionListWritable getOutgoingList() {
+ return outgoingList;
+ }
+
+ public PositionWritable getNodeID() {
+ return nodeID;
+ }
+
+ public KmerBytesWritable getKmer() {
+ return kmer;
+ }
+
+ public void mergeNextWithinOneRead(NodeWritable nextNodeEntry) {
+ this.countOfKmer += 1;
+ this.outgoingList.set(nextNodeEntry.outgoingList);
+ kmer.mergeKmerWithNextCode(nextNodeEntry.kmer.getGeneCodeAtPosition(nextNodeEntry.kmer.getKmerLength() - 1));
+ }
+
+ public void set(NodeWritable node) {
+ this.nodeID.set(node.getNodeID().getReadID(), node.getNodeID().getPosInRead());
+ this.countOfKmer = node.countOfKmer;
+ this.incomingList.set(node.getIncomingList());
+ this.outgoingList.set(node.getOutgoingList());
+ this.kmer.set(node.kmer);
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ this.nodeID.readFields(in);
+ this.countOfKmer = in.readInt();
+ this.incomingList.readFields(in);
+ this.outgoingList.readFields(in);
+ this.kmer.readFields(in);
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ this.nodeID.write(out);
+ out.writeInt(this.countOfKmer);
+ this.incomingList.write(out);
+ this.outgoingList.write(out);
+ this.kmer.write(out);
+ }
+
+ @Override
+ public int compareTo(NodeWritable other) {
+ return this.nodeID.compareTo(other.nodeID);
+ }
+
+ @Override
+ public int hashCode() {
+ return nodeID.hashCode();
+ }
+
+ @Override
+ public String toString(){
+ StringBuilder sbuilder = new StringBuilder();
+ sbuilder.append('(');
+ sbuilder.append(nodeID.toString()).append(',');
+ sbuilder.append(countOfKmer).append(',');
+ sbuilder.append(incomingList.toString()).append(',');
+ sbuilder.append(incomingList.toString()).append(',');
+ sbuilder.append(kmer.toString()).append(')');
+ return sbuilder.toString();
+ }
+
+}
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionListWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionListWritable.java
new file mode 100644
index 0000000..a3b5c84
--- /dev/null
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionListWritable.java
@@ -0,0 +1,159 @@
+package edu.uci.ics.genomix.type;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.io.Writable;
+
+import edu.uci.ics.genomix.data.Marshal;
+
+public class PositionListWritable implements Writable, Iterable<PositionWritable> {
+ protected byte[] storage;
+ protected int offset;
+ protected int valueCount;
+ protected static final byte[] EMPTY = {};
+
+ protected PositionWritable posIter = new PositionWritable();
+
+ public PositionListWritable() {
+ this.storage = EMPTY;
+ this.valueCount = 0;
+ this.offset = 0;
+ }
+
+ public PositionListWritable(int count, byte[] data, int offset) {
+ setNewReference(count, data, offset);
+ }
+
+ public void setNewReference(int count, byte[] data, int offset) {
+ this.valueCount = count;
+ this.storage = data;
+ this.offset = offset;
+ }
+
+ protected void setSize(int size) {
+ if (size > getCapacity()) {
+ setCapacity((size * 3 / 2));
+ }
+ }
+
+ protected int getCapacity() {
+ return storage.length - offset;
+ }
+
+ protected void setCapacity(int new_cap) {
+ if (new_cap > getCapacity()) {
+ byte[] new_data = new byte[new_cap];
+ if (storage.length - offset > 0) {
+ System.arraycopy(storage, offset, new_data, 0, storage.length - offset);
+ }
+ storage = new_data;
+ offset = 0;
+ }
+ }
+
+ public PositionWritable getPosition(int i) {
+ if (i >= valueCount) {
+ throw new ArrayIndexOutOfBoundsException("No such positions");
+ }
+ posIter.setNewReference(storage, offset + i * PositionWritable.LENGTH);
+ return posIter;
+ }
+
+ @Override
+ public Iterator<PositionWritable> iterator() {
+ Iterator<PositionWritable> it = new Iterator<PositionWritable>() {
+
+ private int currentIndex = 0;
+
+ @Override
+ public boolean hasNext() {
+ return currentIndex < valueCount;
+ }
+
+ @Override
+ public PositionWritable next() {
+ return getPosition(currentIndex++);
+ }
+
+ @Override
+ public void remove() {
+ // TODO Auto-generated method stub
+ }
+ };
+ return it;
+ }
+
+ public void set(PositionListWritable list2) {
+ set(list2.valueCount, list2.storage, list2.offset);
+ }
+
+ public void set(int valueCount, byte[] newData, int offset) {
+ this.valueCount = valueCount;
+ setSize(valueCount * PositionWritable.LENGTH);
+ if (valueCount > 0) {
+ System.arraycopy(newData, offset, storage, this.offset, valueCount * PositionWritable.LENGTH);
+ }
+ }
+
+ public void reset() {
+ valueCount = 0;
+ }
+
+ public void append(PositionWritable pos) {
+ setSize((1 + valueCount) * PositionWritable.LENGTH);
+ System.arraycopy(pos.getByteArray(), pos.getStartOffset(), storage, offset + valueCount
+ * PositionWritable.LENGTH, pos.getLength());
+ valueCount += 1;
+ }
+
+ public void append(int readID, byte posInRead) {
+ setSize((1 + valueCount) * PositionWritable.LENGTH);
+ Marshal.putInt(readID, storage, offset + valueCount * PositionWritable.LENGTH);
+ storage[offset + valueCount * PositionWritable.LENGTH + PositionWritable.INTBYTES] = posInRead;
+ valueCount += 1;
+ }
+
+ public int getCountOfPosition() {
+ return valueCount;
+ }
+
+ public byte[] getByteArray() {
+ return storage;
+ }
+
+ public int getStartOffset() {
+ return offset;
+ }
+
+ public int getLength() {
+ return valueCount * PositionWritable.LENGTH;
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ this.valueCount = in.readInt();
+ setSize(valueCount * PositionWritable.LENGTH);
+ in.readFully(storage, offset, valueCount * PositionWritable.LENGTH);
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeInt(valueCount);
+ out.write(storage, offset, valueCount * PositionWritable.LENGTH);
+ }
+
+ @Override
+ public String toString(){
+ StringBuilder sbuilder = new StringBuilder();
+ sbuilder.append('[');
+ for(PositionWritable pos : this){
+ sbuilder.append(pos.toString());
+ sbuilder.append(',');
+ }
+ sbuilder.setCharAt(sbuilder.length()-1, ']');
+ return sbuilder.toString();
+ }
+}
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionWritable.java
new file mode 100644
index 0000000..c7f24ec
--- /dev/null
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionWritable.java
@@ -0,0 +1,119 @@
+package edu.uci.ics.genomix.type;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+
+import edu.uci.ics.genomix.data.Marshal;
+
+public class PositionWritable implements WritableComparable<PositionWritable> {
+ protected byte[] storage;
+ protected int offset;
+ public static final int LENGTH = 5;
+ public static final int INTBYTES = 4;
+
+ public PositionWritable() {
+ storage = new byte[LENGTH];
+ offset = 0;
+ }
+
+ public PositionWritable(int readID, byte posInRead) {
+ this();
+ set(readID, posInRead);
+ }
+
+ public PositionWritable(byte[] storage, int offset) {
+ setNewReference(storage, offset);
+ }
+
+ public void setNewReference(byte[] storage, int offset) {
+ this.storage = storage;
+ this.offset = offset;
+ }
+
+ public void set(int readID, byte posInRead) {
+ Marshal.putInt(readID, storage, offset);
+ storage[offset + INTBYTES] = posInRead;
+ }
+
+ public int getReadID() {
+ return Marshal.getInt(storage, offset);
+ }
+
+ public byte getPosInRead() {
+ return storage[offset + INTBYTES];
+ }
+
+ public byte[] getByteArray() {
+ return storage;
+ }
+
+ public int getStartOffset() {
+ return offset;
+ }
+
+ public int getLength() {
+ return LENGTH;
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ in.readFully(storage, offset, LENGTH);
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.write(storage, offset, LENGTH);
+ }
+
+ @Override
+ public int hashCode() {
+ return this.getReadID();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (!(o instanceof PositionWritable))
+ return false;
+ PositionWritable other = (PositionWritable) o;
+ return this.getReadID() == other.getReadID() && this.getPosInRead() == other.getPosInRead();
+ }
+
+ @Override
+ public int compareTo(PositionWritable other) {
+ int diff = this.getReadID() - other.getReadID();
+ if (diff == 0) {
+ return this.getPosInRead() - other.getPosInRead();
+ }
+ return diff;
+ }
+
+ @Override
+ public String toString() {
+ return "(" + Integer.toString(getReadID()) + "," + Integer.toString((int) getPosInRead()) + ")";
+ }
+
+ /** A Comparator optimized for IntWritable. */
+ public static class Comparator extends WritableComparator {
+ public Comparator() {
+ super(PositionWritable.class);
+ }
+
+ public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+ int thisValue = Marshal.getInt(b1, s1);
+ int thatValue = Marshal.getInt(b2, s2);
+ int diff = thisValue - thatValue;
+ if (diff == 0) {
+ return b1[s1 + INTBYTES] - b2[s2 + INTBYTES];
+ }
+ return diff;
+ }
+ }
+
+ static { // register this comparator
+ WritableComparator.define(PositionWritable.class, new Comparator());
+ }
+}
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/VKmerBytesWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/VKmerBytesWritable.java
deleted file mode 100644
index abedad6..0000000
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/VKmerBytesWritable.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Copyright 2009-2012 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package edu.uci.ics.genomix.type;
-
-public class VKmerBytesWritable extends KmerBytesWritable {
- /**
- *
- */
- private static final long serialVersionUID = 1L;
-
- @Deprecated
- public VKmerBytesWritable() {
- super();
- }
-
- public VKmerBytesWritable(int k, byte[] storage) {
- super(k, storage);
- }
-
- public VKmerBytesWritable(int k) {
- super(k);
- }
-
- public VKmerBytesWritable(KmerBytesWritable other) {
- super(other);
- }
-
- protected void setSize(int size) {
- if (size > getCapacity()) {
- setCapacity((size * 3 / 2));
- }
- this.size = size;
- }
-
- protected int getCapacity() {
- return bytes.length;
- }
-
- protected void setCapacity(int new_cap) {
- if (new_cap != getCapacity()) {
- byte[] new_data = new byte[new_cap];
- if (new_cap < size) {
- size = new_cap;
- }
- if (size != 0) {
- System.arraycopy(bytes, 0, new_data, 0, size);
- }
- bytes = new_data;
- }
- }
-
- /**
- * Read Kmer from read text into bytes array e.g. AATAG will compress as
- * [0x000G, 0xATAA]
- *
- * @param k
- * @param array
- * @param start
- */
- public void setByRead(int k, byte[] array, int start) {
- reset(k);
- super.setByRead(array, start);
- }
-
- /**
- * Compress Reversed Kmer into bytes array AATAG will compress as
- * [0x000A,0xATAG]
- *
- * @param input
- * array
- * @param start
- * position
- */
- public void setByReadReverse(int k, byte[] array, int start) {
- reset(k);
- super.setByReadReverse(array, start);
- }
-
- @Override
- public void set(KmerBytesWritable newData) {
- if (newData == null){
- this.set(0,null,0,0);
- }else{
- this.set(newData.kmerlength, newData.bytes, 0, newData.size);
- }
- }
-
- public void set(int k, byte[] newData, int offset, int length) {
- reset(k);
- if (k > 0 ){
- System.arraycopy(newData, offset, bytes, 0, size);
- }
- }
-
- /**
- * Reset array by kmerlength
- *
- * @param k
- */
- public void reset(int k) {
- this.kmerlength = k;
- setSize(0);
- setSize(KmerUtil.getByteNumFromK(k));
- clearLeadBit();
- }
-
-}
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/old/Kmer.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/old/Kmer.java
deleted file mode 100644
index 7aa05f5..0000000
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/old/Kmer.java
+++ /dev/null
@@ -1,299 +0,0 @@
-/*
- * Copyright 2009-2012 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package edu.uci.ics.genomix.type.old;
-
-@Deprecated
-public class Kmer {
-
- public final static byte[] GENE_SYMBOL = { 'A', 'C', 'G', 'T' };
-
- public final static class GENE_CODE {
-
- /**
- * make sure this 4 ids equal to the sequence id of char in {@GENE_SYMBOL}
- */
- public static final byte A = 0;
- public static final byte C = 1;
- public static final byte G = 2;
- public static final byte T = 3;
-
- public static byte getCodeFromSymbol(byte ch) {
- byte r = 0;
- switch (ch) {
- case 'A':
- case 'a':
- r = A;
- break;
- case 'C':
- case 'c':
- r = C;
- break;
- case 'G':
- case 'g':
- r = G;
- break;
- case 'T':
- case 't':
- r = T;
- break;
- }
- return r;
- }
-
- public static byte getSymbolFromCode(byte code) {
- if (code > 3) {
- return '!';
- }
- return GENE_SYMBOL[code];
- }
-
- public static byte getAdjBit(byte t) {
- byte r = 0;
- switch (t) {
- case 'A':
- case 'a':
- r = 1 << A;
- break;
- case 'C':
- case 'c':
- r = 1 << C;
- break;
- case 'G':
- case 'g':
- r = 1 << G;
- break;
- case 'T':
- case 't':
- r = 1 << T;
- break;
- }
- return r;
- }
-
- /**
- * It works for path merge.
- * Merge the kmer by his next, we need to make sure the @{t} is a single neighbor.
- *
- * @param t
- * the neighbor code in BitMap
- * @return the genecode
- */
- public static byte getGeneCodeFromBitMap(byte t) {
- switch (t) {
- case 1 << A:
- return A;
- case 1 << C:
- return C;
- case 1 << G:
- return G;
- case 1 << T:
- return T;
- }
- return -1;
- }
-
- public static byte mergePreNextAdj(byte pre, byte next) {
- return (byte) (pre << 4 | (next & 0x0f));
- }
-
- public static String getSymbolFromBitMap(byte code) {
- int left = (code >> 4) & 0x0F;
- int right = code & 0x0F;
- StringBuilder str = new StringBuilder();
- for (int i = A; i <= T; i++) {
- if ((left & (1 << i)) != 0) {
- str.append((char) GENE_SYMBOL[i]);
- }
- }
- str.append('|');
- for (int i = A; i <= T; i++) {
- if ((right & (1 << i)) != 0) {
- str.append((char) GENE_SYMBOL[i]);
- }
- }
- return str.toString();
- }
- }
-
- public static String recoverKmerFrom(int k, byte[] keyData, int keyStart, int keyLength) {
- StringBuilder strKmer = new StringBuilder();
- int byteId = keyStart + keyLength - 1;
- byte currentbyte = keyData[byteId];
- for (int geneCount = 0; geneCount < k; geneCount++) {
- if (geneCount % 4 == 0 && geneCount > 0) {
- currentbyte = keyData[--byteId];
- }
- strKmer.append((char) GENE_SYMBOL[(currentbyte >> ((geneCount % 4) * 2)) & 0x03]);
- }
- return strKmer.toString();
- }
-
- public static int getByteNumFromK(int k) {
- int x = k / 4;
- if (k % 4 != 0) {
- x += 1;
- }
- return x;
- }
-
- /**
- * Compress Kmer into bytes array AATAG will compress as [0x000G, 0xATAA]
- *
- * @param kmer
- * @param input
- * array
- * @param start
- * position
- * @return initialed kmer array
- */
- public static byte[] compressKmer(int k, byte[] array, int start) {
- final int byteNum = getByteNumFromK(k);
- byte[] bytes = new byte[byteNum];
-
- byte l = 0;
- int bytecount = 0;
- int bcount = byteNum - 1;
- for (int i = start; i < start + k; i++) {
- byte code = GENE_CODE.getCodeFromSymbol(array[i]);
- l |= (byte) (code << bytecount);
- bytecount += 2;
- if (bytecount == 8) {
- bytes[bcount--] = l;
- l = 0;
- bytecount = 0;
- }
- }
- if (bcount >= 0) {
- bytes[0] = l;
- }
- return bytes;
- }
-
- /**
- * Shift Kmer to accept new input
- *
- * @param kmer
- * @param bytes
- * Kmer Array
- * @param c
- * Input new gene character
- * @return the shiftout gene, in gene code format
- */
- public static byte moveKmer(int k, byte[] kmer, byte c) {
- int byteNum = kmer.length;
- byte output = (byte) (kmer[byteNum - 1] & 0x03);
- for (int i = byteNum - 1; i > 0; i--) {
- byte in = (byte) (kmer[i - 1] & 0x03);
- kmer[i] = (byte) (((kmer[i] >>> 2) & 0x3f) | (in << 6));
- }
- int pos = ((k - 1) % 4) << 1;
- byte code = (byte) (GENE_CODE.getCodeFromSymbol(c) << pos);
- kmer[0] = (byte) (((kmer[0] >>> 2) & 0x3f) | code);
- return (byte) (1 << output);
- }
-
- public static byte reverseKmerByte(byte k) {
- int x = (((k >> 2) & 0x33) | ((k << 2) & 0xcc));
- return (byte) (((x >> 4) & 0x0f) | ((x << 4) & 0xf0));
- }
-
- public static byte[] reverseKmer(int k, byte[] kmer) {
- byte[] reverseKmer = new byte[kmer.length];
-
- int curPosAtKmer = ((k - 1) % 4) << 1;
- int curByteAtKmer = 0;
-
- int curPosAtReverse = 0;
- int curByteAtReverse = reverseKmer.length - 1;
- reverseKmer[curByteAtReverse] = 0;
- for (int i = 0; i < k; i++) {
- byte gene = (byte) ((kmer[curByteAtKmer] >> curPosAtKmer) & 0x03);
- reverseKmer[curByteAtReverse] |= gene << curPosAtReverse;
- curPosAtReverse += 2;
- if (curPosAtReverse >= 8) {
- curPosAtReverse = 0;
- reverseKmer[--curByteAtReverse] = 0;
- }
- curPosAtKmer -= 2;
- if (curPosAtKmer < 0) {
- curPosAtKmer = 6;
- curByteAtKmer++;
- }
- }
-
- return reverseKmer;
- }
-
- /**
- * Compress Reversed Kmer into bytes array AATAG will compress as
- * [0x000A,0xATAG]
- *
- * @param kmer
- * @param input
- * array
- * @param start
- * position
- * @return initialed kmer array
- */
- public static byte[] compressKmerReverse(int k, byte[] array, int start) {
- final int byteNum = getByteNumFromK(k);
- byte[] bytes = new byte[byteNum];
-
- byte l = 0;
- int bytecount = 0;
- int bcount = byteNum - 1;
- for (int i = start + k - 1; i >= 0; i--) {
- byte code = GENE_CODE.getCodeFromSymbol(array[i]);
- l |= (byte) (code << bytecount);
- bytecount += 2;
- if (bytecount == 8) {
- bytes[bcount--] = l;
- l = 0;
- bytecount = 0;
- }
- }
- if (bcount >= 0) {
- bytes[0] = l;
- }
- return bytes;
- }
-
- /**
- * Shift Kmer to accept new input
- *
- * @param kmer
- * @param bytes
- * Kmer Array
- * @param c
- * Input new gene character
- * @return the shiftout gene, in gene code format
- */
- public static byte moveKmerReverse(int k, byte[] kmer, byte c) {
- int pos = ((k - 1) % 4) << 1;
- byte output = (byte) ((kmer[0] >> pos) & 0x03);
- for (int i = 0; i < kmer.length - 1; i++) {
- byte in = (byte) ((kmer[i + 1] >> 6) & 0x03);
- kmer[i] = (byte) ((kmer[i] << 2) | in);
- }
- // (k%4) * 2
- if (k % 4 != 0) {
- kmer[0] &= (1 << ((k % 4) << 1)) - 1;
- }
- kmer[kmer.length - 1] = (byte) ((kmer[kmer.length - 1] << 2) | GENE_CODE.getCodeFromSymbol(c));
- return (byte) (1 << output);
- }
-
-}
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/old/KmerBytesWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/old/KmerBytesWritable.java
deleted file mode 100644
index 7a96d2f..0000000
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/old/KmerBytesWritable.java
+++ /dev/null
@@ -1,136 +0,0 @@
-package edu.uci.ics.genomix.type.old;
-
-import java.io.IOException;
-import java.io.DataInput;
-import java.io.DataOutput;
-import org.apache.hadoop.io.BinaryComparable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.io.WritableComparator;
-
-@Deprecated
-public class KmerBytesWritable extends BinaryComparable implements WritableComparable<BinaryComparable> {
- private static final int LENGTH_BYTES = 4;
- private static final byte[] EMPTY_BYTES = {};
- private byte size;
- private byte[] bytes;
-
- public KmerBytesWritable() {
- this(EMPTY_BYTES);
- }
-
- public KmerBytesWritable(byte[] bytes) {
- this.bytes = bytes;
- this.size = (byte) bytes.length;
- }
-
- @Override
- public byte[] getBytes() {
- return bytes;
- }
-
- @Deprecated
- public byte[] get() {
- return getBytes();
- }
-
- @Override
- public int getLength() {
- return (int) size;
- }
-
- @Deprecated
- public int getSize() {
- return getLength();
- }
-
- public void setSize(byte size) {
- if ((int) size > getCapacity()) {
- setCapacity((byte) (size * 3 / 2));
- }
- this.size = size;
- }
-
- public int getCapacity() {
- return bytes.length;
- }
-
- public void setCapacity(byte new_cap) {
- if (new_cap != getCapacity()) {
- byte[] new_data = new byte[new_cap];
- if (new_cap < size) {
- size = new_cap;
- }
- if (size != 0) {
- System.arraycopy(bytes, 0, new_data, 0, size);
- }
- bytes = new_data;
- }
- }
-
- public void set(KmerBytesWritable newData) {
- set(newData.bytes, (byte) 0, newData.size);
- }
-
- public void set(byte[] newData, byte offset, byte length) {
- setSize((byte) 0);
- setSize(length);
- System.arraycopy(newData, offset, bytes, 0, size);
- }
-
- public void readFields(DataInput in) throws IOException {
- setSize((byte) 0); // clear the old data
- setSize(in.readByte());
- in.readFully(bytes, 0, size);
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- out.writeByte(size);
- out.write(bytes, 0, size);
- }
-
- @Override
- public int hashCode() {
- return super.hashCode();
- }
-
- @Override
- public boolean equals(Object right_obj) {
- if (right_obj instanceof KmerBytesWritable)
- return super.equals(right_obj);
- return false;
- }
-
- @Override
- public String toString() {
- StringBuffer sb = new StringBuffer(3 * size);
- for (int idx = 0; idx < (int) size; idx++) {
- // if not the first, put a blank separator in
- if (idx != 0) {
- sb.append(' ');
- }
- String num = Integer.toHexString(0xff & bytes[idx]);
- // if it is only one digit, add a leading 0.
- if (num.length() < 2) {
- sb.append('0');
- }
- sb.append(num);
- }
- return sb.toString();
- }
-
- public static class Comparator extends WritableComparator {
- public Comparator() {
- super(KmerBytesWritable.class);
- }
-
- public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
- return compareBytes(b1, s1 + LENGTH_BYTES, l1 - LENGTH_BYTES, b2, s2 + LENGTH_BYTES, l2 - LENGTH_BYTES);
- }
- }
-
- static { // register this comparator
- WritableComparator.define(KmerBytesWritable.class, new Comparator());
- }
-
-}
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/old/KmerUtil.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/old/KmerUtil.java
deleted file mode 100644
index a4b1bec..0000000
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/old/KmerUtil.java
+++ /dev/null
@@ -1,241 +0,0 @@
-package edu.uci.ics.genomix.type.old;
-
-import java.util.Arrays;
-
-@Deprecated
-public class KmerUtil {
-
- public static int countNumberOfBitSet(int i) {
- int c = 0;
- for (; i != 0; c++) {
- i &= i - 1;
- }
- return c;
- }
-
- public static int inDegree(byte bitmap) {
- return countNumberOfBitSet((bitmap >> 4) & 0x0f);
- }
-
- public static int outDegree(byte bitmap) {
- return countNumberOfBitSet(bitmap & 0x0f);
- }
-
- /**
- * Get last kmer from kmer-chain.
- * e.g. kmerChain is AAGCTA, if k =5, it will
- * return AGCTA
- *
- * @param k
- * @param kInChain
- * @param kmerChain
- * @return LastKmer bytes array
- */
- public static byte[] getLastKmerFromChain(int k, int kInChain, byte[] kmerChain, int offset, int length) {
- if (k > kInChain) {
- return null;
- }
- if (k == kInChain) {
- return kmerChain.clone();
- }
- int byteNum = Kmer.getByteNumFromK(k);
- byte[] kmer = new byte[byteNum];
-
- /** from end to start */
- int byteInChain = length - 1 - (kInChain - k) / 4;
- int posInByteOfChain = ((kInChain - k) % 4) << 1; // *2
- int byteInKmer = byteNum - 1;
- for (; byteInKmer >= 0 && byteInChain > 0; byteInKmer--, byteInChain--) {
- kmer[byteInKmer] = (byte) ((0xff & kmerChain[offset + byteInChain]) >> posInByteOfChain);
- kmer[byteInKmer] |= ((kmerChain[offset + byteInChain - 1] << (8 - posInByteOfChain)));
- }
-
- /** last kmer byte */
- if (byteInKmer == 0) {
- kmer[0] = (byte) ((kmerChain[offset] & 0xff) >> posInByteOfChain);
- }
- return kmer;
- }
-
- /**
- * Get first kmer from kmer-chain e.g. kmerChain is AAGCTA, if k=5, it will
- * return AAGCT
- *
- * @param k
- * @param kInChain
- * @param kmerChain
- * @return FirstKmer bytes array
- */
- public static byte[] getFirstKmerFromChain(int k, int kInChain, byte[] kmerChain, int offset, int length) {
- if (k > kInChain) {
- return null;
- }
- if (k == kInChain) {
- return kmerChain.clone();
- }
- int byteNum = Kmer.getByteNumFromK(k);
- byte[] kmer = new byte[byteNum];
-
- int i = 1;
- for (; i < kmer.length; i++) {
- kmer[kmer.length - i] = kmerChain[offset + length - i];
- }
- int posInByteOfChain = (k % 4) << 1; // *2
- if (posInByteOfChain == 0) {
- kmer[0] = kmerChain[offset + length - i];
- } else {
- kmer[0] = (byte) (kmerChain[offset + length - i] & ((1 << posInByteOfChain) - 1));
- }
- return kmer;
- }
-
- /**
- * Merge kmer with next neighbor in gene-code format.
- * The k of new kmer will increase by 1
- * e.g. AAGCT merge with A => AAGCTA
- *
- * @param k
- * :input k of kmer
- * @param kmer
- * : input bytes of kmer
- * @param nextCode
- * : next neighbor in gene-code format
- * @return the merged Kmer, this K of this Kmer is k+1
- */
- public static byte[] mergeKmerWithNextCode(int k, byte[] kmer, int offset, int length, byte nextCode) {
- int byteNum = length;
- if (k % 4 == 0) {
- byteNum++;
- }
- byte[] mergedKmer = new byte[byteNum];
- for (int i = 1; i <= length; i++) {
- mergedKmer[mergedKmer.length - i] = kmer[offset + length - i];
- }
- if (mergedKmer.length > length) {
- mergedKmer[0] = (byte) (nextCode & 0x3);
- } else {
- mergedKmer[0] = (byte) (kmer[offset] | ((nextCode & 0x3) << ((k % 4) << 1)));
- }
- return mergedKmer;
- }
-
- /**
- * Merge kmer with previous neighbor in gene-code format.
- * The k of new kmer will increase by 1
- * e.g. AAGCT merge with A => AAAGCT
- *
- * @param k
- * :input k of kmer
- * @param kmer
- * : input bytes of kmer
- * @param preCode
- * : next neighbor in gene-code format
- * @return the merged Kmer,this K of this Kmer is k+1
- */
- public static byte[] mergeKmerWithPreCode(int k, byte[] kmer, int offset, int length, byte preCode) {
- int byteNum = length;
- byte[] mergedKmer = null;
- int byteInMergedKmer = 0;
- if (k % 4 == 0) {
- byteNum++;
- mergedKmer = new byte[byteNum];
- mergedKmer[0] = (byte) ((kmer[offset] >> 6) & 0x3);
- byteInMergedKmer++;
- } else {
- mergedKmer = new byte[byteNum];
- }
- for (int i = 0; i < length - 1; i++, byteInMergedKmer++) {
- mergedKmer[byteInMergedKmer] = (byte) ((kmer[offset + i] << 2) | ((kmer[offset + i + 1] >> 6) & 0x3));
- }
- mergedKmer[byteInMergedKmer] = (byte) ((kmer[offset + length - 1] << 2) | (preCode & 0x3));
- return mergedKmer;
- }
-
- /**
- * Merge two kmer to one kmer
- * e.g. ACTA + ACCGT => ACTAACCGT
- *
- * @param preK
- * : previous k of kmer
- * @param kmerPre
- * : bytes array of previous kmer
- * @param nextK
- * : next k of kmer
- * @param kmerNext
- * : bytes array of next kmer
- * @return merged kmer, the new k is @preK + @nextK
- */
- public static byte[] mergeTwoKmer(int preK, byte[] kmerPre, int offsetPre, int lengthPre, int nextK,
- byte[] kmerNext, int offsetNext, int lengthNext) {
- int byteNum = Kmer.getByteNumFromK(preK + nextK);
- byte[] mergedKmer = new byte[byteNum];
- int i = 1;
- for (; i <= lengthPre; i++) {
- mergedKmer[byteNum - i] = kmerPre[offsetPre + lengthPre - i];
- }
- if (i > 1) {
- i--;
- }
- if (preK % 4 == 0) {
- for (int j = 1; j <= lengthNext; j++) {
- mergedKmer[byteNum - i - j] = kmerNext[offsetNext + lengthNext - j];
- }
- } else {
- int posNeedToMove = ((preK % 4) << 1);
- mergedKmer[byteNum - i] |= kmerNext[offsetNext + lengthNext - 1] << posNeedToMove;
- for (int j = 1; j < lengthNext; j++) {
- mergedKmer[byteNum - i - j] = (byte) (((kmerNext[offsetNext + lengthNext - j] & 0xff) >> (8 - posNeedToMove)) | (kmerNext[offsetNext
- + lengthNext - j - 1] << posNeedToMove));
- }
- if ((nextK % 4) * 2 + posNeedToMove > 8) {
- mergedKmer[0] = (byte) (kmerNext[offsetNext] >> (8 - posNeedToMove));
- }
- }
- return mergedKmer;
- }
-
- /**
- * Safely shifted the kmer forward without change the input kmer
- * e.g. AGCGC shift with T => GCGCT
- *
- * @param k
- * : kmer length
- * @param kmer
- * : input kmer
- * @param afterCode
- * : input genecode
- * @return new created kmer that shifted by afterCode, the K will not change
- */
- public static byte[] shiftKmerWithNextCode(int k, final byte[] kmer, int offset, int length, byte afterCode) {
- byte[] shifted = Arrays.copyOfRange(kmer, offset, offset + length);
- Kmer.moveKmer(k, shifted, Kmer.GENE_CODE.getSymbolFromCode(afterCode));
- return shifted;
- }
-
- /**
- * Safely shifted the kmer backward without change the input kmer
- * e.g. AGCGC shift with T => TAGCG
- *
- * @param k
- * : kmer length
- * @param kmer
- * : input kmer
- * @param preCode
- * : input genecode
- * @return new created kmer that shifted by preCode, the K will not change
- */
- public static byte[] shiftKmerWithPreCode(int k, final byte[] kmer, int offset, int length, byte preCode) {
- byte[] shifted = Arrays.copyOfRange(kmer, offset, offset + length);
- Kmer.moveKmerReverse(k, shifted, Kmer.GENE_CODE.getSymbolFromCode(preCode));
- return shifted;
- }
-
- public static byte getGeneCodeAtPosition(int pos, int k, final byte[] kmer, int offset, int length) {
- if (pos >= k) {
- return -1;
- }
- int posByte = pos / 4;
- int shift = (pos % 4) << 1;
- return (byte) ((kmer[offset + length - 1 - posByte] >> shift) & 0x3);
- }
-}
diff --git a/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/example/kmer/VKmerBytesWritableFactoryTest.java b/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/KmerBytesWritableFactoryTest.java
similarity index 87%
rename from genomix/genomix-data/src/test/java/edu/uci/ics/genomix/example/kmer/VKmerBytesWritableFactoryTest.java
rename to genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/KmerBytesWritableFactoryTest.java
index 7c4c675..1eb58c8 100644
--- a/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/example/kmer/VKmerBytesWritableFactoryTest.java
+++ b/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/KmerBytesWritableFactoryTest.java
@@ -13,29 +13,19 @@
* limitations under the License.
*/
-package edu.uci.ics.genomix.example.kmer;
+package edu.uci.ics.genomix.data.test;
import org.junit.Assert;
import org.junit.Test;
import edu.uci.ics.genomix.type.GeneCode;
import edu.uci.ics.genomix.type.KmerBytesWritable;
-import edu.uci.ics.genomix.type.VKmerBytesWritable;
-import edu.uci.ics.genomix.type.VKmerBytesWritableFactory;
+import edu.uci.ics.genomix.type.KmerBytesWritableFactory;
-public class VKmerBytesWritableFactoryTest {
+public class KmerBytesWritableFactoryTest {
static byte[] array = { 'A', 'G', 'C', 'T', 'G', 'A', 'C', 'C', 'G', 'T' };
- VKmerBytesWritableFactory kmerFactory = new VKmerBytesWritableFactory(8);
-
- @Test
- public void TestDegree() {
- Assert.assertTrue(GeneCode.inDegree((byte) 0xff) == 4);
- Assert.assertTrue(GeneCode.outDegree((byte) 0xff) == 4);
- Assert.assertTrue(GeneCode.inDegree((byte) 0x3f) == 2);
- Assert.assertTrue(GeneCode.outDegree((byte) 0x01) == 1);
- Assert.assertTrue(GeneCode.inDegree((byte) 0x01) == 0);
- }
+ KmerBytesWritableFactory kmerFactory = new KmerBytesWritableFactory(8);
@Test
public void TestGetLastKmer() {
@@ -49,7 +39,7 @@
lastKmer = kmerFactory.getSubKmerFromChain(9 - i, i, kmer);
Assert.assertEquals("AGCTGACCG".substring(9 - i), lastKmer.toString());
}
- VKmerBytesWritable vlastKmer;
+ KmerBytesWritable vlastKmer;
for (int i = 8; i > 0; i--) {
vlastKmer = kmerFactory.getLastKmerFromChain(i, kmer);
Assert.assertEquals("AGCTGACCG".substring(9 - i), vlastKmer.toString());
@@ -70,7 +60,7 @@
firstKmer = kmerFactory.getSubKmerFromChain(0, i, kmer);
Assert.assertEquals("AGCTGACCG".substring(0, i), firstKmer.toString());
}
- VKmerBytesWritable vfirstKmer;
+ KmerBytesWritable vfirstKmer;
for (int i = 8; i > 0; i--) {
vfirstKmer = kmerFactory.getFirstKmerFromChain(i, kmer);
Assert.assertEquals("AGCTGACCG".substring(0, i), vfirstKmer.toString());
@@ -84,7 +74,7 @@
KmerBytesWritable kmer = new KmerBytesWritable(9);
kmer.setByRead(array, 0);
Assert.assertEquals("AGCTGACCG", kmer.toString());
- VKmerBytesWritable subKmer;
+ KmerBytesWritable subKmer;
for (int istart = 0; istart < kmer.getKmerLength() - 1; istart++) {
for (int isize = 1; isize + istart <= kmer.getKmerLength(); isize++) {
subKmer = kmerFactory.getSubKmerFromChain(istart, isize, kmer);
@@ -168,7 +158,7 @@
KmerBytesWritable kmer5 = new KmerBytesWritable(7);
kmer5.setByRead(array, 0);
String text5 = "AGCTGAC";
- VKmerBytesWritable kmer6 = new VKmerBytesWritable(9);
+ KmerBytesWritable kmer6 = new KmerBytesWritable(9);
kmer6.setByRead(9, array, 1);
String text6 = "GCTGACCGT";
merged = kmerFactory.mergeTwoKmer(kmer5, kmer6);
@@ -188,14 +178,14 @@
@Test
public void TestShift() {
- VKmerBytesWritable kmer = new VKmerBytesWritable(kmerFactory.getKmerByRead(9, array, 0));
+ KmerBytesWritable kmer = new KmerBytesWritable(kmerFactory.getKmerByRead(9, array, 0));
String text = "AGCTGACCG";
Assert.assertEquals(text, kmer.toString());
- VKmerBytesWritable kmerForward = kmerFactory.shiftKmerWithNextCode(kmer, GeneCode.A);
+ KmerBytesWritable kmerForward = kmerFactory.shiftKmerWithNextCode(kmer, GeneCode.A);
Assert.assertEquals(text, kmer.toString());
Assert.assertEquals("GCTGACCGA", kmerForward.toString());
- VKmerBytesWritable kmerBackward = kmerFactory.shiftKmerWithPreCode(kmer, GeneCode.C);
+ KmerBytesWritable kmerBackward = kmerFactory.shiftKmerWithPreCode(kmer, GeneCode.C);
Assert.assertEquals(text, kmer.toString());
Assert.assertEquals("CAGCTGACC", kmerBackward.toString());
diff --git a/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/example/kmer/KmerBytesWritableTest.java b/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/KmerBytesWritableTest.java
similarity index 82%
rename from genomix/genomix-data/src/test/java/edu/uci/ics/genomix/example/kmer/KmerBytesWritableTest.java
rename to genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/KmerBytesWritableTest.java
index faee509..b7dd8f1 100644
--- a/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/example/kmer/KmerBytesWritableTest.java
+++ b/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/KmerBytesWritableTest.java
@@ -13,7 +13,7 @@
* limitations under the License.
*/
-package edu.uci.ics.genomix.example.kmer;
+package edu.uci.ics.genomix.data.test;
import junit.framework.Assert;
@@ -90,4 +90,21 @@
}
}
+ @Test
+ public void TestMergeNext() {
+ KmerBytesWritable kmer = new KmerBytesWritable(9);
+ byte[] array = { 'A', 'G', 'C', 'T', 'G', 'A', 'C', 'C', 'G' };
+ kmer.setByRead(array, 0);
+ Assert.assertEquals("AGCTGACCG", kmer.toString());
+
+ String text = "AGCTGACCG";
+ for (int i = 0; i < 10; i++) {
+ for (byte x = GeneCode.A; x <= GeneCode.T; x++) {
+ kmer.mergeKmerWithNextCode(x);
+ text = text + (char) GeneCode.GENE_SYMBOL[x];
+ Assert.assertEquals(text, kmer.toString());
+ }
+ }
+ }
+
}
diff --git a/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/PositionListWritableTest.java b/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/PositionListWritableTest.java
new file mode 100644
index 0000000..19f2f39
--- /dev/null
+++ b/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/PositionListWritableTest.java
@@ -0,0 +1,39 @@
+package edu.uci.ics.genomix.data.test;
+
+import junit.framework.Assert;
+
+import org.junit.Test;
+
+import edu.uci.ics.genomix.type.PositionListWritable;
+import edu.uci.ics.genomix.type.PositionWritable;
+
+public class PositionListWritableTest {
+
+ @Test
+ public void TestInitial() {
+ PositionListWritable plist = new PositionListWritable();
+ Assert.assertEquals(plist.getCountOfPosition(), 0);
+
+ for (int i = 0; i < 200; i++) {
+ plist.append(i, (byte) i);
+ Assert.assertEquals(i, plist.getPosition(i).getReadID());
+ Assert.assertEquals((byte) i, plist.getPosition(i).getPosInRead());
+ Assert.assertEquals(i + 1, plist.getCountOfPosition());
+ }
+ int i = 0;
+ for (PositionWritable pos : plist) {
+ Assert.assertEquals(i, pos.getReadID());
+ Assert.assertEquals((byte) i, pos.getPosInRead());
+ i++;
+ }
+
+ byte [] another = new byte [plist.getLength()*2];
+ int start = 20;
+ System.arraycopy(plist.getByteArray(), 0, another, start, plist.getLength());
+ PositionListWritable plist2 = new PositionListWritable(plist.getCountOfPosition(),another,start);
+ for( i = 0; i < plist2.getCountOfPosition(); i++){
+ Assert.assertEquals(plist.getPosition(i), plist2.getPosition(i));
+ }
+ }
+
+}
diff --git a/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/PositionWritableTest.java b/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/PositionWritableTest.java
new file mode 100644
index 0000000..13c6c0d
--- /dev/null
+++ b/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/PositionWritableTest.java
@@ -0,0 +1,35 @@
+package edu.uci.ics.genomix.data.test;
+
+import junit.framework.Assert;
+
+import org.junit.Test;
+
+import edu.uci.ics.genomix.data.Marshal;
+import edu.uci.ics.genomix.type.PositionWritable;
+
+public class PositionWritableTest {
+
+ @Test
+ public void TestInitial() {
+ PositionWritable pos = new PositionWritable();
+ pos = new PositionWritable(3, (byte) 1);
+ Assert.assertEquals(pos.getReadID(), 3);
+ Assert.assertEquals(pos.getPosInRead(), 1);
+
+ byte[] start = new byte[256];
+ for (int i = 0; i < 128; i++) {
+ Marshal.putInt(i, start, i);
+ start[i + PositionWritable.INTBYTES] = (byte) (i / 2);
+ pos = new PositionWritable(start, i);
+ Assert.assertEquals(pos.getReadID(), i);
+ Assert.assertEquals(pos.getPosInRead(), (byte) (i / 2));
+ pos.set(-i, (byte) (i / 4));
+ Assert.assertEquals(pos.getReadID(), -i);
+ Assert.assertEquals(pos.getPosInRead(), (byte) (i / 4));
+ pos.setNewReference(start, i);
+ Assert.assertEquals(pos.getReadID(), -i);
+ Assert.assertEquals(pos.getPosInRead(), (byte) (i / 4));
+
+ }
+ }
+}
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/valvetgraphbuilding/DeepGraphBuildingMapper.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/valvetgraphbuilding/DeepGraphBuildingMapper.java
new file mode 100644
index 0000000..591e3c7
--- /dev/null
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/valvetgraphbuilding/DeepGraphBuildingMapper.java
@@ -0,0 +1,21 @@
+package edu.uci.ics.genomix.hadoop.valvetgraphbuilding;
+
+import java.io.IOException;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.PositionListWritable;
+
+@SuppressWarnings("deprecation")
+public class DeepGraphBuildingMapper extends MapReduceBase implements
+ Mapper<KmerBytesWritable, PositionListWritable, IntWritable, LineBasedmappingWritable> {
+ IntWritable numLine = new IntWritable();
+ LineBasedmappingWritable lineBasedWriter = new LineBasedmappingWritable();
+ @Override
+ public void map(KmerBytesWritable key, PositionListWritable value, OutputCollector<IntWritable, LineBasedmappingWritable> output,
+ Reporter reporter) throws IOException {
+ }
+}
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/valvetgraphbuilding/DeepGraphBuildingReducer.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/valvetgraphbuilding/DeepGraphBuildingReducer.java
new file mode 100644
index 0000000..4b971a7
--- /dev/null
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/valvetgraphbuilding/DeepGraphBuildingReducer.java
@@ -0,0 +1,86 @@
+package edu.uci.ics.genomix.hadoop.valvetgraphbuilding;
+
+import java.io.IOException;
+import java.util.Iterator;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.NodeWritable;
+
+@SuppressWarnings("deprecation")
+public class DeepGraphBuildingReducer extends MapReduceBase implements
+        Reducer<IntWritable, LineBasedmappingWritable, NodeWritable, NullWritable> { // reduce() is an unimplemented stub; an earlier draft is preserved in comments below
+
+/* public ArrayList<LineBasedmappingWritable> lineElementsSet = new ArrayList<LineBasedmappingWritable>();
+    public Position outputVerID = new Position();
+    public VertexAdjacentWritable outputAdjacentList = new VertexAdjacentWritable();
+    public PositionList srcVtexAdjList = new PositionList();
+    public PositionList desVtexAdjList = new PositionList();
+    public VertexIDListWritable srcAdjListWritable = new VertexIDListWritable();
+    public VKmerBytesWritable desKmer = new VKmerBytesWritable(1);
+    public VKmerBytesWritableFactory kmerFactory = new VKmerBytesWritableFactory(1);
+    public VKmerBytesWritable srcKmer = new VKmerBytesWritable(1);*/
+    @Override
+    public void reduce(IntWritable key, Iterator<LineBasedmappingWritable> values,
+            OutputCollector<NodeWritable, NullWritable> output, Reporter reporter) throws IOException { // TODO: not yet implemented; currently emits nothing
+/* while (values.hasNext()) {
+            lineElementsSet.add(values.next());
+        }
+        int[] orderLineTable = new int[lineElementsSet.size()];
+        for (int i = 0; i < lineElementsSet.size(); i++) {
+            int posInInvertedIndex = lineElementsSet.get(i).getPosInInvertedIndex();
+            orderLineTable[lineElementsSet.get(i).getAdjVertexList().get().getPosinReadListElement(posInInvertedIndex)] = i;
+        }
+        //the first node in this read
+        int posInInvertedIndex = lineElementsSet.get(orderLineTable[0]).getPosInInvertedIndex();
+        outputVerID.set(
+                lineElementsSet.get(orderLineTable[0]).getAdjVertexList().get().getReadListElement(posInInvertedIndex),
+                (byte) 0);
+        desVtexAdjList.set(lineElementsSet.get(orderLineTable[1]).getAdjVertexList().get());
+        for (int i = 0; i < desVtexAdjList.getUsedSize(); i++) {
+            if (desVtexAdjList.getPosinReadListElement(i) == (byte) 0) {
+                srcVtexAdjList.addELementToList(desVtexAdjList.getReadListElement(i), (byte) 0);
+            }
+        }
+        srcVtexAdjList.addELementToList(key.get(), (byte) 1);
+        outputVerID.set(
+                lineElementsSet.get(orderLineTable[0]).getAdjVertexList().get().getReadListElement(posInInvertedIndex),
+                (byte) 0);
+        srcAdjListWritable.set(srcVtexAdjList);
+        outputAdjacentList.set(srcAdjListWritable, lineElementsSet.get(orderLineTable[0]).getVkmer());
+        output.collect(outputVerID, outputAdjacentList);
+        //srcVtexAdjList reset!!!!
+
+        for (int i = 1; i < lineElementsSet.size(); i++) {
+            desVtexAdjList.set(lineElementsSet.get(orderLineTable[i + 1]).getAdjVertexList().get()); // NOTE(review): i+1 overruns orderLineTable on the last iteration
+            boolean flag = false;
+            for (int j = 0; j < desVtexAdjList.getUsedSize(); j++) {
+                if (desVtexAdjList.getPosinReadListElement(j) == (byte) 0) {
+                    srcVtexAdjList.addELementToList(desVtexAdjList.getReadListElement(i), (byte) 0); // NOTE(review): index should likely be j, not i
+                    flag = true;
+                }
+            }
+            if (flag = true) { // BUG(review): assignment, not comparison -- always true; should be 'if (flag)'
+                //doesn't merge
+                srcVtexAdjList.addELementToList(key.get(), (byte) (i + 1));
+                outputVerID.set(
+                        lineElementsSet.get(orderLineTable[i]).getAdjVertexList().get()
+                                .getReadListElement(posInInvertedIndex), lineElementsSet.get(orderLineTable[i])
+                                .getAdjVertexList().get().getPosinReadListElement(posInInvertedIndex));
+                srcAdjListWritable.set(srcVtexAdjList);
+                outputAdjacentList.set(srcAdjListWritable, lineElementsSet.get(orderLineTable[i]).getVkmer());
+            }
+            else {
+                //merge
+                desKmer.set(kmerFactory.getFirstKmerFromChain(1, lineElementsSet.get(orderLineTable[i+1]).getVkmer()));
+                srcKmer.set(lineElementsSet.get(orderLineTable[i]).getVkmer());
+                lineElementsSet.get(orderLineTable[i+1]).getVkmer().set(kmerFactory.mergeTwoKmer(srcKmer, desKmer));
+                orderLineTable[i+1] = orderLineTable[i];
+            }
+        }*/
+    }
+}
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/valvetgraphbuilding/GraphInvertedIndexBuildingMapper.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/valvetgraphbuilding/GraphInvertedIndexBuildingMapper.java
new file mode 100644
index 0000000..d3c2ff4
--- /dev/null
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/valvetgraphbuilding/GraphInvertedIndexBuildingMapper.java
@@ -0,0 +1,49 @@
+package edu.uci.ics.genomix.hadoop.valvetgraphbuilding;
+
+import java.io.IOException;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import edu.uci.ics.genomix.type.GeneCode;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.PositionWritable;
+
+@SuppressWarnings("deprecation")
+public class GraphInvertedIndexBuildingMapper extends MapReduceBase implements
+        Mapper<LongWritable, Text, KmerBytesWritable, PositionWritable> { // emits (kmer -> position within its read) for every kmer of each input line
+
+    public static int KMER_SIZE; // kmer length, parsed from job conf key "sizeKmer"
+    public PositionWritable outputVertexID; // reused output value: (readID, posInRead)
+    public KmerBytesWritable outputKmer; // reused output key
+
+    @Override
+    public void configure(JobConf job) {
+        KMER_SIZE = Integer.parseInt(job.get("sizeKmer"));
+        outputVertexID = new PositionWritable();
+        outputKmer = new KmerBytesWritable(KMER_SIZE);
+    }
+    @Override
+    public void map(LongWritable key, Text value, OutputCollector<KmerBytesWritable, PositionWritable> output,
+            Reporter reporter) throws IOException {
+        String geneLine = value.toString();
+        /** first kmer: covers array[0 .. KMER_SIZE-1], position 0 */
+        byte[] array = geneLine.getBytes();
+        outputKmer.setByRead(array, 0);
+        outputVertexID.set((int) key.get(), (byte) 0); // NOTE(review): readID taken from the line's byte offset -- TODO confirm intent
+        output.collect(outputKmer, outputVertexID);
+        /** middle kmers: after shifting in array[i] the kmer ends at i, so its position is i - KMER_SIZE + 1 */
+        for (int i = KMER_SIZE; i < array.length - 1; i++) {
+            GeneCode.getBitMapFromGeneCode(outputKmer.shiftKmerWithNextChar(array[i]));
+            outputVertexID.set((int) key.get(), (byte) (i - KMER_SIZE + 1));
+            output.collect(outputKmer, outputVertexID);
+        }
+        /** last kmer: shifting in array[array.length-1] yields position array.length - KMER_SIZE, matching the loop's formula */
+        GeneCode.getBitMapFromGeneCode(outputKmer.shiftKmerWithNextChar(array[array.length - 1]));
+        outputVertexID.set((int) key.get(), (byte) (array.length - KMER_SIZE)); // was (array.length - 1 + 1): off by KMER_SIZE vs the middle-kmer formula
+        output.collect(outputKmer, outputVertexID);
+    }
+}
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/valvetgraphbuilding/GraphInvertedIndexBuildingReducer.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/valvetgraphbuilding/GraphInvertedIndexBuildingReducer.java
new file mode 100644
index 0000000..72827f2
--- /dev/null
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/valvetgraphbuilding/GraphInvertedIndexBuildingReducer.java
@@ -0,0 +1,26 @@
+package edu.uci.ics.genomix.hadoop.valvetgraphbuilding;
+
+import java.io.IOException;
+import java.util.Iterator;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.PositionListWritable;
+import edu.uci.ics.genomix.type.PositionWritable;
+
+@SuppressWarnings("deprecation")
+public class GraphInvertedIndexBuildingReducer extends MapReduceBase implements
+        Reducer<KmerBytesWritable, PositionWritable, KmerBytesWritable, PositionListWritable> { // collects all read positions of a kmer into one list
+    PositionListWritable outputlist = new PositionListWritable(); // reused across reduce() calls
+    @Override
+    public void reduce(KmerBytesWritable key, Iterator<PositionWritable> values,
+            OutputCollector<KmerBytesWritable, PositionListWritable> output, Reporter reporter) throws IOException {
+        outputlist.reset(); // BUG FIX: without this, positions from previous keys accumulate in the reused list
+        while (values.hasNext()) {
+            outputlist.append(values.next());
+        }
+        output.collect(key, outputlist);
+    }
+}
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/valvetgraphbuilding/LineBasedmappingWritable.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/valvetgraphbuilding/LineBasedmappingWritable.java
new file mode 100644
index 0000000..1e44903
--- /dev/null
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/valvetgraphbuilding/LineBasedmappingWritable.java
@@ -0,0 +1,41 @@
+package edu.uci.ics.genomix.hadoop.valvetgraphbuilding;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import edu.uci.ics.genomix.type.PositionListWritable;
+
+public class LineBasedmappingWritable extends PositionListWritable{ // a position list tagged with this entry's offset into the inverted index
+    byte posInRead; // -1 until set; serialized after the parent's fields
+
+    public LineBasedmappingWritable() {
+        super();
+        this.posInRead = -1; // sentinel: no position assigned yet
+    }
+
+    public LineBasedmappingWritable(int count, byte [] data, int offset, byte posInRead) {
+        super(count, data, offset);
+        this.posInRead = posInRead;
+    }
+
+    public void set(byte posInRead, PositionListWritable right) {
+        super.set(right); // copy the position list, then tag it
+        this.posInRead = posInRead;
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+        super.readFields(in);
+        this.posInRead = in.readByte(); // must mirror write(): posInRead trails the parent's list data
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+        super.write(out);
+        out.writeByte(this.posInRead);
+    }
+
+    public int getPosInInvertedIndex() {
+        return this.posInRead; // byte widened to int; name suggests posInRead doubles as the inverted-index position -- TODO confirm
+    }
+}
diff --git a/genomix/genomix-hyracks/pom.xml b/genomix/genomix-hyracks/pom.xml
index ca5cb61..ebc49f2 100644
--- a/genomix/genomix-hyracks/pom.xml
+++ b/genomix/genomix-hyracks/pom.xml
@@ -261,5 +261,6 @@
<type>jar</type>
<scope>compile</scope>
</dependency>
+
</dependencies>
</project>
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/DistributedMergeLmerAggregateFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/DistributedMergeLmerAggregateFactory.java
deleted file mode 100644
index ea70fb0..0000000
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/DistributedMergeLmerAggregateFactory.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Copyright 2009-2012 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package edu.uci.ics.genomix.dataflow.aggregators;
-
-import java.io.DataOutput;
-import java.io.IOException;
-
-import edu.uci.ics.genomix.data.std.accessors.ByteSerializerDeserializer;
-import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
-import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
-
-/**
- * sum
- */
-public class DistributedMergeLmerAggregateFactory implements IAggregatorDescriptorFactory {
- private static final long serialVersionUID = 1L;
-
- public DistributedMergeLmerAggregateFactory() {
- }
-
- public class DistributeAggregatorDescriptor implements IAggregatorDescriptor {
- private static final int MAX = 127;
-
- @Override
- public void reset() {
- }
-
- @Override
- public void close() {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- public AggregateState createAggregateStates() {
- return new AggregateState(new Object() {
- });
- }
-
- protected byte getField(IFrameTupleAccessor accessor, int tIndex, int fieldId) {
- int tupleOffset = accessor.getTupleStartOffset(tIndex);
- int fieldStart = accessor.getFieldStartOffset(tIndex, fieldId);
- int offset = tupleOffset + fieldStart + accessor.getFieldSlotsLength();
- byte data = ByteSerializerDeserializer.getByte(accessor.getBuffer().array(), offset);
- return data;
- }
-
- @Override
- public void init(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex, AggregateState state)
- throws HyracksDataException {
- byte bitmap = getField(accessor, tIndex, 1);
- byte count = getField(accessor, tIndex, 2);
-
- DataOutput fieldOutput = tupleBuilder.getDataOutput();
- try {
- fieldOutput.writeByte(bitmap);
- tupleBuilder.addFieldEndOffset();
- fieldOutput.writeByte(count);
- tupleBuilder.addFieldEndOffset();
- } catch (IOException e) {
- throw new HyracksDataException("I/O exception when initializing the aggregator.");
- }
- }
-
- @Override
- public void aggregate(IFrameTupleAccessor accessor, int tIndex, IFrameTupleAccessor stateAccessor,
- int stateTupleIndex, AggregateState state) throws HyracksDataException {
- byte bitmap = getField(accessor, tIndex, 1);
- short count = getField(accessor, tIndex, 2);
-
- int statetupleOffset = stateAccessor.getTupleStartOffset(stateTupleIndex);
- int bitfieldStart = stateAccessor.getFieldStartOffset(stateTupleIndex, 1);
- int countfieldStart = stateAccessor.getFieldStartOffset(stateTupleIndex, 2);
- int bitoffset = statetupleOffset + stateAccessor.getFieldSlotsLength() + bitfieldStart;
- int countoffset = statetupleOffset + stateAccessor.getFieldSlotsLength() + countfieldStart;
-
- byte[] data = stateAccessor.getBuffer().array();
-
- bitmap |= data[bitoffset];
- count += data[countoffset];
- if (count >= MAX) {
- count = (byte) MAX;
- }
- data[bitoffset] = bitmap;
- data[countoffset] = (byte) count;
- }
-
- @Override
- public void outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
- AggregateState state) throws HyracksDataException {
- byte bitmap = getField(accessor, tIndex, 1);
- byte count = getField(accessor, tIndex, 2);
- DataOutput fieldOutput = tupleBuilder.getDataOutput();
- try {
- fieldOutput.writeByte(bitmap);
- tupleBuilder.addFieldEndOffset();
- fieldOutput.writeByte(count);
- tupleBuilder.addFieldEndOffset();
- } catch (IOException e) {
- throw new HyracksDataException("I/O exception when writing aggregation to the output buffer.");
- }
-
- }
-
- @Override
- public void outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
- AggregateState state) throws HyracksDataException {
- outputPartialResult(tupleBuilder, accessor, tIndex, state);
- }
-
- }
-
- @Override
- public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
- RecordDescriptor outRecordDescriptor, int[] keyFields, int[] keyFieldsInPartialResults)
- throws HyracksDataException {
- return new DistributeAggregatorDescriptor();
- }
-
-}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/LocalAggregatorDescriptor.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/LocalAggregatorDescriptor.java
deleted file mode 100644
index 87c0207..0000000
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/LocalAggregatorDescriptor.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Copyright 2009-2012 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package edu.uci.ics.genomix.dataflow.aggregators;
-
-import java.io.DataOutput;
-import java.io.IOException;
-
-import edu.uci.ics.genomix.data.std.accessors.ByteSerializerDeserializer;
-import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
-import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
-
-public class LocalAggregatorDescriptor implements IAggregatorDescriptor {
- private static final int MAX = 127;
-
- @Override
- public void reset() {
- }
-
- @Override
- public void close() {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- public AggregateState createAggregateStates() {
- return new AggregateState(new Object() {
- });
- }
-
- protected byte getField(IFrameTupleAccessor accessor, int tIndex, int fieldId) {
- int tupleOffset = accessor.getTupleStartOffset(tIndex);
- int fieldStart = accessor.getFieldStartOffset(tIndex, fieldId);
- int offset = tupleOffset + fieldStart + accessor.getFieldSlotsLength();
- byte data = ByteSerializerDeserializer.getByte(accessor.getBuffer().array(), offset);
- return data;
- }
-
- @Override
- public void init(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex, AggregateState state)
- throws HyracksDataException {
- byte bitmap = getField(accessor, tIndex, 1);
- byte count = 1;
-
- DataOutput fieldOutput = tupleBuilder.getDataOutput();
- try {
- fieldOutput.writeByte(bitmap);
- tupleBuilder.addFieldEndOffset();
- fieldOutput.writeByte(count);
- tupleBuilder.addFieldEndOffset();
- } catch (IOException e) {
- throw new HyracksDataException("I/O exception when initializing the aggregator.");
- }
- }
-
- @Override
- public void aggregate(IFrameTupleAccessor accessor, int tIndex, IFrameTupleAccessor stateAccessor,
- int stateTupleIndex, AggregateState state) throws HyracksDataException {
- byte bitmap = getField(accessor, tIndex, 1);
- short count = 1;
-
- int statetupleOffset = stateAccessor.getTupleStartOffset(stateTupleIndex);
- int bitfieldStart = stateAccessor.getFieldStartOffset(stateTupleIndex, 1);
- int countfieldStart = stateAccessor.getFieldStartOffset(stateTupleIndex, 2);
- int bitoffset = statetupleOffset + stateAccessor.getFieldSlotsLength() + bitfieldStart;
- int countoffset = statetupleOffset + stateAccessor.getFieldSlotsLength() + countfieldStart;
-
- byte[] data = stateAccessor.getBuffer().array();
-
- bitmap |= data[bitoffset];
- count += data[countoffset];
- if (count >= MAX) {
- count = (byte) MAX;
- }
- data[bitoffset] = bitmap;
- data[countoffset] = (byte) count;
- }
-
- @Override
- public void outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
- AggregateState state) throws HyracksDataException {
- byte bitmap = getField(accessor, tIndex, 1);
- byte count = getField(accessor, tIndex, 2);
- DataOutput fieldOutput = tupleBuilder.getDataOutput();
- try {
- fieldOutput.writeByte(bitmap);
- tupleBuilder.addFieldEndOffset();
- fieldOutput.writeByte(count);
- tupleBuilder.addFieldEndOffset();
- } catch (IOException e) {
- throw new HyracksDataException("I/O exception when writing aggregation to the output buffer.");
- }
-
- }
-
- @Override
- public void outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
- AggregateState state) throws HyracksDataException {
- outputPartialResult(tupleBuilder, accessor, tIndex, state);
- }
-
-};
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/MergeKmerAggregateFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/MergeKmerAggregateFactory.java
deleted file mode 100644
index b5eb70f..0000000
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/MergeKmerAggregateFactory.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Copyright 2009-2012 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package edu.uci.ics.genomix.dataflow.aggregators;
-
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
-
-/**
- * count
- */
-public class MergeKmerAggregateFactory implements IAggregatorDescriptorFactory {
- private static final long serialVersionUID = 1L;
-
- public MergeKmerAggregateFactory() {
- }
-
- @Override
- public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
- RecordDescriptor outRecordDescriptor, int[] keyFields, int[] keyFieldsInPartialResults)
- throws HyracksDataException {
- return new LocalAggregatorDescriptor();
- }
-
-}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/ByteSerializerDeserializer.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/accessors/ByteSerializerDeserializer.java
similarity index 96%
rename from genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/ByteSerializerDeserializer.java
rename to genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/accessors/ByteSerializerDeserializer.java
index ebf1282..3826f9b 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/ByteSerializerDeserializer.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/accessors/ByteSerializerDeserializer.java
@@ -13,7 +13,7 @@
* limitations under the License.
*/
-package edu.uci.ics.genomix.data.std.accessors;
+package edu.uci.ics.genomix.hyracks.data.accessors;
import java.io.DataInput;
import java.io.DataOutput;
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/KmerBinaryHashFunctionFamily.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/accessors/KmerBinaryHashFunctionFamily.java
similarity index 92%
rename from genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/KmerBinaryHashFunctionFamily.java
rename to genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/accessors/KmerBinaryHashFunctionFamily.java
index 34c29c7..a4cfd3b 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/KmerBinaryHashFunctionFamily.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/accessors/KmerBinaryHashFunctionFamily.java
@@ -13,9 +13,9 @@
* limitations under the License.
*/
-package edu.uci.ics.genomix.data.std.accessors;
+package edu.uci.ics.genomix.hyracks.data.accessors;
-import edu.uci.ics.genomix.data.std.primitive.KmerPointable;
+import edu.uci.ics.genomix.hyracks.data.primitive.KmerPointable;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunction;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFamily;
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/KmerHashPartitioncomputerFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/accessors/KmerHashPartitioncomputerFactory.java
similarity index 97%
rename from genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/KmerHashPartitioncomputerFactory.java
rename to genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/accessors/KmerHashPartitioncomputerFactory.java
index 8aaf380..eb0d6bb 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/KmerHashPartitioncomputerFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/accessors/KmerHashPartitioncomputerFactory.java
@@ -13,7 +13,7 @@
* limitations under the License.
*/
-package edu.uci.ics.genomix.data.std.accessors;
+package edu.uci.ics.genomix.hyracks.data.accessors;
import java.nio.ByteBuffer;
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/KmerNormarlizedComputerFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/accessors/KmerNormarlizedComputerFactory.java
similarity index 91%
rename from genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/KmerNormarlizedComputerFactory.java
rename to genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/accessors/KmerNormarlizedComputerFactory.java
index 83b9d12..44b0e10 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/KmerNormarlizedComputerFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/accessors/KmerNormarlizedComputerFactory.java
@@ -13,9 +13,9 @@
* limitations under the License.
*/
-package edu.uci.ics.genomix.data.std.accessors;
+package edu.uci.ics.genomix.hyracks.data.accessors;
-import edu.uci.ics.genomix.data.std.primitive.KmerPointable;
+import edu.uci.ics.genomix.hyracks.data.primitive.KmerPointable;
import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputer;
import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/accessors/ReadIDNormarlizedComputeFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/accessors/ReadIDNormarlizedComputeFactory.java
new file mode 100644
index 0000000..4d00731
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/accessors/ReadIDNormarlizedComputeFactory.java
@@ -0,0 +1,26 @@
+package edu.uci.ics.genomix.hyracks.data.accessors;
+
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputer;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+
+public class ReadIDNormarlizedComputeFactory implements INormalizedKeyComputerFactory{ // normalized sort key = the 4-byte read ID at the field start
+
+    /**
+     * Java serialization version stamp for this stateless factory.
+     */
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public INormalizedKeyComputer createNormalizedKeyComputer() {
+        return new INormalizedKeyComputer(){
+
+            @Override
+            public int normalize(byte[] bytes, int start, int length) {
+                return IntegerSerializerDeserializer.getInt(bytes, start); // NOTE(review): raw signed int -- negative read IDs would order before positive ones; confirm IDs are non-negative
+            }
+
+        };
+    }
+
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/accessors/ReadIDPartitionComputerFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/accessors/ReadIDPartitionComputerFactory.java
new file mode 100644
index 0000000..d328bd1
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/accessors/ReadIDPartitionComputerFactory.java
@@ -0,0 +1,39 @@
+package edu.uci.ics.genomix.hyracks.data.accessors;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+
+public class ReadIDPartitionComputerFactory implements ITuplePartitionComputerFactory { // partitions tuples by the int read ID stored in field 0
+
+    /**
+     * Java serialization version stamp for this stateless factory.
+     */
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public ITuplePartitionComputer createPartitioner() {
+        // stateless computer: reads field 0 of each tuple directly from the frame buffer
+        return new ITuplePartitionComputer() {
+            @Override
+            public int partition(IFrameTupleAccessor accessor, int tIndex, int nParts) {
+                int startOffset = accessor.getTupleStartOffset(tIndex);
+                int fieldOffset = accessor.getFieldStartOffset(tIndex, 0);
+                int slotLength = accessor.getFieldSlotsLength();
+
+                ByteBuffer buf = accessor.getBuffer();
+
+                int hash = IntegerSerializerDeserializer.getInt(buf.array(), startOffset + fieldOffset + slotLength);
+                if (hash < 0) {
+                    hash = -(hash + 1); // fold negative values to non-negative (equivalent to ~hash) so the modulo below is valid
+                }
+
+                return hash % nParts;
+            }
+        };
+    }
+
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/primitive/KmerPointable.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/KmerPointable.java
similarity index 96%
rename from genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/primitive/KmerPointable.java
rename to genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/KmerPointable.java
index 8febfa5..4ceda78 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/primitive/KmerPointable.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/KmerPointable.java
@@ -13,9 +13,9 @@
* limitations under the License.
*/
-package edu.uci.ics.genomix.data.std.primitive;
+package edu.uci.ics.genomix.hyracks.data.primitive;
-import edu.uci.ics.genomix.data.std.accessors.KmerHashPartitioncomputerFactory;
+import edu.uci.ics.genomix.hyracks.data.accessors.KmerHashPartitioncomputerFactory;
import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
import edu.uci.ics.hyracks.data.std.api.AbstractPointable;
import edu.uci.ics.hyracks.data.std.api.IComparable;
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/NodeReference.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/NodeReference.java
new file mode 100644
index 0000000..fcc61d5
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/NodeReference.java
@@ -0,0 +1,12 @@
+package edu.uci.ics.genomix.hyracks.data.primitive;
+
+import edu.uci.ics.genomix.type.NodeWritable;
+
+public class NodeReference extends NodeWritable{ // Hyracks-side alias of NodeWritable; adds no behavior of its own
+
+    public NodeReference(int kmerSize) {
+        super(kmerSize);
+        // delegates setup to NodeWritable; kmerSize presumably sizes the internal kmer storage -- TODO confirm
+    }
+
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/PositionListReference.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/PositionListReference.java
new file mode 100644
index 0000000..7b46fa5
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/PositionListReference.java
@@ -0,0 +1,7 @@
+package edu.uci.ics.genomix.hyracks.data.primitive;
+
+import edu.uci.ics.genomix.type.PositionListWritable;
+import edu.uci.ics.hyracks.data.std.api.IValueReference;
+
+public class PositionListReference extends PositionListWritable implements IValueReference { // marker: assumes the parent already provides IValueReference's byte accessors -- confirm
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/PositionReference.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/PositionReference.java
new file mode 100644
index 0000000..4c19595
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/PositionReference.java
@@ -0,0 +1,7 @@
+package edu.uci.ics.genomix.hyracks.data.primitive;
+
+import edu.uci.ics.genomix.type.PositionWritable;
+import edu.uci.ics.hyracks.data.std.api.IValueReference;
+
+public class PositionReference extends PositionWritable implements IValueReference {
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/ConnectorPolicyAssignmentPolicy.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/ConnectorPolicyAssignmentPolicy.java
similarity index 97%
rename from genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/ConnectorPolicyAssignmentPolicy.java
rename to genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/ConnectorPolicyAssignmentPolicy.java
index ed1b926..fe44f51 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/ConnectorPolicyAssignmentPolicy.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/ConnectorPolicyAssignmentPolicy.java
@@ -13,7 +13,7 @@
* limitations under the License.
*/
-package edu.uci.ics.genomix.dataflow;
+package edu.uci.ics.genomix.hyracks.dataflow;
import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/MapKmerPositionToReadOperator.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/MapKmerPositionToReadOperator.java
new file mode 100644
index 0000000..fe72a81
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/MapKmerPositionToReadOperator.java
@@ -0,0 +1,173 @@
+package edu.uci.ics.genomix.hyracks.dataflow;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.genomix.hyracks.data.primitive.PositionReference;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
+import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorNodePushable;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
+
+public class MapKmerPositionToReadOperator extends AbstractSingleActivityOperatorDescriptor {
+
+ public MapKmerPositionToReadOperator(IOperatorDescriptorRegistry spec, RecordDescriptor recDesc) {
+ super(spec, 1, 1);
+ recordDescriptors[0] = recDesc;
+ }
+
+ private static final long serialVersionUID = 1L;
+ public static final int InputKmerField = 0;
+ public static final int InputPosListField = 1;
+
+ public static final int OutputReadIDField = 0;
+ public static final int OutputPosInReadField = 1;
+ public static final int OutputOtherReadIDListField = 2;
+ public static final int OutputKmerField = 3; // may not needed
+
+ /**
+ * Map (Kmer, {(ReadID,PosInRead),...}) into (ReadID,PosInRead,{OtherReadID,...},*Kmer*)
+ * OtherReadID appears only when otherReadID.otherPos==0
+ */
+ public class MapKmerPositionToReadNodePushable extends AbstractUnaryInputUnaryOutputOperatorNodePushable {
+ private final IHyracksTaskContext ctx;
+ private final RecordDescriptor inputRecDesc;
+ private final RecordDescriptor outputRecDesc;
+
+ private FrameTupleAccessor accessor;
+ private ByteBuffer writeBuffer;
+ private ArrayTupleBuilder builder;
+ private FrameTupleAppender appender;
+
+ private PositionReference positionEntry;
+ private ArrayBackedValueStorage posListEntry;
+ private ArrayBackedValueStorage zeroPositionCollection;
+ private ArrayBackedValueStorage noneZeroPositionCollection;
+
+ public MapKmerPositionToReadNodePushable(IHyracksTaskContext ctx, RecordDescriptor inputRecDesc,
+ RecordDescriptor outputRecDesc) {
+ this.ctx = ctx;
+ this.inputRecDesc = inputRecDesc;
+ this.outputRecDesc = outputRecDesc;
+ this.positionEntry = new PositionReference();
+ this.posListEntry = new ArrayBackedValueStorage();
+ this.zeroPositionCollection = new ArrayBackedValueStorage();
+ this.noneZeroPositionCollection = new ArrayBackedValueStorage();
+ }
+
+ @Override
+ public void open() throws HyracksDataException {
+ accessor = new FrameTupleAccessor(ctx.getFrameSize(), inputRecDesc);
+ writeBuffer = ctx.allocateFrame();
+ builder = new ArrayTupleBuilder(outputRecDesc.getFieldCount());
+ appender = new FrameTupleAppender(ctx.getFrameSize());
+ appender.reset(writeBuffer, true);
+ writer.open();
+ posListEntry.reset();
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ accessor.reset(buffer);
+ int tupleCount = accessor.getTupleCount();
+ for (int i = 0; i < tupleCount; i++) {
+ scanPosition(i, zeroPositionCollection, noneZeroPositionCollection);
+ scanAgainToOutputTuple(i, zeroPositionCollection, noneZeroPositionCollection, builder);
+ }
+ }
+
+ private void scanPosition(int tIndex, ArrayBackedValueStorage zeroPositionCollection2,
+ ArrayBackedValueStorage noneZeroPositionCollection2) {
+ zeroPositionCollection2.reset();
+ noneZeroPositionCollection2.reset();
+ byte[] data = accessor.getBuffer().array();
+ int offsetPoslist = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength()
+ + accessor.getFieldStartOffset(tIndex, InputPosListField);
+ for (int i = 0; i < accessor.getFieldLength(tIndex, InputPosListField); i += PositionReference.LENGTH) {
+ positionEntry.setNewReference(data, offsetPoslist + i);
+ if (positionEntry.getPosInRead() == 0) {
+ zeroPositionCollection2.append(positionEntry);
+ } else {
+ noneZeroPositionCollection2.append(positionEntry);
+ }
+ }
+
+ }
+
+ private void scanAgainToOutputTuple(int tIndex, ArrayBackedValueStorage zeroPositionCollection,
+ ArrayBackedValueStorage noneZeroPositionCollection, ArrayTupleBuilder builder2) {
+ byte[] data = accessor.getBuffer().array();
+ int offsetPoslist = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength()
+ + accessor.getFieldStartOffset(tIndex, InputPosListField);
+ for (int i = 0; i < accessor.getFieldLength(tIndex, InputPosListField); i += PositionReference.LENGTH) {
+ positionEntry.setNewReference(data, offsetPoslist + i);
+ if (positionEntry.getPosInRead() != 0) {
+ appendNodeToBuilder(tIndex, positionEntry, zeroPositionCollection, builder2);
+ } else {
+ appendNodeToBuilder(tIndex, positionEntry, noneZeroPositionCollection, builder2);
+ }
+ }
+ }
+
+ private void appendNodeToBuilder(int tIndex, PositionReference pos, ArrayBackedValueStorage posList2,
+ ArrayTupleBuilder builder2) {
+ try {
+ builder2.addField(pos.getByteArray(), 0, PositionReference.INTBYTES);
+ builder2.addField(pos.getByteArray(), PositionReference.INTBYTES, 1);
+ //? ask Yingyi, if support empty bytes[]
+ if (posList2 == null) {
+ builder2.addFieldEndOffset();
+ } else {
+ builder2.addField(posList2.getByteArray(), posList2.getStartOffset(), posList2.getLength());
+ }
+ // set kmer, may not useful
+ byte[] data = accessor.getBuffer().array();
+ int offsetKmer = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength()
+ + accessor.getFieldStartOffset(tIndex, InputKmerField);
+ builder2.addField(data, offsetKmer, accessor.getFieldLength(tIndex, InputKmerField));
+
+ if (!appender.append(builder2.getFieldEndOffsets(), builder2.getByteArray(), 0, builder2.getSize())) {
+ FrameUtils.flushFrame(writeBuffer, writer);
+ appender.reset(writeBuffer, true);
+ if (!appender.append(builder2.getFieldEndOffsets(), builder2.getByteArray(), 0, builder2.getSize())) {
+ throw new IllegalStateException();
+ }
+ }
+ builder2.reset();
+ } catch (HyracksDataException e) {
+ throw new IllegalStateException(
+ "Failed to Add a field to the tuple by copying the data bytes from a byte array.");
+ }
+ }
+
+ @Override
+ public void fail() throws HyracksDataException {
+ writer.fail();
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ if (appender.getTupleCount() > 0) {
+ FrameUtils.flushFrame(writeBuffer, writer);
+ }
+ writer.close();
+ }
+
+ }
+
+ @Override
+ public AbstractOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+ IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
+ return new MapKmerPositionToReadNodePushable(ctx, recordDescProvider.getInputRecordDescriptor(getActivityId(),
+ 0), recordDescriptors[0]);
+ }
+
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/MapReadToNodeOperator.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/MapReadToNodeOperator.java
new file mode 100644
index 0000000..46a92ec
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/MapReadToNodeOperator.java
@@ -0,0 +1,206 @@
+package edu.uci.ics.genomix.hyracks.dataflow;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.genomix.hyracks.data.primitive.NodeReference;
+import edu.uci.ics.genomix.hyracks.data.primitive.PositionReference;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.PositionListWritable;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
+
+public class MapReadToNodeOperator extends AbstractSingleActivityOperatorDescriptor {
+
+ public MapReadToNodeOperator(IOperatorDescriptorRegistry spec, RecordDescriptor outRecDesc, int kmerSize) {
+ super(spec, 1, 1);
+ recordDescriptors[0] = outRecDesc;
+ this.kmerSize = kmerSize;
+ }
+
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1L;
+ private final int kmerSize;
+
+ public static final int InputReadIDField = 0;
+ public static final int InputInfoFieldStart = 1;
+
+ public static final int OutputNodeIDField = 0;
+ public static final int OutputCountOfKmerField = 1;
+ public static final int OutputIncomingField = 2;
+ public static final int OutputOutgoingField = 3;
+ public static final int OutputKmerBytesField = 4;
+
+ /**
+ * (ReadID, Storage[posInRead]={len, PositionList, len, Kmer})
+ * to (Position, LengthCount, InComingPosList, OutgoingPosList, Kmer)
+ */
+ public class MapReadToNodePushable extends AbstractUnaryInputUnaryOutputOperatorNodePushable {
+ public static final int INT_LENGTH = 4;
+ private final IHyracksTaskContext ctx;
+ private final RecordDescriptor inputRecDesc;
+ private final RecordDescriptor outputRecDesc;
+
+ private FrameTupleAccessor accessor;
+ private ByteBuffer writeBuffer;
+ private ArrayTupleBuilder builder;
+ private FrameTupleAppender appender;
+
+ private NodeReference curNodeEntry;
+ private NodeReference nextNodeEntry;
+
+ public MapReadToNodePushable(IHyracksTaskContext ctx, RecordDescriptor inputRecDesc,
+ RecordDescriptor outputRecDesc, int kmerSize) {
+ this.ctx = ctx;
+ this.inputRecDesc = inputRecDesc;
+ this.outputRecDesc = outputRecDesc;
+ curNodeEntry = new NodeReference(kmerSize);
+ nextNodeEntry = new NodeReference(kmerSize);
+ }
+
+ @Override
+ public void open() throws HyracksDataException {
+ accessor = new FrameTupleAccessor(ctx.getFrameSize(), inputRecDesc);
+ writeBuffer = ctx.allocateFrame();
+ builder = new ArrayTupleBuilder(outputRecDesc.getFieldCount());
+ appender = new FrameTupleAppender(ctx.getFrameSize());
+ appender.reset(writeBuffer, true);
+ writer.open();
+ curNodeEntry.reset();
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ accessor.reset(buffer);
+ int tupleCount = accessor.getTupleCount();
+ for (int i = 0; i < tupleCount; i++) {
+ generateNodeFromRead(i);
+ }
+ }
+
+ private void generateNodeFromRead(int tIndex) throws HyracksDataException {
+ int offsetPoslist = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength();
+ int readID = accessor.getBuffer().getInt(
+ offsetPoslist + accessor.getFieldStartOffset(tIndex, InputReadIDField));
+ resetNode(curNodeEntry, readID, (byte) 0,
+ offsetPoslist + accessor.getFieldStartOffset(tIndex, InputInfoFieldStart), false);
+
+ for (int i = InputInfoFieldStart + 1; i < accessor.getFieldCount(); i++) {
+ resetNode(nextNodeEntry, readID, (byte) (i - InputInfoFieldStart),
+ offsetPoslist + accessor.getFieldStartOffset(tIndex, i), true);
+ if (nextNodeEntry.getOutgoingList().getCountOfPosition() == 0) {
+ curNodeEntry.mergeNextWithinOneRead(nextNodeEntry);
+ } else {
+ curNodeEntry.setOutgoingList(nextNodeEntry.getOutgoingList());
+ curNodeEntry.getOutgoingList().append(nextNodeEntry.getNodeID());
+ outputNode(curNodeEntry);
+ nextNodeEntry.getIncomingList().append(curNodeEntry.getNodeID());
+ curNodeEntry.set(nextNodeEntry);
+ }
+ }
+ outputNode(curNodeEntry);
+ }
+
+ private void outputNode(NodeReference node) throws HyracksDataException {
+ try {
+ builder.addField(node.getNodeID().getByteArray(), node.getNodeID().getStartOffset(), node.getNodeID()
+ .getLength());
+ builder.getDataOutput().writeInt(node.getCount());
+ builder.addFieldEndOffset();
+ builder.addField(node.getIncomingList().getByteArray(), node.getIncomingList().getStartOffset(), node
+ .getIncomingList().getLength());
+ builder.addField(node.getOutgoingList().getByteArray(), node.getOutgoingList().getStartOffset(), node
+ .getOutgoingList().getLength());
+ builder.addField(node.getKmer().getBytes(), node.getKmer().getOffset(), node.getKmer().getLength());
+
+ if (!appender.append(builder.getFieldEndOffsets(), builder.getByteArray(), 0, builder.getSize())) {
+ FrameUtils.flushFrame(writeBuffer, writer);
+ appender.reset(writeBuffer, true);
+ if (!appender.append(builder.getFieldEndOffsets(), builder.getByteArray(), 0, builder.getSize())) {
+ throw new IllegalStateException("Failed to append tuplebuilder to frame");
+ }
+ }
+ builder.reset();
+ } catch (IOException e) {
+ throw new IllegalStateException("Failed to Add a field to the tupleBuilder.");
+ }
+ }
+
+ private void resetNode(NodeReference node, int readID, byte posInRead, int offset, boolean byRef) {
+ node.reset();
+ node.setNodeID(readID, posInRead);
+
+ ByteBuffer buffer = accessor.getBuffer();
+ int lengthOfPosition = buffer.getInt(offset);
+ if (lengthOfPosition % PositionReference.LENGTH != 0) {
+ throw new IllegalStateException("Size of PositionList is invalid ");
+ }
+ offset += INT_LENGTH;
+ if (posInRead == 0) {
+ setPositionList(node.getIncomingList(), lengthOfPosition / PositionReference.LENGTH, buffer.array(),
+ offset, byRef);
+ } else {
+ setPositionList(node.getOutgoingList(), lengthOfPosition / PositionReference.LENGTH, buffer.array(),
+ offset, byRef);
+ }
+ offset += lengthOfPosition;
+ int lengthKmer = buffer.getInt(offset);
+ if (node.getKmer().getLength() != lengthKmer) {
+ throw new IllegalStateException("Size of Kmer is invalid ");
+ }
+ setKmer(node.getKmer(), buffer.array(), offset + INT_LENGTH, byRef);
+ node.setCount(1);
+ }
+
+ private void setKmer(KmerBytesWritable kmer, byte[] array, int offset, boolean byRef) {
+ if (byRef) {
+ kmer.setNewReference(array, offset);
+ } else {
+ kmer.set(array, offset);
+ }
+ }
+
+ private void setPositionList(PositionListWritable positionListWritable, int count, byte[] array, int offset, boolean byRef) {
+ if (byRef) {
+ positionListWritable.setNewReference(count, array, offset);
+ } else {
+ positionListWritable.set(count, array, offset);
+ }
+ }
+
+ @Override
+ public void fail() throws HyracksDataException {
+ writer.fail();
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ if (appender.getTupleCount() > 0) {
+ FrameUtils.flushFrame(writeBuffer, writer);
+ }
+ writer.close();
+ }
+
+ }
+
+ @Override
+ public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+ IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
+ // TODO Auto-generated method stub
+ return new MapReadToNodePushable(ctx, recordDescProvider.getInputRecordDescriptor(getActivityId(), 0),
+ recordDescriptors[0], kmerSize);
+ }
+
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/ReadsKeyValueParserFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/ReadsKeyValueParserFactory.java
similarity index 64%
rename from genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/ReadsKeyValueParserFactory.java
rename to genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/ReadsKeyValueParserFactory.java
index 409b434..1bc2137 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/ReadsKeyValueParserFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/ReadsKeyValueParserFactory.java
@@ -13,17 +13,18 @@
* limitations under the License.
*/
-package edu.uci.ics.genomix.dataflow;
+package edu.uci.ics.genomix.hyracks.dataflow;
import java.nio.ByteBuffer;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
-import edu.uci.ics.genomix.data.std.accessors.ByteSerializerDeserializer;
-import edu.uci.ics.genomix.type.GeneCode;
+import edu.uci.ics.genomix.hyracks.data.accessors.ByteSerializerDeserializer;
import edu.uci.ics.genomix.type.KmerBytesWritable;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
@@ -31,14 +32,13 @@
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
import edu.uci.ics.hyracks.hdfs.api.IKeyValueParser;
import edu.uci.ics.hyracks.hdfs.api.IKeyValueParserFactory;
-;
-
public class ReadsKeyValueParserFactory implements IKeyValueParserFactory<LongWritable, Text> {
private static final long serialVersionUID = 1L;
-
+ private static final Log LOG = LogFactory.getLog(ReadsKeyValueParserFactory.class);
private KmerBytesWritable kmer;
private boolean bReversed;
@@ -58,76 +58,62 @@
@Override
public void parse(LongWritable key, Text value, IFrameWriter writer) throws HyracksDataException {
- String geneLine = value.toString(); // Read the Real Gene Line
+ String[] geneLine = value.toString().split("\\t"); // Read the Real Gene Line
+ if (geneLine.length != 2) {
+ return;
+ }
+ int readID = 0;
+ try {
+ readID = Integer.parseInt(geneLine[0]);
+ } catch (NumberFormatException e) {
+ LOG.warn("Invalid data");
+ return;
+ }
+
Pattern genePattern = Pattern.compile("[AGCT]+");
- Matcher geneMatcher = genePattern.matcher(geneLine);
+ Matcher geneMatcher = genePattern.matcher(geneLine[1]);
boolean isValid = geneMatcher.matches();
if (isValid) {
- SplitReads(geneLine.getBytes(), writer);
+ SplitReads(readID, geneLine[1].getBytes(), writer);
}
}
- private void SplitReads(byte[] array, IFrameWriter writer) {
+ private void SplitReads(int readID, byte[] array, IFrameWriter writer) {
/** first kmer */
int k = kmer.getKmerLength();
- if (k >= array.length){
- return;
+ if (k >= array.length) {
+ return;
}
kmer.setByRead(array, 0);
- byte pre = 0;
- byte next = GeneCode.getAdjBit(array[k]);
- InsertToFrame(kmer, pre, next, writer);
+ InsertToFrame(kmer, readID, 0, writer);
/** middle kmer */
- for (int i = k; i < array.length - 1; i++) {
- pre = GeneCode.getBitMapFromGeneCode(kmer.shiftKmerWithNextChar(array[i]));
- next = GeneCode.getAdjBit(array[i + 1]);
- InsertToFrame(kmer, pre, next, writer);
+ for (int i = k; i < array.length; i++) {
+ kmer.shiftKmerWithNextChar(array[i]);
+ InsertToFrame(kmer, readID, i - k + 1, writer);
}
- /** last kmer */
- pre = GeneCode.getBitMapFromGeneCode(kmer.shiftKmerWithNextChar(array[array.length - 1]));
- next = 0;
- InsertToFrame(kmer, pre, next, writer);
-
if (bReversed) {
/** first kmer */
kmer.setByReadReverse(array, 0);
- next = 0;
- pre = GeneCode.getAdjBit(array[k]);
- InsertToFrame(kmer, pre, next, writer);
+ InsertToFrame(kmer, -readID, array.length - k, writer);
/** middle kmer */
- for (int i = k; i < array.length - 1; i++) {
- next = GeneCode.getBitMapFromGeneCode(kmer.shiftKmerWithPreChar(array[i]));
- pre = GeneCode.getAdjBit(array[i + 1]);
- InsertToFrame(kmer, pre, next, writer);
+ for (int i = k; i < array.length; i++) {
+ kmer.shiftKmerWithPreChar(array[i]);
+ InsertToFrame(kmer, -readID, array.length - i - 1, writer);
}
- /** last kmer */
- next = GeneCode.getBitMapFromGeneCode(kmer.shiftKmerWithPreChar(array[array.length - 1]));
- pre = 0;
- InsertToFrame(kmer, pre, next, writer);
}
}
- /**
- * At this graph building phase, we assume the kmer length are all
- * the same Thus we didn't output those Kmer length
- *
- * @param kmer
- * :input kmer
- * @param pre
- * : pre neighbor code
- * @param next
- * : next neighbor code
- * @param writer
- * : output writer
- */
- private void InsertToFrame(KmerBytesWritable kmer, byte pre, byte next, IFrameWriter writer) {
+ private void InsertToFrame(KmerBytesWritable kmer, int readID, int posInRead, IFrameWriter writer) {
try {
- byte adj = GeneCode.mergePreNextAdj(pre, next);
+ if (posInRead > 127){
+ throw new IllegalArgumentException ("Position id is beyond 127 at " + readID);
+ }
tupleBuilder.reset();
tupleBuilder.addField(kmer.getBytes(), 0, kmer.getLength());
- tupleBuilder.addField(ByteSerializerDeserializer.INSTANCE, adj);
+ tupleBuilder.addField(IntegerSerializerDeserializer.INSTANCE, readID);
+ tupleBuilder.addField(ByteSerializerDeserializer.INSTANCE, (byte)posInRead);
if (!outputAppender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
tupleBuilder.getSize())) {
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/AggregateKmerAggregateFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/AggregateKmerAggregateFactory.java
new file mode 100644
index 0000000..7dc4947
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/AggregateKmerAggregateFactory.java
@@ -0,0 +1,104 @@
+package edu.uci.ics.genomix.hyracks.dataflow.aggregators;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.genomix.hyracks.data.accessors.ByteSerializerDeserializer;
+import edu.uci.ics.genomix.hyracks.data.primitive.PositionReference;
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+
+public class AggregateKmerAggregateFactory implements IAggregatorDescriptorFactory {
+
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
+ RecordDescriptor outRecordDescriptor, int[] keyFields, int[] keyFieldsInPartialResults)
+ throws HyracksDataException {
+ return new IAggregatorDescriptor() {
+ private PositionReference position = new PositionReference();
+
+ protected int getOffSet(IFrameTupleAccessor accessor, int tIndex, int fieldId) {
+ int tupleOffset = accessor.getTupleStartOffset(tIndex);
+ int fieldStart = accessor.getFieldStartOffset(tIndex, fieldId);
+ int offset = tupleOffset + fieldStart + accessor.getFieldSlotsLength();
+ return offset;
+ }
+
+ protected byte readByteField(IFrameTupleAccessor accessor, int tIndex, int fieldId) {
+ return ByteSerializerDeserializer.getByte(accessor.getBuffer().array(),
+ getOffSet(accessor, tIndex, fieldId));
+ }
+
+ protected int readIntField(IFrameTupleAccessor accessor, int tIndex, int fieldId) {
+ return IntegerSerializerDeserializer.getInt(accessor.getBuffer().array(),
+ getOffSet(accessor, tIndex, fieldId));
+ }
+
+ @Override
+ public void reset() {
+ }
+
+ @Override
+ public void close() {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public AggregateState createAggregateStates() {
+ return new AggregateState(new ArrayBackedValueStorage());
+ }
+
+ @Override
+ public void init(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+ AggregateState state) throws HyracksDataException {
+ ArrayBackedValueStorage inputVal = (ArrayBackedValueStorage) state.state;
+ inputVal.reset();
+ position.set(readIntField(accessor, tIndex, 1), readByteField(accessor, tIndex, 2));
+ inputVal.append(position);
+ }
+
+ @Override
+ public void aggregate(IFrameTupleAccessor accessor, int tIndex, IFrameTupleAccessor stateAccessor,
+ int stateTupleIndex, AggregateState state) throws HyracksDataException {
+ ArrayBackedValueStorage inputVal = (ArrayBackedValueStorage) state.state;
+ position.set(readIntField(accessor, tIndex, 1), readByteField(accessor, tIndex, 2));
+ inputVal.append(position);
+ }
+
+ @Override
+ public void outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+ AggregateState state) throws HyracksDataException {
+ throw new IllegalStateException("partial result method should not be called");
+ }
+
+ @Override
+ public void outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+ AggregateState state) throws HyracksDataException {
+ DataOutput fieldOutput = tupleBuilder.getDataOutput();
+ ArrayBackedValueStorage inputVal = (ArrayBackedValueStorage) state.state;
+ try {
+ fieldOutput.write(inputVal.getByteArray(), inputVal.getStartOffset(), inputVal.getLength());
+ tupleBuilder.addFieldEndOffset();
+ } catch (IOException e) {
+ throw new HyracksDataException("I/O exception when writing aggregation to the output buffer.");
+ }
+ }
+
+ };
+ }
+
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/AggregateReadIDAggregateFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/AggregateReadIDAggregateFactory.java
new file mode 100644
index 0000000..c7552ca
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/AggregateReadIDAggregateFactory.java
@@ -0,0 +1,136 @@
+package edu.uci.ics.genomix.hyracks.dataflow.aggregators;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.genomix.hyracks.data.accessors.ByteSerializerDeserializer;
+import edu.uci.ics.genomix.hyracks.dataflow.MapKmerPositionToReadOperator;
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+
+public class AggregateReadIDAggregateFactory implements IAggregatorDescriptorFactory {
+
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1L;
+ public static final int InputReadIDField = MapKmerPositionToReadOperator.OutputReadIDField;
+ public static final int InputPosInReadField = MapKmerPositionToReadOperator.OutputPosInReadField;
+ public static final int InputPositionListField = MapKmerPositionToReadOperator.OutputOtherReadIDListField;
+ public static final int InputKmerField = MapKmerPositionToReadOperator.OutputKmerField;
+
+ public static final int OutputReadIDField = 0;
+ public static final int OutputPositionListField = 1;
+
+ public AggregateReadIDAggregateFactory() {
+ }
+
+ /**
+ * (ReadID,PosInRead,{OtherPosition,...},Kmer) to
+ * (ReadID, {(PosInRead,{OtherPositoin..},Kmer) ...}
+ */
+ @Override
+ public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
+ RecordDescriptor outRecordDescriptor, int[] keyFields, int[] keyFieldsInPartialResults)
+ throws HyracksDataException {
+ return new IAggregatorDescriptor() {
+
+ protected int getOffSet(IFrameTupleAccessor accessor, int tIndex, int fieldId) {
+ int tupleOffset = accessor.getTupleStartOffset(tIndex);
+ int fieldStart = accessor.getFieldStartOffset(tIndex, fieldId);
+ int offset = tupleOffset + fieldStart + accessor.getFieldSlotsLength();
+ return offset;
+ }
+
+ protected byte readByteField(IFrameTupleAccessor accessor, int tIndex, int fieldId) {
+ return ByteSerializerDeserializer.getByte(accessor.getBuffer().array(),
+ getOffSet(accessor, tIndex, fieldId));
+ }
+
+ @Override
+ public AggregateState createAggregateStates() {
+ return new AggregateState(new ArrayBackedValueStorage());
+ }
+
+ @Override
+ public void init(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+ AggregateState state) throws HyracksDataException {
+ ArrayBackedValueStorage storage = (ArrayBackedValueStorage) state.state;
+ storage.reset();
+ DataOutput out = storage.getDataOutput();
+                byte posInRead = readByteField(accessor, tIndex, InputPosInReadField);
+
+ try {
+ out.writeByte(posInRead);
+ writeBytesToStorage(out, accessor, tIndex, InputPositionListField);
+ writeBytesToStorage(out, accessor, tIndex, InputKmerField);
+ } catch (IOException e) {
+ throw new HyracksDataException("Failed to write into temporary storage");
+ }
+
+ }
+
+ private void writeBytesToStorage(DataOutput out, IFrameTupleAccessor accessor, int tIndex, int idField)
+ throws IOException {
+ int len = accessor.getFieldLength(tIndex, idField);
+ out.writeInt(len);
+ out.write(accessor.getBuffer().array(), getOffSet(accessor, tIndex, idField), len);
+ }
+
+ @Override
+ public void reset() {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void aggregate(IFrameTupleAccessor accessor, int tIndex, IFrameTupleAccessor stateAccessor,
+ int stateTupleIndex, AggregateState state) throws HyracksDataException {
+ ArrayBackedValueStorage storage = (ArrayBackedValueStorage) state.state;
+ DataOutput out = storage.getDataOutput();
+                byte posInRead = readByteField(accessor, tIndex, InputPosInReadField);
+
+ try {
+ out.writeByte(posInRead);
+ writeBytesToStorage(out, accessor, tIndex, InputPositionListField);
+ writeBytesToStorage(out, accessor, tIndex, InputKmerField);
+ } catch (IOException e) {
+ throw new HyracksDataException("Failed to write into temporary storage");
+ }
+ }
+
+ @Override
+ public void outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+ AggregateState state) throws HyracksDataException {
+ throw new IllegalStateException("partial result method should not be called");
+ }
+
+ @Override
+ public void outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+ AggregateState state) throws HyracksDataException {
+ DataOutput fieldOutput = tupleBuilder.getDataOutput();
+ ArrayBackedValueStorage inputVal = (ArrayBackedValueStorage) state.state;
+ try {
+ fieldOutput.write(inputVal.getByteArray(), inputVal.getStartOffset(), inputVal.getLength());
+ tupleBuilder.addFieldEndOffset();
+ } catch (IOException e) {
+ throw new HyracksDataException("I/O exception when writing aggregation to the output buffer.");
+ }
+ }
+
+ @Override
+ public void close() {
+ // TODO Auto-generated method stub
+
+ }
+
+ };
+ }
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/MergeKmerAggregateFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/MergeKmerAggregateFactory.java
new file mode 100644
index 0000000..b98db56
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/MergeKmerAggregateFactory.java
@@ -0,0 +1,105 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.genomix.hyracks.dataflow.aggregators;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.genomix.hyracks.data.primitive.PositionReference;
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+
+public class MergeKmerAggregateFactory implements IAggregatorDescriptorFactory {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
+ RecordDescriptor outRecordDescriptor, int[] keyFields, int[] keyFieldsInPartialResults)
+ throws HyracksDataException {
+ return new IAggregatorDescriptor (){
+
+ private PositionReference positionReEntry = new PositionReference();
+
+ @Override
+ public AggregateState createAggregateStates() {
+ return new AggregateState(new ArrayBackedValueStorage());
+ }
+
+ @Override
+ public void init(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex, AggregateState state)
+ throws HyracksDataException {
+ ArrayBackedValueStorage inputVal = (ArrayBackedValueStorage)state.state;
+ inputVal.reset();
+ int leadOffset = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength();
+ for( int offset = accessor.getFieldStartOffset(tIndex, 1); offset < accessor.getFieldEndOffset(tIndex, 1); offset += PositionReference.LENGTH){
+ positionReEntry.setNewReference(accessor.getBuffer().array(), leadOffset + offset);
+ inputVal.append(positionReEntry);
+ }
+ }
+
+ @Override
+ public void reset() {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void aggregate(IFrameTupleAccessor accessor, int tIndex, IFrameTupleAccessor stateAccessor,
+ int stateTupleIndex, AggregateState state) throws HyracksDataException {
+ ArrayBackedValueStorage inputVal = (ArrayBackedValueStorage)state.state;
+ int leadOffset = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength();
+ for( int offset = accessor.getFieldStartOffset(tIndex, 1); offset < accessor.getFieldEndOffset(tIndex, 1); offset += PositionReference.LENGTH){
+ positionReEntry.setNewReference(accessor.getBuffer().array(), leadOffset + offset);
+ inputVal.append(positionReEntry);
+ }
+ }
+
+ @Override
+ public void outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+ AggregateState state) throws HyracksDataException {
+ throw new IllegalStateException("partial result method should not be called");
+ }
+
+ @Override
+ public void outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+ AggregateState state) throws HyracksDataException {
+ DataOutput fieldOutput = tupleBuilder.getDataOutput();
+ ArrayBackedValueStorage inputVal = (ArrayBackedValueStorage) state.state;
+ try {
+ fieldOutput.write(inputVal.getByteArray(), inputVal.getStartOffset(), inputVal.getLength());
+ tupleBuilder.addFieldEndOffset();
+ } catch (IOException e) {
+ throw new HyracksDataException("I/O exception when writing aggregation to the output buffer.");
+ }
+ }
+
+ @Override
+ public void close() {
+ // TODO Auto-generated method stub
+
+ }
+
+ };
+
+ }
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/MergeReadIDAggregateFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/MergeReadIDAggregateFactory.java
new file mode 100644
index 0000000..2877ee6
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/MergeReadIDAggregateFactory.java
@@ -0,0 +1,173 @@
+package edu.uci.ics.genomix.hyracks.dataflow.aggregators;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.commons.lang3.tuple.Pair;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+
+public class MergeReadIDAggregateFactory implements IAggregatorDescriptorFactory {
+
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1L;
+
+ private final int ValidPosCount;
+
+ public MergeReadIDAggregateFactory(int readLength, int kmerLength) {
+ ValidPosCount = getPositionCount(readLength, kmerLength);
+ }
+
+ public static int getPositionCount(int readLength, int kmerLength){
+ return readLength - kmerLength + 1;
+ }
+ public static final int InputReadIDField = AggregateReadIDAggregateFactory.OutputReadIDField;
+ public static final int InputPositionListField = AggregateReadIDAggregateFactory.OutputPositionListField;
+
+ public static final int BYTE_SIZE = 1;
+ public static final int INTEGER_SIZE = 4;
+
+ /**
+ * (ReadID, {(PosInRead,{OtherPositoin..},Kmer) ...} to
+ * Aggregate as
+ * (ReadID, Storage[posInRead]={PositionList,Kmer})
+ *
+ */
+ @Override
+ public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
+ RecordDescriptor outRecordDescriptor, int[] keyFields, int[] keyFieldsInPartialResults)
+ throws HyracksDataException {
+ return new IAggregatorDescriptor() {
+
+ @Override
+ public AggregateState createAggregateStates() {
+ ArrayBackedValueStorage[] storages = new ArrayBackedValueStorage[ValidPosCount];
+ for (int i = 0; i < storages.length; i++) {
+ storages[i] = new ArrayBackedValueStorage();
+ }
+ return new AggregateState(Pair.of(storages, 0));
+ }
+
+ @Override
+ public void init(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+ AggregateState state) throws HyracksDataException {
+ @SuppressWarnings("unchecked")
+ Pair<ArrayBackedValueStorage[], Integer> pair = (Pair<ArrayBackedValueStorage[], Integer>) state.state;
+ ArrayBackedValueStorage[] storages = pair.getLeft();
+ for (ArrayBackedValueStorage each : storages) {
+ each.reset();
+ }
+ int count = 0;
+
+ int fieldOffset = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength()
+ + accessor.getFieldStartOffset(tIndex, InputPositionListField);
+ ByteBuffer fieldBuffer = accessor.getBuffer();
+
+ while (fieldOffset < accessor.getFieldEndOffset(tIndex, InputPositionListField)) {
+ byte posInRead = fieldBuffer.get(fieldOffset);
+ if (storages[posInRead].getLength() > 0) {
+ throw new IllegalArgumentException("Reentering into an exist storage");
+ }
+ fieldOffset += BYTE_SIZE;
+ // read poslist
+ fieldOffset += writeBytesToStorage(storages[posInRead], fieldBuffer, fieldOffset);
+ // read Kmer
+ fieldOffset += writeBytesToStorage(storages[posInRead], fieldBuffer, fieldOffset);
+ count++;
+ }
+ pair.setValue(count);
+ }
+
+ private int writeBytesToStorage(ArrayBackedValueStorage storage, ByteBuffer fieldBuffer, int fieldOffset)
+ throws HyracksDataException {
+ int lengthPosList = fieldBuffer.getInt(fieldOffset);
+ try {
+ storage.getDataOutput().writeInt(lengthPosList);
+ fieldOffset += INTEGER_SIZE;
+ storage.getDataOutput().write(fieldBuffer.array(), fieldOffset, lengthPosList);
+ } catch (IOException e) {
+ throw new HyracksDataException("Failed to write into temporary storage");
+ }
+ return lengthPosList + INTEGER_SIZE;
+ }
+
+ @Override
+ public void reset() {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void aggregate(IFrameTupleAccessor accessor, int tIndex, IFrameTupleAccessor stateAccessor,
+ int stateTupleIndex, AggregateState state) throws HyracksDataException {
+ @SuppressWarnings("unchecked")
+ Pair<ArrayBackedValueStorage[], Integer> pair = (Pair<ArrayBackedValueStorage[], Integer>) state.state;
+ ArrayBackedValueStorage[] storages = pair.getLeft();
+ int count = pair.getRight();
+
+ int fieldOffset = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength()
+ + accessor.getFieldStartOffset(tIndex, InputPositionListField);
+ ByteBuffer fieldBuffer = accessor.getBuffer();
+
+ while (fieldOffset < accessor.getFieldEndOffset(tIndex, InputPositionListField)) {
+ byte posInRead = fieldBuffer.get(fieldOffset);
+ if (storages[posInRead].getLength() > 0) {
+ throw new IllegalArgumentException("Reentering into an exist storage");
+ }
+ fieldOffset += BYTE_SIZE;
+ // read poslist
+ fieldOffset += writeBytesToStorage(storages[posInRead], fieldBuffer, fieldOffset);
+ // read Kmer
+ fieldOffset += writeBytesToStorage(storages[posInRead], fieldBuffer, fieldOffset);
+ count++;
+ }
+ pair.setValue(count);
+ }
+
+ @Override
+ public void outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+ AggregateState state) throws HyracksDataException {
+ throw new IllegalStateException("partial result method should not be called");
+ }
+
+ @Override
+ public void outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+ AggregateState state) throws HyracksDataException {
+ @SuppressWarnings("unchecked")
+ Pair<ArrayBackedValueStorage[], Integer> pair = (Pair<ArrayBackedValueStorage[], Integer>) state.state;
+ ArrayBackedValueStorage[] storages = pair.getLeft();
+ int count = pair.getRight();
+ if (count != storages.length) {
+ throw new IllegalStateException("Final aggregate position number is invalid");
+ }
+ DataOutput fieldOutput = tupleBuilder.getDataOutput();
+ try {
+ for (int i = 0; i < storages.length; i++) {
+ fieldOutput.write(storages[i].getByteArray(), storages[i].getStartOffset(), storages[i].getLength());
+ tupleBuilder.addFieldEndOffset();
+ }
+ } catch (IOException e) {
+ throw new HyracksDataException("I/O exception when writing aggregation to the output buffer.");
+ }
+ }
+
+ @Override
+ public void close() {
+ // TODO Auto-generated method stub
+
+ }
+
+ };
+ }
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/KMerSequenceWriterFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/KMerSequenceWriterFactory.java
similarity index 87%
rename from genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/KMerSequenceWriterFactory.java
rename to genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/KMerSequenceWriterFactory.java
index 405e109..0772eb3 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/KMerSequenceWriterFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/KMerSequenceWriterFactory.java
@@ -13,7 +13,7 @@
* limitations under the License.
*/
-package edu.uci.ics.genomix.dataflow;
+package edu.uci.ics.genomix.hyracks.dataflow.io;
import java.io.DataOutput;
import java.io.IOException;
@@ -24,9 +24,8 @@
import org.apache.hadoop.io.SequenceFile.Writer;
import org.apache.hadoop.mapred.JobConf;
-import edu.uci.ics.genomix.job.GenomixJob;
+import edu.uci.ics.genomix.hyracks.job.GenomixJob;
import edu.uci.ics.genomix.type.KmerBytesWritable;
-import edu.uci.ics.genomix.type.KmerCountValue;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
@@ -43,7 +42,7 @@
public KMerSequenceWriterFactory(JobConf conf) throws HyracksDataException {
this.confFactory = new ConfFactory(conf);
- this.kmerlength = conf.getInt(GenomixJob.KMER_LENGTH, GenomixJob.DEFAULT_KMER);
+ this.kmerlength = conf.getInt(GenomixJob.KMER_LENGTH, GenomixJob.DEFAULT_KMERLEN);
}
public class TupleWriter implements ITupleWriter {
@@ -54,7 +53,6 @@
ConfFactory cf;
Writer writer = null;
- KmerCountValue reEnterCount = new KmerCountValue();
KmerBytesWritable reEnterKey = new KmerBytesWritable(kmerlength);
/**
@@ -66,12 +64,15 @@
byte[] kmer = tuple.getFieldData(0);
int keyStart = tuple.getFieldStart(0);
int keyLength = tuple.getFieldLength(0);
+ if (reEnterKey.getLength() > keyLength){
+ throw new IllegalArgumentException("Not enough kmer bytes");
+ }
byte bitmap = tuple.getFieldData(1)[tuple.getFieldStart(1)];
byte count = tuple.getFieldData(2)[tuple.getFieldStart(2)];
- reEnterCount.set(bitmap, count);
- reEnterKey.set(kmer, keyStart, keyLength);
- writer.append(reEnterKey, reEnterCount);
+// reEnterCount.set(bitmap, count);
+ reEnterKey.set(kmer, keyStart);
+ writer.append(reEnterKey, null);
} catch (IOException e) {
throw new HyracksDataException(e);
}
@@ -81,7 +82,7 @@
public void open(DataOutput output) throws HyracksDataException {
try {
writer = SequenceFile.createWriter(cf.getConf(), (FSDataOutputStream) output, KmerBytesWritable.class,
- KmerCountValue.class, CompressionType.NONE, null);
+ null, CompressionType.NONE, null);
} catch (IOException e) {
throw new HyracksDataException(e);
}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/KMerTextWriterFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/KMerTextWriterFactory.java
similarity index 91%
rename from genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/KMerTextWriterFactory.java
rename to genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/KMerTextWriterFactory.java
index cfa7262..eac0045 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/KMerTextWriterFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/KMerTextWriterFactory.java
@@ -13,12 +13,11 @@
* limitations under the License.
*/
-package edu.uci.ics.genomix.dataflow;
+package edu.uci.ics.genomix.hyracks.dataflow.io;
import java.io.DataOutput;
import java.io.IOException;
-import edu.uci.ics.genomix.type.GeneCode;
import edu.uci.ics.genomix.type.KmerBytesWritable;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
@@ -42,10 +41,10 @@
@Override
public void write(DataOutput output, ITupleReference tuple) throws HyracksDataException {
try {
- kmer.set(tuple.getFieldData(0), tuple.getFieldStart(0), tuple.getFieldLength(0));
+ kmer.set(tuple.getFieldData(0), tuple.getFieldStart(0));
output.write(kmer.toString().getBytes());
output.writeByte('\t');
- output.write(GeneCode.getSymbolFromBitMap(tuple.getFieldData(1)[tuple.getFieldStart(1)]).getBytes());
+// output.write(GeneCode.getSymbolFromBitMap(tuple.getFieldData(1)[tuple.getFieldStart(1)]).getBytes());
output.writeByte('\t');
output.write(String.valueOf((int) tuple.getFieldData(2)[tuple.getFieldStart(2)]).getBytes());
output.writeByte('\n');
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/NodeSequenceWriterFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/NodeSequenceWriterFactory.java
new file mode 100644
index 0000000..272a21c
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/NodeSequenceWriterFactory.java
@@ -0,0 +1,103 @@
+package edu.uci.ics.genomix.hyracks.dataflow.io;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.SequenceFile.Writer;
+import org.apache.hadoop.mapred.JobConf;
+
+import edu.uci.ics.genomix.data.Marshal;
+import edu.uci.ics.genomix.hyracks.dataflow.MapReadToNodeOperator;
+import edu.uci.ics.genomix.hyracks.job.GenomixJob;
+import edu.uci.ics.genomix.type.NodeWritable;
+import edu.uci.ics.genomix.type.PositionWritable;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.hdfs.api.ITupleWriter;
+import edu.uci.ics.hyracks.hdfs.api.ITupleWriterFactory;
+import edu.uci.ics.hyracks.hdfs.dataflow.ConfFactory;
+
+public class NodeSequenceWriterFactory implements ITupleWriterFactory {
+
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1L;
+
+ public static final int InputNodeIDField = MapReadToNodeOperator.OutputNodeIDField;
+ public static final int InputCountOfKmerField = MapReadToNodeOperator.OutputCountOfKmerField;
+ public static final int InputIncomingField = MapReadToNodeOperator.OutputIncomingField;
+ public static final int InputOutgoingField = MapReadToNodeOperator.OutputOutgoingField;
+ public static final int InputKmerBytesField = MapReadToNodeOperator.OutputKmerBytesField;
+
+ private ConfFactory confFactory;
+ private final int kmerlength;
+
+ public NodeSequenceWriterFactory(JobConf conf) throws HyracksDataException {
+ this.confFactory = new ConfFactory(conf);
+ this.kmerlength = conf.getInt(GenomixJob.KMER_LENGTH, GenomixJob.DEFAULT_KMERLEN);
+ }
+
+ public class TupleWriter implements ITupleWriter {
+
+ public TupleWriter(ConfFactory confFactory) {
+ this.cf = confFactory;
+ }
+
+ ConfFactory cf;
+ Writer writer = null;
+ NodeWritable node = new NodeWritable(kmerlength);
+
+ @Override
+ public void open(DataOutput output) throws HyracksDataException {
+ try {
+ writer = SequenceFile.createWriter(cf.getConf(), (FSDataOutputStream) output, NodeWritable.class, null,
+ CompressionType.NONE, null);
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ @Override
+ public void write(DataOutput output, ITupleReference tuple) throws HyracksDataException {
+ node.getNodeID().setNewReference(tuple.getFieldData(InputNodeIDField),
+ tuple.getFieldStart(InputNodeIDField));
+ node.setCount(Marshal.getInt(tuple.getFieldData(InputCountOfKmerField),
+ tuple.getFieldStart(InputCountOfKmerField)));
+ node.getIncomingList().setNewReference(tuple.getFieldLength(InputIncomingField) / PositionWritable.LENGTH,
+ tuple.getFieldData(InputIncomingField), tuple.getFieldStart(InputIncomingField));
+ node.getOutgoingList().setNewReference(tuple.getFieldLength(InputOutgoingField) / PositionWritable.LENGTH,
+ tuple.getFieldData(InputOutgoingField), tuple.getFieldStart(InputOutgoingField));
+
+ node.getKmer().setNewReference(node.getCount() + kmerlength - 1, tuple.getFieldData(InputKmerBytesField),
+ tuple.getFieldStart(InputKmerBytesField));
+
+ try {
+ writer.append(node, null);
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ @Override
+ public void close(DataOutput output) throws HyracksDataException {
+ // TODO Auto-generated method stub
+
+ }
+
+ }
+
+ /**
+ * Input schema:
+ * (Position, LengthCount, InComingPosList, OutgoingPosList, Kmer)
+ */
+ @Override
+ public ITupleWriter getTupleWriter(IHyracksTaskContext ctx) throws HyracksDataException {
+ return new TupleWriter(confFactory);
+ }
+
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/NodeTextWriterFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/NodeTextWriterFactory.java
new file mode 100644
index 0000000..5b549b5
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/NodeTextWriterFactory.java
@@ -0,0 +1,72 @@
+package edu.uci.ics.genomix.hyracks.dataflow.io;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.genomix.data.Marshal;
+import edu.uci.ics.genomix.type.NodeWritable;
+import edu.uci.ics.genomix.type.PositionWritable;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.hdfs.api.ITupleWriter;
+import edu.uci.ics.hyracks.hdfs.api.ITupleWriterFactory;
+
+public class NodeTextWriterFactory implements ITupleWriterFactory {
+
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1L;
+ private final int initialKmerSize;
+
+ public NodeTextWriterFactory(int initialKmerSize) {
+ this.initialKmerSize = initialKmerSize;
+ }
+
+ @Override
+ public ITupleWriter getTupleWriter(IHyracksTaskContext ctx) throws HyracksDataException {
+ return new ITupleWriter() {
+ NodeWritable node = new NodeWritable(initialKmerSize);
+
+ @Override
+ public void open(DataOutput output) throws HyracksDataException {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void write(DataOutput output, ITupleReference tuple) throws HyracksDataException {
+ node.getNodeID().setNewReference(tuple.getFieldData(NodeSequenceWriterFactory.InputNodeIDField),
+ tuple.getFieldStart(NodeSequenceWriterFactory.InputNodeIDField));
+ node.setCount(Marshal.getInt(tuple.getFieldData(NodeSequenceWriterFactory.InputCountOfKmerField),
+ tuple.getFieldStart(NodeSequenceWriterFactory.InputCountOfKmerField)));
+ node.getIncomingList().setNewReference(
+ tuple.getFieldLength(NodeSequenceWriterFactory.InputIncomingField) / PositionWritable.LENGTH,
+ tuple.getFieldData(NodeSequenceWriterFactory.InputIncomingField),
+ tuple.getFieldStart(NodeSequenceWriterFactory.InputIncomingField));
+ node.getOutgoingList().setNewReference(
+ tuple.getFieldLength(NodeSequenceWriterFactory.InputOutgoingField) / PositionWritable.LENGTH,
+ tuple.getFieldData(NodeSequenceWriterFactory.InputOutgoingField),
+ tuple.getFieldStart(NodeSequenceWriterFactory.InputOutgoingField));
+
+ node.getKmer().setNewReference(node.getCount() + initialKmerSize - 1,
+ tuple.getFieldData(NodeSequenceWriterFactory.InputKmerBytesField),
+ tuple.getFieldStart(NodeSequenceWriterFactory.InputKmerBytesField));
+ try {
+ output.write(node.toString().getBytes());
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ @Override
+ public void close(DataOutput output) throws HyracksDataException {
+ // TODO Auto-generated method stub
+
+ }
+
+ };
+ }
+
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/driver/Driver.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/driver/Driver.java
similarity index 96%
rename from genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/driver/Driver.java
rename to genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/driver/Driver.java
index 27d5e15..d958205 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/driver/Driver.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/driver/Driver.java
@@ -13,7 +13,7 @@
* limitations under the License.
*/
-package edu.uci.ics.genomix.driver;
+package edu.uci.ics.genomix.hyracks.driver;
import java.net.URL;
import java.util.EnumSet;
@@ -24,9 +24,9 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.GenericOptionsParser;
-import edu.uci.ics.genomix.job.GenomixJob;
-import edu.uci.ics.genomix.job.JobGen;
-import edu.uci.ics.genomix.job.JobGenBrujinGraph;
+import edu.uci.ics.genomix.hyracks.job.GenomixJob;
+import edu.uci.ics.genomix.hyracks.job.JobGen;
+import edu.uci.ics.genomix.hyracks.job.JobGenBrujinGraph;
import edu.uci.ics.hyracks.api.client.HyracksConnection;
import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/GenomixJob.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/GenomixJob.java
similarity index 92%
rename from genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/GenomixJob.java
rename to genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/GenomixJob.java
index be82477..4b0c984 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/GenomixJob.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/GenomixJob.java
@@ -13,7 +13,7 @@
* limitations under the License.
*/
-package edu.uci.ics.genomix.job;
+package edu.uci.ics.genomix.hyracks.job;
import java.io.IOException;
@@ -25,7 +25,9 @@
public static final String JOB_NAME = "genomix";
/** Kmers length */
- public static final String KMER_LENGTH = "genomix.kmer";
+ public static final String KMER_LENGTH = "genomix.kmerlen";
+ /** Read length */
+ public static final String READ_LENGTH = "genomix.readlen";
/** Frame Size */
public static final String FRAME_SIZE = "genomix.framesize";
/** Frame Limit, hyracks need */
@@ -46,7 +48,8 @@
public static final String GROUPBY_HYBRID_RECORDSIZE_CROSS = "genomix.graph.groupby.hybrid.recordsize.cross";
public static final String GROUPBY_HYBRID_HASHLEVEL = "genomix.graph.groupby.hybrid.hashlevel";
- public static final int DEFAULT_KMER = 21;
+ public static final int DEFAULT_KMERLEN = 21;
+ public static final int DEFAULT_READLEN = 124;
public static final int DEFAULT_FRAME_SIZE = 32768;
public static final int DEFAULT_FRAME_LIMIT = 4096;
public static final int DEFAULT_TABLE_SIZE = 10485767;
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/JobGen.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGen.java
similarity index 96%
rename from genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/JobGen.java
rename to genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGen.java
index b1f9a29..3563f93 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/JobGen.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGen.java
@@ -13,7 +13,7 @@
* limitations under the License.
*/
-package edu.uci.ics.genomix.job;
+package edu.uci.ics.genomix.hyracks.job;
import java.util.UUID;
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenBrujinGraph.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenBrujinGraph.java
new file mode 100644
index 0000000..117b7e0
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenBrujinGraph.java
@@ -0,0 +1,307 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.genomix.hyracks.job;
+
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+
+import edu.uci.ics.genomix.hyracks.data.accessors.KmerHashPartitioncomputerFactory;
+import edu.uci.ics.genomix.hyracks.data.accessors.KmerNormarlizedComputerFactory;
+import edu.uci.ics.genomix.hyracks.data.accessors.ReadIDNormarlizedComputeFactory;
+import edu.uci.ics.genomix.hyracks.data.accessors.ReadIDPartitionComputerFactory;
+import edu.uci.ics.genomix.hyracks.data.primitive.KmerPointable;
+import edu.uci.ics.genomix.hyracks.dataflow.ConnectorPolicyAssignmentPolicy;
+import edu.uci.ics.genomix.hyracks.dataflow.MapKmerPositionToReadOperator;
+import edu.uci.ics.genomix.hyracks.dataflow.MapReadToNodeOperator;
+import edu.uci.ics.genomix.hyracks.dataflow.ReadsKeyValueParserFactory;
+import edu.uci.ics.genomix.hyracks.dataflow.aggregators.AggregateKmerAggregateFactory;
+import edu.uci.ics.genomix.hyracks.dataflow.aggregators.AggregateReadIDAggregateFactory;
+import edu.uci.ics.genomix.hyracks.dataflow.aggregators.MergeKmerAggregateFactory;
+import edu.uci.ics.genomix.hyracks.dataflow.aggregators.MergeReadIDAggregateFactory;
+import edu.uci.ics.genomix.hyracks.dataflow.io.KMerSequenceWriterFactory;
+import edu.uci.ics.genomix.hyracks.dataflow.io.KMerTextWriterFactory;
+import edu.uci.ics.genomix.hyracks.dataflow.io.NodeSequenceWriterFactory;
+import edu.uci.ics.genomix.hyracks.dataflow.io.NodeTextWriterFactory;
+import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
+import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.data.std.api.IPointableFactory;
+import edu.uci.ics.hyracks.data.std.primitive.IntegerPointable;
+import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningMergingConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.HashSpillableTableFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.external.ExternalGroupOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.preclustered.PreclusteredGroupOperatorDescriptor;
+import edu.uci.ics.hyracks.hdfs.api.ITupleWriterFactory;
+import edu.uci.ics.hyracks.hdfs.dataflow.HDFSReadOperatorDescriptor;
+import edu.uci.ics.hyracks.hdfs.dataflow.HDFSWriteOperatorDescriptor;
+import edu.uci.ics.hyracks.hdfs.scheduler.Scheduler;
+
+public class JobGenBrujinGraph extends JobGen {
+ public enum GroupbyType {
+ EXTERNAL,
+ PRECLUSTER,
+ HYBRIDHASH,
+ }
+
+ public enum OutputFormat {
+ TEXT,
+ BINARY,
+ }
+
+ JobConf job;
+ private static final Log LOG = LogFactory.getLog(JobGenBrujinGraph.class);
+ private Scheduler scheduler;
+ private String[] ncNodeNames;
+
+ private int readLength;
+ private int kmerSize;
+ private int frameLimits;
+ private int frameSize;
+ private int tableSize;
+ private GroupbyType groupbyType;
+ private OutputFormat outputFormat;
+ private boolean bGenerateReversedKmer;
+
+ private void logDebug(String status) {
+ LOG.debug(status + " nc nodes:" + ncNodeNames.length);
+ }
+
+ public JobGenBrujinGraph(GenomixJob job, Scheduler scheduler, final Map<String, NodeControllerInfo> ncMap,
+ int numPartitionPerMachine) {
+ super(job);
+ this.scheduler = scheduler;
+ String[] nodes = new String[ncMap.size()];
+ ncMap.keySet().toArray(nodes);
+ ncNodeNames = new String[nodes.length * numPartitionPerMachine];
+ for (int i = 0; i < numPartitionPerMachine; i++) {
+ System.arraycopy(nodes, 0, ncNodeNames, i * nodes.length, nodes.length);
+ }
+ logDebug("initialize");
+ }
+
+ private ExternalGroupOperatorDescriptor newExternalGroupby(JobSpecification jobSpec, int[] keyFields,
+ IAggregatorDescriptorFactory aggeragater, IAggregatorDescriptorFactory merger,
+ ITuplePartitionComputerFactory partition, INormalizedKeyComputerFactory normalizer,
+ IPointableFactory pointable, RecordDescriptor outRed) {
+ return new ExternalGroupOperatorDescriptor(jobSpec, keyFields, frameLimits,
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(pointable) }, normalizer,
+ aggeragater, merger, outRed, new HashSpillableTableFactory(new FieldHashPartitionComputerFactory(
+ keyFields,
+ new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(pointable) }),
+ tableSize), true);
+ }
+
+ private Object[] generateAggeragateDescriptorbyType(JobSpecification jobSpec,
+ IAggregatorDescriptorFactory aggregator, IAggregatorDescriptorFactory merger,
+ ITuplePartitionComputerFactory partition, INormalizedKeyComputerFactory normalizer,
+ IPointableFactory pointable, RecordDescriptor combineRed, RecordDescriptor finalRec)
+ throws HyracksDataException {
+ int[] keyFields = new int[] { 0 }; // the id of grouped key
+ Object[] obj = new Object[3];
+
+ switch (groupbyType) {
+ case EXTERNAL:
+ obj[0] = newExternalGroupby(jobSpec, keyFields, aggregator, merger, partition, normalizer, pointable,
+ combineRed);
+ obj[1] = new MToNPartitioningConnectorDescriptor(jobSpec, partition);
+ obj[2] = newExternalGroupby(jobSpec, keyFields, merger, merger, partition, normalizer, pointable,
+ finalRec);
+ break;
+ case PRECLUSTER:
+ default:
+ obj[0] = newExternalGroupby(jobSpec, keyFields, aggregator, merger, partition, normalizer, pointable,
+ combineRed);
+ obj[1] = new MToNPartitioningMergingConnectorDescriptor(jobSpec, partition, keyFields,
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(pointable) });
+ obj[2] = new PreclusteredGroupOperatorDescriptor(jobSpec, keyFields,
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(pointable) }, merger,
+ finalRec);
+ break;
+ }
+ return obj;
+ }
+
+ public HDFSReadOperatorDescriptor createHDFSReader(JobSpecification jobSpec, RecordDescriptor outRec)
+ throws HyracksDataException {
+ try {
+
+ InputSplit[] splits = job.getInputFormat().getSplits(job, ncNodeNames.length);
+
+ LOG.info("HDFS read into " + splits.length + " splits");
+ String[] readSchedule = scheduler.getLocationConstraints(splits);
+ return new HDFSReadOperatorDescriptor(jobSpec, outRec, job, splits, readSchedule,
+ new ReadsKeyValueParserFactory(kmerSize, bGenerateReversedKmer));
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ private void connectOperators(JobSpecification jobSpec, IOperatorDescriptor preOp, String[] preNodes,
+ IOperatorDescriptor nextOp, String[] nextNodes, IConnectorDescriptor conn) {
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, preOp, preNodes);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, nextOp, nextNodes);
+ jobSpec.connect(conn, preOp, 0, nextOp, 0);
+ }
+
+ @Override
+ public JobSpecification generateJob() throws HyracksException {
+
+ JobSpecification jobSpec = new JobSpecification();
+ RecordDescriptor readKmerOutputRec = new RecordDescriptor(new ISerializerDeserializer[] { null, null, null });
+ RecordDescriptor combineKmerOutputRec = new RecordDescriptor(new ISerializerDeserializer[] { null, null });
+ jobSpec.setFrameSize(frameSize);
+
+ // File input
+ logDebug("ReadKmer Operator");
+ HDFSReadOperatorDescriptor readOperator = createHDFSReader(jobSpec, readKmerOutputRec);
+
+ Object[] objs = generateAggeragateDescriptorbyType(jobSpec, new AggregateKmerAggregateFactory(),
+ new MergeKmerAggregateFactory(), new KmerHashPartitioncomputerFactory(),
+ new KmerNormarlizedComputerFactory(), KmerPointable.FACTORY, combineKmerOutputRec, combineKmerOutputRec);
+ AbstractOperatorDescriptor kmerLocalAggregator = (AbstractOperatorDescriptor) objs[0];
+ logDebug("LocalKmerGroupby Operator");
+ connectOperators(jobSpec, readOperator, ncNodeNames, kmerLocalAggregator, ncNodeNames,
+ new OneToOneConnectorDescriptor(jobSpec));
+
+ logDebug("CrossKmerGroupby Operator");
+ IConnectorDescriptor kmerConnPartition = (IConnectorDescriptor) objs[1];
+ AbstractOperatorDescriptor kmerCrossAggregator = (AbstractOperatorDescriptor) objs[2];
+ connectOperators(jobSpec, kmerLocalAggregator, ncNodeNames, kmerCrossAggregator, ncNodeNames, kmerConnPartition);
+
+ logDebug("Map Kmer to Read Operator");
+ //Map (Kmer, {(ReadID,PosInRead),...}) into (ReadID,PosInRead,{OtherPosition,...},Kmer)
+ RecordDescriptor readIDOutputRec = new RecordDescriptor(
+ new ISerializerDeserializer[] { null, null, null, null });
+ AbstractOperatorDescriptor mapKmerToRead = new MapKmerPositionToReadOperator(jobSpec, readIDOutputRec);
+ connectOperators(jobSpec, kmerCrossAggregator, ncNodeNames, mapKmerToRead, ncNodeNames,
+ new OneToOneConnectorDescriptor(jobSpec));
+
+ logDebug("Group by Read Operator");
+ // (ReadID, {(PosInRead,{OtherPositoin..},Kmer) ...}
+ RecordDescriptor readIDCombineRec = new RecordDescriptor(new ISerializerDeserializer[] { null, null });
+ RecordDescriptor readIDFinalRec = new RecordDescriptor(
+ new ISerializerDeserializer[MergeReadIDAggregateFactory.getPositionCount(readLength, kmerSize)]);
+ objs = generateAggeragateDescriptorbyType(jobSpec, new AggregateReadIDAggregateFactory(),
+ new MergeReadIDAggregateFactory(readLength, kmerSize), new ReadIDPartitionComputerFactory(),
+ new ReadIDNormarlizedComputeFactory(), IntegerPointable.FACTORY, readIDCombineRec, readIDFinalRec);
+ AbstractOperatorDescriptor readLocalAggregator = (AbstractOperatorDescriptor) objs[0];
+ connectOperators(jobSpec, mapKmerToRead, ncNodeNames, readLocalAggregator, ncNodeNames,
+ new OneToOneConnectorDescriptor(jobSpec));
+
+ logDebug("Group by ReadID merger");
+ IConnectorDescriptor readconn = (IConnectorDescriptor) objs[1];
+ AbstractOperatorDescriptor readCrossAggregator = (AbstractOperatorDescriptor) objs[2];
+ connectOperators(jobSpec, readLocalAggregator, ncNodeNames, readCrossAggregator, ncNodeNames, readconn);
+
+ logDebug("Map ReadInfo to Node");
+ //Map (ReadID, [(Poslist,Kmer) ... ]) to (Node, IncomingList, OutgoingList, Kmer)
+ RecordDescriptor nodeRec = new RecordDescriptor(new ISerializerDeserializer[] { null, null, null, null });
+ AbstractOperatorDescriptor mapEachReadToNode = new MapReadToNodeOperator(jobSpec, nodeRec, kmerSize);
+ connectOperators(jobSpec, readCrossAggregator, ncNodeNames, mapEachReadToNode, ncNodeNames,
+ new OneToOneConnectorDescriptor(jobSpec));
+
+ // Output Kmer
+ ITupleWriterFactory kmerWriter = null;
+ ITupleWriterFactory nodeWriter = null;
+ switch (outputFormat) {
+ case TEXT:
+ kmerWriter = new KMerTextWriterFactory(kmerSize);
+ nodeWriter = new NodeTextWriterFactory(kmerSize);
+ break;
+ case BINARY:
+ default:
+ kmerWriter = new KMerSequenceWriterFactory(job);
+ nodeWriter = new NodeSequenceWriterFactory(job);
+ break;
+ }
+ logDebug("WriteOperator");
+ HDFSWriteOperatorDescriptor writeKmerOperator = new HDFSWriteOperatorDescriptor(jobSpec, job, kmerWriter);
+ connectOperators(jobSpec, kmerCrossAggregator, ncNodeNames, writeKmerOperator, ncNodeNames,
+ new OneToOneConnectorDescriptor(jobSpec));
+ jobSpec.addRoot(writeKmerOperator);
+
+ // Output Node
+ HDFSWriteOperatorDescriptor writeNodeOperator = new HDFSWriteOperatorDescriptor(jobSpec, job, nodeWriter);
+ connectOperators(jobSpec, mapEachReadToNode, ncNodeNames, writeNodeOperator, ncNodeNames,
+ new OneToOneConnectorDescriptor(jobSpec));
+ jobSpec.addRoot(writeNodeOperator);
+
+ if (groupbyType == GroupbyType.PRECLUSTER) {
+ jobSpec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
+ }
+ return jobSpec;
+ }
+
+ @Override
+ protected void initJobConfiguration() {
+ readLength = conf.getInt(GenomixJob.READ_LENGTH, GenomixJob.DEFAULT_READLEN);
+ kmerSize = conf.getInt(GenomixJob.KMER_LENGTH, GenomixJob.DEFAULT_KMERLEN);
+ if (kmerSize % 2 == 0) {
+ kmerSize--;
+ conf.setInt(GenomixJob.KMER_LENGTH, kmerSize);
+ }
+ frameLimits = conf.getInt(GenomixJob.FRAME_LIMIT, GenomixJob.DEFAULT_FRAME_LIMIT);
+ tableSize = conf.getInt(GenomixJob.TABLE_SIZE, GenomixJob.DEFAULT_TABLE_SIZE);
+ frameSize = conf.getInt(GenomixJob.FRAME_SIZE, GenomixJob.DEFAULT_FRAME_SIZE);
+
+ bGenerateReversedKmer = conf.getBoolean(GenomixJob.REVERSED_KMER, GenomixJob.DEFAULT_REVERSED);
+
+ String type = conf.get(GenomixJob.GROUPBY_TYPE, GenomixJob.DEFAULT_GROUPBY_TYPE);
+ if (type.equalsIgnoreCase("external")) {
+ groupbyType = GroupbyType.EXTERNAL;
+ } else if (type.equalsIgnoreCase("precluster")) {
+ groupbyType = GroupbyType.PRECLUSTER;
+ } else {
+ groupbyType = GroupbyType.HYBRIDHASH;
+ }
+
+ String output = conf.get(GenomixJob.OUTPUT_FORMAT, GenomixJob.DEFAULT_OUTPUT_FORMAT);
+ if (output.equalsIgnoreCase("text")) {
+ outputFormat = OutputFormat.TEXT;
+ } else {
+ outputFormat = OutputFormat.BINARY;
+ }
+ job = new JobConf(conf);
+ LOG.info("Genomix Graph Build Configuration");
+ LOG.info("Kmer:" + kmerSize);
+ LOG.info("Groupby type:" + type);
+ LOG.info("Output format:" + output);
+ LOG.info("Frame limit" + frameLimits);
+ LOG.info("Frame size" + frameSize);
+ }
+
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/JobGenStatistic.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenStatistic.java
similarity index 95%
rename from genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/JobGenStatistic.java
rename to genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenStatistic.java
index a88fa79..adce3f6 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/JobGenStatistic.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenStatistic.java
@@ -13,17 +13,16 @@
* limitations under the License.
*/
-package edu.uci.ics.genomix.job;
+package edu.uci.ics.genomix.hyracks.job;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.SequenceFileInputFormat;
-import edu.uci.ics.genomix.data.std.accessors.ByteSerializerDeserializer;
-import edu.uci.ics.genomix.util.ByteComparatorFactory;
-import edu.uci.ics.genomix.util.StatCountAggregateFactory;
-import edu.uci.ics.genomix.util.StatReadsKeyValueParserFactory;
-import edu.uci.ics.genomix.util.StatSumAggregateFactory;
+import edu.uci.ics.genomix.hyracks.data.accessors.ByteSerializerDeserializer;
+import edu.uci.ics.genomix.hyracks.util.ByteComparatorFactory;
+import edu.uci.ics.genomix.hyracks.util.StatCountAggregateFactory;
+import edu.uci.ics.genomix.hyracks.util.StatSumAggregateFactory;
import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
@@ -65,7 +64,7 @@
@Override
protected void initJobConfiguration() {
// TODO Auto-generated method stub
- kmers = conf.getInt(GenomixJob.KMER_LENGTH, GenomixJob.DEFAULT_KMER);
+ kmers = conf.getInt(GenomixJob.KMER_LENGTH, GenomixJob.DEFAULT_KMERLEN);
hadoopjob = new JobConf(conf);
hadoopjob.setInputFormat(SequenceFileInputFormat.class);
}
@@ -97,7 +96,7 @@
AbstractFileWriteOperatorDescriptor writeCount = connectWriter(jobSpec, countFields, countMerger);
jobSpec.addRoot(writeDegree);
jobSpec.addRoot(writeCount);
- return null;
+ return jobSpec;
}
private HDFSReadOperatorDescriptor createHDFSReader(JobSpecification jobSpec) throws HyracksDataException {
@@ -107,7 +106,7 @@
String[] readSchedule = scheduler.getLocationConstraints(splits);
return new HDFSReadOperatorDescriptor(jobSpec, readOutputRec, hadoopjob, splits, readSchedule,
- new StatReadsKeyValueParserFactory());
+ null); //new StatReadsKeyValueParserFactory());
} catch (Exception e) {
throw new HyracksDataException(e);
}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/util/ByteComparatorFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/util/ByteComparatorFactory.java
similarity index 97%
rename from genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/util/ByteComparatorFactory.java
rename to genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/util/ByteComparatorFactory.java
index 832966b..b070b56 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/util/ByteComparatorFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/util/ByteComparatorFactory.java
@@ -13,7 +13,7 @@
* limitations under the License.
*/
-package edu.uci.ics.genomix.util;
+package edu.uci.ics.genomix.hyracks.util;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/util/StatCountAggregateFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/util/StatCountAggregateFactory.java
similarity index 98%
rename from genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/util/StatCountAggregateFactory.java
rename to genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/util/StatCountAggregateFactory.java
index 65303a8..f483a9c 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/util/StatCountAggregateFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/util/StatCountAggregateFactory.java
@@ -13,7 +13,7 @@
* limitations under the License.
*/
-package edu.uci.ics.genomix.util;
+package edu.uci.ics.genomix.hyracks.util;
import java.io.DataOutput;
import java.io.IOException;
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/util/StatSumAggregateFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/util/StatSumAggregateFactory.java
similarity index 98%
rename from genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/util/StatSumAggregateFactory.java
rename to genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/util/StatSumAggregateFactory.java
index 39ac60a..fb37056 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/util/StatSumAggregateFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/util/StatSumAggregateFactory.java
@@ -13,7 +13,7 @@
* limitations under the License.
*/
-package edu.uci.ics.genomix.util;
+package edu.uci.ics.genomix.hyracks.util;
import java.io.DataOutput;
import java.io.IOException;
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/JobGenBrujinGraph.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/JobGenBrujinGraph.java
deleted file mode 100644
index 79b38e8..0000000
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/JobGenBrujinGraph.java
+++ /dev/null
@@ -1,295 +0,0 @@
-/*
- * Copyright 2009-2012 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package edu.uci.ics.genomix.job;
-
-import java.util.Map;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-
-import edu.uci.ics.genomix.data.std.accessors.ByteSerializerDeserializer;
-import edu.uci.ics.genomix.data.std.accessors.KmerBinaryHashFunctionFamily;
-import edu.uci.ics.genomix.data.std.accessors.KmerHashPartitioncomputerFactory;
-import edu.uci.ics.genomix.data.std.accessors.KmerNormarlizedComputerFactory;
-import edu.uci.ics.genomix.data.std.primitive.KmerPointable;
-import edu.uci.ics.genomix.dataflow.ConnectorPolicyAssignmentPolicy;
-import edu.uci.ics.genomix.dataflow.KMerSequenceWriterFactory;
-import edu.uci.ics.genomix.dataflow.KMerTextWriterFactory;
-import edu.uci.ics.genomix.dataflow.ReadsKeyValueParserFactory;
-import edu.uci.ics.genomix.dataflow.aggregators.DistributedMergeLmerAggregateFactory;
-import edu.uci.ics.genomix.dataflow.aggregators.MergeKmerAggregateFactory;
-import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
-import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
-import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFamily;
-import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.exceptions.HyracksException;
-import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
-import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;
-import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningMergingConnectorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.group.HashSpillableTableFactory;
-import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
-import edu.uci.ics.hyracks.dataflow.std.group.external.ExternalGroupOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.group.hybridhash.HybridHashGroupOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.group.preclustered.PreclusteredGroupOperatorDescriptor;
-import edu.uci.ics.hyracks.hdfs.api.ITupleWriterFactory;
-import edu.uci.ics.hyracks.hdfs.dataflow.HDFSReadOperatorDescriptor;
-import edu.uci.ics.hyracks.hdfs.dataflow.HDFSWriteOperatorDescriptor;
-import edu.uci.ics.hyracks.hdfs.scheduler.Scheduler;
-
-public class JobGenBrujinGraph extends JobGen {
- public enum GroupbyType {
- EXTERNAL,
- PRECLUSTER,
- HYBRIDHASH,
- }
-
- public enum OutputFormat {
- TEXT,
- BINARY,
- }
-
- JobConf job;
- private static final Log LOG = LogFactory.getLog(JobGenBrujinGraph.class);
- private Scheduler scheduler;
- private String[] ncNodeNames;
-
- private int kmers;
- private int frameLimits;
- private int frameSize;
- private int tableSize;
- private GroupbyType groupbyType;
- private OutputFormat outputFormat;
- private boolean bGenerateReversedKmer;
-
- private AbstractOperatorDescriptor singleGrouper;
- private IConnectorDescriptor connPartition;
- private AbstractOperatorDescriptor crossGrouper;
- private RecordDescriptor readOutputRec;
- private RecordDescriptor combineOutputRec;
-
- /** works for hybrid hashing */
- private long inputSizeInRawRecords;
- private long inputSizeInUniqueKeys;
- private int recordSizeInBytes;
- private int hashfuncStartLevel;
-
- private void logDebug(String status) {
- String names = "";
- for (String str : ncNodeNames) {
- names += str + " ";
- }
- LOG.info(status + " nc nodes:" + ncNodeNames.length + " " + names);
- }
-
- public JobGenBrujinGraph(GenomixJob job, Scheduler scheduler, final Map<String, NodeControllerInfo> ncMap,
- int numPartitionPerMachine) {
- super(job);
- this.scheduler = scheduler;
- String[] nodes = new String[ncMap.size()];
- ncMap.keySet().toArray(nodes);
- ncNodeNames = new String[nodes.length * numPartitionPerMachine];
- for (int i = 0; i < numPartitionPerMachine; i++) {
- System.arraycopy(nodes, 0, ncNodeNames, i * nodes.length, nodes.length);
- }
- logDebug("initialize");
- }
-
- private ExternalGroupOperatorDescriptor newExternalGroupby(JobSpecification jobSpec, int[] keyFields,
- IAggregatorDescriptorFactory aggeragater) {
- return new ExternalGroupOperatorDescriptor(jobSpec, keyFields, frameLimits,
- new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(KmerPointable.FACTORY) },
- new KmerNormarlizedComputerFactory(), aggeragater, new DistributedMergeLmerAggregateFactory(),
- combineOutputRec, new HashSpillableTableFactory(
- new FieldHashPartitionComputerFactory(keyFields,
- new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
- .of(KmerPointable.FACTORY) }), tableSize), true);
- }
-
- private HybridHashGroupOperatorDescriptor newHybridGroupby(JobSpecification jobSpec, int[] keyFields,
- long inputSizeInRawRecords, long inputSizeInUniqueKeys, int recordSizeInBytes, int hashfuncStartLevel,
- IAggregatorDescriptorFactory aggeragater) throws HyracksDataException {
- return new HybridHashGroupOperatorDescriptor(jobSpec, keyFields, frameLimits, inputSizeInRawRecords,
- inputSizeInUniqueKeys, recordSizeInBytes, tableSize,
- new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(KmerPointable.FACTORY) },
- new IBinaryHashFunctionFamily[] { new KmerBinaryHashFunctionFamily() }, hashfuncStartLevel,
- new KmerNormarlizedComputerFactory(), aggeragater, new DistributedMergeLmerAggregateFactory(),
- combineOutputRec, true);
- }
-
- private void generateDescriptorbyType(JobSpecification jobSpec) throws HyracksDataException {
- int[] keyFields = new int[] { 0 }; // the id of grouped key
-
- switch (groupbyType) {
- case EXTERNAL:
- singleGrouper = newExternalGroupby(jobSpec, keyFields, new MergeKmerAggregateFactory());
- connPartition = new MToNPartitioningConnectorDescriptor(jobSpec, new KmerHashPartitioncomputerFactory());
- crossGrouper = newExternalGroupby(jobSpec, keyFields, new DistributedMergeLmerAggregateFactory());
- break;
- case PRECLUSTER:
- singleGrouper = newExternalGroupby(jobSpec, keyFields, new MergeKmerAggregateFactory());
- connPartition = new MToNPartitioningMergingConnectorDescriptor(jobSpec,
- new KmerHashPartitioncomputerFactory(), keyFields,
- new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(KmerPointable.FACTORY) });
- crossGrouper = new PreclusteredGroupOperatorDescriptor(jobSpec, keyFields,
- new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(KmerPointable.FACTORY) },
- new DistributedMergeLmerAggregateFactory(), combineOutputRec);
- break;
- case HYBRIDHASH:
- default:
- singleGrouper = newHybridGroupby(jobSpec, keyFields, inputSizeInRawRecords, inputSizeInUniqueKeys,
- recordSizeInBytes, hashfuncStartLevel, new MergeKmerAggregateFactory());
- connPartition = new MToNPartitioningConnectorDescriptor(jobSpec, new KmerHashPartitioncomputerFactory());
-
- crossGrouper = newHybridGroupby(jobSpec, keyFields, inputSizeInRawRecords, inputSizeInUniqueKeys,
- recordSizeInBytes, hashfuncStartLevel, new DistributedMergeLmerAggregateFactory());
- break;
- }
- }
-
- public HDFSReadOperatorDescriptor createHDFSReader(JobSpecification jobSpec) throws HyracksDataException {
- try {
-
- InputSplit[] splits = job.getInputFormat().getSplits(job, ncNodeNames.length);
-
- LOG.info("HDFS read into " + splits.length + " splits");
- String[] readSchedule = scheduler.getLocationConstraints(splits);
- String log = "";
- for (String schedule : readSchedule) {
- log += schedule + " ";
- }
- LOG.info("HDFS read schedule " + log);
- return new HDFSReadOperatorDescriptor(jobSpec, readOutputRec, job, splits, readSchedule,
- new ReadsKeyValueParserFactory(kmers, bGenerateReversedKmer));
- } catch (Exception e) {
- throw new HyracksDataException(e);
- }
- }
-
- @Override
- public JobSpecification generateJob() throws HyracksException {
-
- JobSpecification jobSpec = new JobSpecification();
- readOutputRec = new RecordDescriptor(
- new ISerializerDeserializer[] { null, ByteSerializerDeserializer.INSTANCE });
- combineOutputRec = new RecordDescriptor(new ISerializerDeserializer[] { null,
- ByteSerializerDeserializer.INSTANCE, ByteSerializerDeserializer.INSTANCE });
- jobSpec.setFrameSize(frameSize);
-
- // File input
- HDFSReadOperatorDescriptor readOperator = createHDFSReader(jobSpec);
-
- logDebug("Read Operator");
- PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, readOperator, ncNodeNames);
-
- generateDescriptorbyType(jobSpec);
- logDebug("SingleGroupby Operator");
- PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, singleGrouper, ncNodeNames);
-
- IConnectorDescriptor readfileConn = new OneToOneConnectorDescriptor(jobSpec);
- jobSpec.connect(readfileConn, readOperator, 0, singleGrouper, 0);
-
- logDebug("CrossGrouper Operator");
- PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, crossGrouper, ncNodeNames);
- jobSpec.connect(connPartition, singleGrouper, 0, crossGrouper, 0);
-
- // Output
- ITupleWriterFactory writer = null;
- switch (outputFormat) {
- case TEXT:
- writer = new KMerTextWriterFactory(kmers);
- break;
- case BINARY:
- default:
- writer = new KMerSequenceWriterFactory(job);
- break;
- }
- HDFSWriteOperatorDescriptor writeOperator = new HDFSWriteOperatorDescriptor(jobSpec, job, writer);
-
- logDebug("WriteOperator");
- PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, writeOperator, ncNodeNames);
-
- IConnectorDescriptor printConn = new OneToOneConnectorDescriptor(jobSpec);
- jobSpec.connect(printConn, crossGrouper, 0, writeOperator, 0);
- jobSpec.addRoot(writeOperator);
-
- if (groupbyType == GroupbyType.PRECLUSTER) {
- jobSpec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
- }
- return jobSpec;
- }
-
- @Override
- protected void initJobConfiguration() {
-
- kmers = conf.getInt(GenomixJob.KMER_LENGTH, GenomixJob.DEFAULT_KMER);
- if (kmers % 2 == 0) {
- kmers--;
- conf.setInt(GenomixJob.KMER_LENGTH, kmers);
- }
- frameLimits = conf.getInt(GenomixJob.FRAME_LIMIT, GenomixJob.DEFAULT_FRAME_LIMIT);
- tableSize = conf.getInt(GenomixJob.TABLE_SIZE, GenomixJob.DEFAULT_TABLE_SIZE);
- frameSize = conf.getInt(GenomixJob.FRAME_SIZE, GenomixJob.DEFAULT_FRAME_SIZE);
- inputSizeInRawRecords = conf.getLong(GenomixJob.GROUPBY_HYBRID_INPUTSIZE,
- GenomixJob.DEFAULT_GROUPBY_HYBRID_INPUTSIZE);
- inputSizeInUniqueKeys = conf.getLong(GenomixJob.GROUPBY_HYBRID_INPUTKEYS,
- GenomixJob.DEFAULT_GROUPBY_HYBRID_INPUTKEYS);
- recordSizeInBytes = conf.getInt(GenomixJob.GROUPBY_HYBRID_RECORDSIZE_SINGLE,
- GenomixJob.DEFAULT_GROUPBY_HYBRID_RECORDSIZE_SINGLE);
- hashfuncStartLevel = conf.getInt(GenomixJob.GROUPBY_HYBRID_HASHLEVEL,
- GenomixJob.DEFAULT_GROUPBY_HYBRID_HASHLEVEL);
- /** here read the different recordSize why ? */
- recordSizeInBytes = conf.getInt(GenomixJob.GROUPBY_HYBRID_RECORDSIZE_CROSS,
- GenomixJob.DEFAULT_GROUPBY_HYBRID_RECORDSIZE_CROSS);
-
- bGenerateReversedKmer = conf.getBoolean(GenomixJob.REVERSED_KMER, GenomixJob.DEFAULT_REVERSED);
-
- String type = conf.get(GenomixJob.GROUPBY_TYPE, GenomixJob.DEFAULT_GROUPBY_TYPE);
- if (type.equalsIgnoreCase("external")) {
- groupbyType = GroupbyType.EXTERNAL;
- } else if (type.equalsIgnoreCase("precluster")) {
- groupbyType = GroupbyType.PRECLUSTER;
- } else {
- groupbyType = GroupbyType.HYBRIDHASH;
- }
-
- String output = conf.get(GenomixJob.OUTPUT_FORMAT, GenomixJob.DEFAULT_OUTPUT_FORMAT);
- if (output.equalsIgnoreCase("text")) {
- outputFormat = OutputFormat.TEXT;
- } else {
- outputFormat = OutputFormat.BINARY;
- }
- job = new JobConf(conf);
- LOG.info("Genomix Graph Build Configuration");
- LOG.info("Kmer:" + kmers);
- LOG.info("Groupby type:" + type);
- LOG.info("Output format:" + output);
- LOG.info("Frame limit" + frameLimits);
- LOG.info("Frame size" + frameSize);
- }
-
-}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/util/StatReadsKeyValueParserFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/util/StatReadsKeyValueParserFactory.java
deleted file mode 100644
index 2fcca67..0000000
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/util/StatReadsKeyValueParserFactory.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Copyright 2009-2012 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package edu.uci.ics.genomix.util;
-
-import java.nio.ByteBuffer;
-
-import edu.uci.ics.genomix.data.std.accessors.ByteSerializerDeserializer;
-import edu.uci.ics.genomix.type.GeneCode;
-import edu.uci.ics.genomix.type.KmerBytesWritable;
-import edu.uci.ics.genomix.type.KmerCountValue;
-import edu.uci.ics.hyracks.api.comm.IFrameWriter;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
-import edu.uci.ics.hyracks.hdfs.api.IKeyValueParser;
-import edu.uci.ics.hyracks.hdfs.api.IKeyValueParserFactory;
-
-public class StatReadsKeyValueParserFactory implements IKeyValueParserFactory<KmerBytesWritable, KmerCountValue> {
-
- /**
- *
- */
- private static final long serialVersionUID = 1L;
-
- @Override
- public IKeyValueParser<KmerBytesWritable, KmerCountValue> createKeyValueParser(IHyracksTaskContext ctx)
- throws HyracksDataException {
-
- final ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(2);
- final ByteBuffer outputBuffer = ctx.allocateFrame();
- final FrameTupleAppender outputAppender = new FrameTupleAppender(ctx.getFrameSize());
- outputAppender.reset(outputBuffer, true);
-
- return new IKeyValueParser<KmerBytesWritable, KmerCountValue>() {
-
- @Override
- public void open(IFrameWriter writer) throws HyracksDataException {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- public void parse(KmerBytesWritable key, KmerCountValue value, IFrameWriter writer)
- throws HyracksDataException {
- byte adjMap = value.getAdjBitMap();
- byte count = value.getCount();
- InsertToFrame((byte) (GeneCode.inDegree(adjMap)), (byte) (GeneCode.outDegree(adjMap)), count, writer);
- }
-
- @Override
- public void close(IFrameWriter writer) throws HyracksDataException {
- FrameUtils.flushFrame(outputBuffer, writer);
- }
-
- private void InsertToFrame(byte indegree, byte outdegree, byte count, IFrameWriter writer) {
- try {
- tupleBuilder.reset();
- tupleBuilder.addField(ByteSerializerDeserializer.INSTANCE, indegree);
- tupleBuilder.addField(ByteSerializerDeserializer.INSTANCE, outdegree);
- tupleBuilder.addField(ByteSerializerDeserializer.INSTANCE, count);
-
- if (!outputAppender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
- tupleBuilder.getSize())) {
- FrameUtils.flushFrame(outputBuffer, writer);
- outputAppender.reset(outputBuffer, true);
- if (!outputAppender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
- tupleBuilder.getSize())) {
- throw new IllegalStateException(
- "Failed to copy an record into a frame: the record size is too large.");
- }
- }
- } catch (Exception e) {
- throw new IllegalStateException(e);
- }
- }
- };
- }
-
-}
diff --git a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/example/jobrun/JobRunTest.java b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunStepByStepTest.java
similarity index 60%
copy from genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/example/jobrun/JobRunTest.java
copy to genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunStepByStepTest.java
index 847272a..d41bcbe 100644
--- a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/example/jobrun/JobRunTest.java
+++ b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunStepByStepTest.java
@@ -1,19 +1,4 @@
-/*
- * Copyright 2009-2012 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package edu.uci.ics.genomix.example.jobrun;
+package edu.uci.ics.genomix.hyracks.test;
import java.io.BufferedWriter;
import java.io.DataOutputStream;
@@ -22,8 +7,6 @@
import java.io.FileWriter;
import java.io.IOException;
-import junit.framework.Assert;
-
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -34,18 +17,14 @@
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.util.ReflectionUtils;
import org.junit.After;
import org.junit.Before;
-import org.junit.Test;
-import edu.uci.ics.genomix.driver.Driver;
-import edu.uci.ics.genomix.driver.Driver.Plan;
-import edu.uci.ics.genomix.job.GenomixJob;
+import edu.uci.ics.genomix.hyracks.driver.Driver;
+import edu.uci.ics.genomix.hyracks.job.GenomixJob;
import edu.uci.ics.genomix.type.KmerBytesWritable;
-import edu.uci.ics.genomix.type.KmerCountValue;
-public class JobRunTest {
+public class JobRunStepByStepTest {
private static final String ACTUAL_RESULT_DIR = "actual";
private static final String PATH_TO_HADOOP_CONF = "src/test/resources/hadoop/conf";
@@ -57,7 +36,6 @@
private static final String CONVERT_RESULT = DUMPED_RESULT + ".txt";
private static final String EXPECTED_PATH = "src/test/resources/expected/result2";
private static final String EXPECTED_REVERSE_PATH = "src/test/resources/expected/result_reverse";
-
private static final String HADOOP_CONF_PATH = ACTUAL_RESULT_DIR + File.separator + "conf.xml";
private MiniDFSCluster dfsCluster;
@@ -82,7 +60,6 @@
driver = new Driver(edu.uci.ics.hyracks.hdfs.utils.HyracksUtils.CC_HOST,
edu.uci.ics.hyracks.hdfs.utils.HyracksUtils.TEST_HYRACKS_CC_CLIENT_PORT, numPartitionPerMachine);
}
-
private void cleanupStores() throws IOException {
FileUtils.forceMkdir(new File("teststore"));
FileUtils.forceMkdir(new File("build"));
@@ -122,69 +99,7 @@
dfs.delete(new Path(HDFS_OUTPUT_PATH), true);
}
}
-
- @Test
- public void TestAll() throws Exception {
- cleanUpReEntry();
- TestExternalGroupby();
- cleanUpReEntry();
- TestPreClusterGroupby();
- cleanUpReEntry();
- TestHybridGroupby();
- cleanUpReEntry();
- conf.setBoolean(GenomixJob.REVERSED_KMER, true);
- TestExternalReversedGroupby();
- cleanUpReEntry();
- TestPreClusterReversedGroupby();
- cleanUpReEntry();
- TestHybridReversedGroupby();
- }
-
- public void TestExternalGroupby() throws Exception {
- conf.set(GenomixJob.GROUPBY_TYPE, "external");
- System.err.println("Testing ExternalGroupBy");
- driver.runJob(new GenomixJob(conf), Plan.BUILD_DEBRUJIN_GRAPH, true);
- Assert.assertEquals(true, checkResults(EXPECTED_PATH));
- }
-
- public void TestPreClusterGroupby() throws Exception {
- conf.set(GenomixJob.GROUPBY_TYPE, "precluster");
- System.err.println("Testing PreClusterGroupBy");
- driver.runJob(new GenomixJob(conf), Plan.BUILD_DEBRUJIN_GRAPH, true);
- Assert.assertEquals(true, checkResults(EXPECTED_PATH));
- }
-
- public void TestHybridGroupby() throws Exception {
- conf.set(GenomixJob.GROUPBY_TYPE, "hybrid");
- System.err.println("Testing HybridGroupBy");
- driver.runJob(new GenomixJob(conf), Plan.BUILD_DEBRUJIN_GRAPH, true);
- Assert.assertEquals(true, checkResults(EXPECTED_PATH));
- }
-
- public void TestExternalReversedGroupby() throws Exception {
- conf.set(GenomixJob.GROUPBY_TYPE, "external");
- conf.setBoolean(GenomixJob.REVERSED_KMER, true);
- System.err.println("Testing ExternalGroupBy + Reversed");
- driver.runJob(new GenomixJob(conf), Plan.BUILD_DEBRUJIN_GRAPH, true);
- Assert.assertEquals(true, checkResults(EXPECTED_REVERSE_PATH));
- }
-
- public void TestPreClusterReversedGroupby() throws Exception {
- conf.set(GenomixJob.GROUPBY_TYPE, "precluster");
- conf.setBoolean(GenomixJob.REVERSED_KMER, true);
- System.err.println("Testing PreclusterGroupBy + Reversed");
- driver.runJob(new GenomixJob(conf), Plan.BUILD_DEBRUJIN_GRAPH, true);
- Assert.assertEquals(true, checkResults(EXPECTED_REVERSE_PATH));
- }
-
- public void TestHybridReversedGroupby() throws Exception {
- conf.set(GenomixJob.GROUPBY_TYPE, "hybrid");
- conf.setBoolean(GenomixJob.REVERSED_KMER, true);
- System.err.println("Testing HybridGroupBy + Reversed");
- driver.runJob(new GenomixJob(conf), Plan.BUILD_DEBRUJIN_GRAPH, true);
- Assert.assertEquals(true, checkResults(EXPECTED_REVERSE_PATH));
- }
-
+
private boolean checkResults(String expectedPath) throws Exception {
File dumped = null;
String format = conf.get(GenomixJob.OUTPUT_FORMAT);
@@ -199,9 +114,9 @@
BufferedWriter bw = new BufferedWriter(new FileWriter(filePathTo));
for (int i = 0; i < numPartitionPerMachine * numberOfNC; i++) {
String partname = "/part-" + i;
- // FileUtil.copy(FileSystem.get(conf), new Path(HDFS_OUTPUT_PATH
- // + partname), FileSystem.getLocal(new Configuration()),
- // new Path(ACTUAL_RESULT_DIR + HDFS_OUTPUT_PATH + partname), false, conf);
+ // FileUtil.copy(FileSystem.get(conf), new Path(HDFS_OUTPUT_PATH
+ // + partname), FileSystem.getLocal(new Configuration()),
+ // new Path(ACTUAL_RESULT_DIR + HDFS_OUTPUT_PATH + partname), false, conf);
Path path = new Path(HDFS_OUTPUT_PATH + partname);
FileSystem dfs = FileSystem.get(conf);
@@ -212,9 +127,9 @@
// KmerBytesWritable key = (KmerBytesWritable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
KmerBytesWritable key = new KmerBytesWritable(conf.getInt(GenomixJob.KMER_LENGTH,
- GenomixJob.DEFAULT_KMER));
- KmerCountValue value = (KmerCountValue) ReflectionUtils.newInstance(reader.getValueClass(), conf);
-
+ GenomixJob.DEFAULT_KMERLEN));
+// KmerCountValue value = (KmerCountValue) ReflectionUtils.newInstance(reader.getValueClass(), conf);
+ KmerBytesWritable value = null;
while (reader.next(key, value)) {
if (key == null || value == null) {
break;
@@ -242,5 +157,4 @@
private void cleanupHDFS() throws Exception {
dfsCluster.shutdown();
}
-
}
diff --git a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/example/jobrun/JobRunTest.java b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunTest.java
similarity index 95%
rename from genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/example/jobrun/JobRunTest.java
rename to genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunTest.java
index 847272a..4a7c8aa 100644
--- a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/example/jobrun/JobRunTest.java
+++ b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunTest.java
@@ -13,7 +13,7 @@
* limitations under the License.
*/
-package edu.uci.ics.genomix.example.jobrun;
+package edu.uci.ics.genomix.hyracks.test;
import java.io.BufferedWriter;
import java.io.DataOutputStream;
@@ -34,16 +34,14 @@
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.util.ReflectionUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-import edu.uci.ics.genomix.driver.Driver;
-import edu.uci.ics.genomix.driver.Driver.Plan;
-import edu.uci.ics.genomix.job.GenomixJob;
+import edu.uci.ics.genomix.hyracks.driver.Driver;
+import edu.uci.ics.genomix.hyracks.driver.Driver.Plan;
+import edu.uci.ics.genomix.hyracks.job.GenomixJob;
import edu.uci.ics.genomix.type.KmerBytesWritable;
-import edu.uci.ics.genomix.type.KmerCountValue;
public class JobRunTest {
private static final String ACTUAL_RESULT_DIR = "actual";
@@ -212,9 +210,9 @@
// KmerBytesWritable key = (KmerBytesWritable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
KmerBytesWritable key = new KmerBytesWritable(conf.getInt(GenomixJob.KMER_LENGTH,
- GenomixJob.DEFAULT_KMER));
- KmerCountValue value = (KmerCountValue) ReflectionUtils.newInstance(reader.getValueClass(), conf);
-
+ GenomixJob.DEFAULT_KMERLEN));
+// KmerCountValue value = (KmerCountValue) ReflectionUtils.newInstance(reader.getValueClass(), conf);
+ KmerBytesWritable value = null;
while (reader.next(key, value)) {
if (key == null || value == null) {
break;
diff --git a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/example/jobrun/TestUtils.java b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/TestUtils.java
similarity index 98%
rename from genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/example/jobrun/TestUtils.java
rename to genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/TestUtils.java
index aa1f791..bcfdedd 100644
--- a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/example/jobrun/TestUtils.java
+++ b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/TestUtils.java
@@ -13,7 +13,7 @@
* limitations under the License.
*/
-package edu.uci.ics.genomix.example.jobrun;
+package edu.uci.ics.genomix.hyracks.test;
import java.io.BufferedReader;
import java.io.File;