Change return value of shiftKmerWithNextChar/shiftKmerWithPreChar from adjacency bitmap to gene code
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/GeneCode.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/GeneCode.java
index 1ed6f80..5be5f83 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/GeneCode.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/GeneCode.java
@@ -1,3 +1,18 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package edu.uci.ics.genomix.type;
public class GeneCode {
@@ -86,6 +101,10 @@
return -1;
}
+ public static byte getBitMapFromGeneCode(byte t) {
+ return (byte) (1 << t);
+ }
+
public static int countNumberOfBitSet(int i) {
int c = 0;
for (; i != 0; c++) {
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerBytesWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerBytesWritable.java
index 8b9f9bd..c211ce7 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerBytesWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerBytesWritable.java
@@ -12,6 +12,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package edu.uci.ics.genomix.type;
import java.io.DataInput;
@@ -46,13 +47,13 @@
public KmerBytesWritable(int k, byte[] storage) {
this.kmerlength = k;
- if (k > 0){
+ if (k > 0) {
this.size = KmerUtil.getByteNumFromK(kmerlength);
this.bytes = storage;
- if (this.bytes.length < size){
+ if (this.bytes.length < size) {
throw new ArrayIndexOutOfBoundsException("Storage is smaller than required space for kmerlength:k");
}
- }else{
+ } else {
this.bytes = storage;
this.size = 0;
}
@@ -182,7 +183,7 @@
int pos = ((kmerlength - 1) % 4) << 1;
byte code = (byte) (c << pos);
bytes[0] = (byte) (((bytes[0] >>> 2) & 0x3f) | code);
- return (byte) (1 << output);
+ return output;
}
/**
@@ -215,7 +216,7 @@
bytes[0] &= (1 << ((kmerlength % 4) << 1)) - 1;
}
bytes[size - 1] = (byte) ((bytes[size - 1] << 2) | c);
- return (byte) (1 << output);
+ return output;
}
public void set(KmerBytesWritable newData) {
@@ -248,7 +249,7 @@
@Override
public int hashCode() {
- return super.hashCode() * this.kmerlength;
+ return super.hashCode() * 31 + this.kmerlength;
}
@Override
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerUtil.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerUtil.java
index 59cc3b0..68dec30 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerUtil.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerUtil.java
@@ -1,3 +1,18 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package edu.uci.ics.genomix.type;
public class KmerUtil {
@@ -19,7 +34,7 @@
public static String recoverKmerFrom(int k, byte[] keyData, int keyStart, int keyLength) {
StringBuilder strKmer = new StringBuilder();
int byteId = keyStart + keyLength - 1;
- if (byteId < 0){
+ if (byteId < 0) {
return empty;
}
byte currentbyte = keyData[byteId];
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/VKmerBytesWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/VKmerBytesWritable.java
index a66d408..fc63fd8 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/VKmerBytesWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/VKmerBytesWritable.java
@@ -1,3 +1,18 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package edu.uci.ics.genomix.type;
public class VKmerBytesWritable extends KmerBytesWritable {
@@ -10,8 +25,8 @@
public VKmerBytesWritable() {
super();
}
-
- public VKmerBytesWritable(int k, byte[] storage){
+
+ public VKmerBytesWritable(int k, byte[] storage) {
super(k, storage);
}
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/VKmerBytesWritableFactory.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/VKmerBytesWritableFactory.java
index ab1e633..9bd6acb 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/VKmerBytesWritableFactory.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/VKmerBytesWritableFactory.java
@@ -1,3 +1,18 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package edu.uci.ics.genomix.type;
public class VKmerBytesWritableFactory {
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/old/Kmer.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/old/Kmer.java
index ee806cc..7aa05f5 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/old/Kmer.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/old/Kmer.java
@@ -1,15 +1,29 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package edu.uci.ics.genomix.type.old;
@Deprecated
public class Kmer {
-
+
public final static byte[] GENE_SYMBOL = { 'A', 'C', 'G', 'T' };
public final static class GENE_CODE {
/**
- * make sure this 4 ids equal to the sequence id of char in
- * {@GENE_SYMBOL}
+ * make sure this 4 ids equal to the sequence id of char in {@GENE_SYMBOL}
*/
public static final byte A = 0;
public static final byte C = 1;
@@ -19,22 +33,22 @@
public static byte getCodeFromSymbol(byte ch) {
byte r = 0;
switch (ch) {
- case 'A':
- case 'a':
- r = A;
- break;
- case 'C':
- case 'c':
- r = C;
- break;
- case 'G':
- case 'g':
- r = G;
- break;
- case 'T':
- case 't':
- r = T;
- break;
+ case 'A':
+ case 'a':
+ r = A;
+ break;
+ case 'C':
+ case 'c':
+ r = C;
+ break;
+ case 'G':
+ case 'g':
+ r = G;
+ break;
+ case 'T':
+ case 't':
+ r = T;
+ break;
}
return r;
}
@@ -49,42 +63,44 @@
public static byte getAdjBit(byte t) {
byte r = 0;
switch (t) {
- case 'A':
- case 'a':
- r = 1 << A;
- break;
- case 'C':
- case 'c':
- r = 1 << C;
- break;
- case 'G':
- case 'g':
- r = 1 << G;
- break;
- case 'T':
- case 't':
- r = 1 << T;
- break;
+ case 'A':
+ case 'a':
+ r = 1 << A;
+ break;
+ case 'C':
+ case 'c':
+ r = 1 << C;
+ break;
+ case 'G':
+ case 'g':
+ r = 1 << G;
+ break;
+ case 'T':
+ case 't':
+ r = 1 << T;
+ break;
}
return r;
}
- /**
- * It works for path merge.
+ /**
+ * It works for path merge.
* Merge the kmer by his next, we need to make sure the @{t} is a single neighbor.
- * @param t the neighbor code in BitMap
- * @return the genecode
+ *
+ * @param t
+ * the neighbor code in BitMap
+ * @return the genecode
*/
public static byte getGeneCodeFromBitMap(byte t) {
switch (t) {
- case 1 << A:
- return A;
- case 1 << C:
- return C;
- case 1 << G:
- return G;
- case 1 << T:
- return T;
+ case 1 << A:
+ return A;
+ case 1 << C:
+ return C;
+ case 1 << G:
+ return G;
+ case 1 << T:
+ return T;
}
return -1;
}
@@ -112,8 +128,7 @@
}
}
- public static String recoverKmerFrom(int k, byte[] keyData, int keyStart,
- int keyLength) {
+ public static String recoverKmerFrom(int k, byte[] keyData, int keyStart, int keyLength) {
StringBuilder strKmer = new StringBuilder();
int byteId = keyStart + keyLength - 1;
byte currentbyte = keyData[byteId];
@@ -277,8 +292,7 @@
if (k % 4 != 0) {
kmer[0] &= (1 << ((k % 4) << 1)) - 1;
}
- kmer[kmer.length - 1] = (byte) ((kmer[kmer.length - 1] << 2) | GENE_CODE
- .getCodeFromSymbol(c));
+ kmer[kmer.length - 1] = (byte) ((kmer[kmer.length - 1] << 2) | GENE_CODE.getCodeFromSymbol(c));
return (byte) (1 << output);
}
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/old/KmerBytesWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/old/KmerBytesWritable.java
index dd76427..7a96d2f 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/old/KmerBytesWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/old/KmerBytesWritable.java
@@ -8,8 +8,7 @@
import org.apache.hadoop.io.WritableComparator;
@Deprecated
-public class KmerBytesWritable extends BinaryComparable implements
- WritableComparable<BinaryComparable> {
+public class KmerBytesWritable extends BinaryComparable implements WritableComparable<BinaryComparable> {
private static final int LENGTH_BYTES = 4;
private static final byte[] EMPTY_BYTES = {};
private byte size;
@@ -126,8 +125,7 @@
}
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
- return compareBytes(b1, s1 + LENGTH_BYTES, l1 - LENGTH_BYTES, b2,
- s2 + LENGTH_BYTES, l2 - LENGTH_BYTES);
+ return compareBytes(b1, s1 + LENGTH_BYTES, l1 - LENGTH_BYTES, b2, s2 + LENGTH_BYTES, l2 - LENGTH_BYTES);
}
}
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/old/KmerUtil.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/old/KmerUtil.java
index 7e9d2f3..a4b1bec 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/old/KmerUtil.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/old/KmerUtil.java
@@ -22,16 +22,16 @@
}
/**
- * Get last kmer from kmer-chain.
+ * Get last kmer from kmer-chain.
* e.g. kmerChain is AAGCTA, if k =5, it will
* return AGCTA
+ *
* @param k
* @param kInChain
* @param kmerChain
* @return LastKmer bytes array
*/
- public static byte[] getLastKmerFromChain(int k, int kInChain,
- byte[] kmerChain, int offset, int length) {
+ public static byte[] getLastKmerFromChain(int k, int kInChain, byte[] kmerChain, int offset, int length) {
if (k > kInChain) {
return null;
}
@@ -66,8 +66,7 @@
* @param kmerChain
* @return FirstKmer bytes array
*/
- public static byte[] getFirstKmerFromChain(int k, int kInChain,
- byte[] kmerChain, int offset, int length) {
+ public static byte[] getFirstKmerFromChain(int k, int kInChain, byte[] kmerChain, int offset, int length) {
if (k > kInChain) {
return null;
}
@@ -94,9 +93,13 @@
* Merge kmer with next neighbor in gene-code format.
* The k of new kmer will increase by 1
* e.g. AAGCT merge with A => AAGCTA
- * @param k :input k of kmer
- * @param kmer : input bytes of kmer
- * @param nextCode: next neighbor in gene-code format
+ *
+ * @param k
+ * :input k of kmer
+ * @param kmer
+ * : input bytes of kmer
+ * @param nextCode
+ * : next neighbor in gene-code format
* @return the merged Kmer, this K of this Kmer is k+1
*/
public static byte[] mergeKmerWithNextCode(int k, byte[] kmer, int offset, int length, byte nextCode) {
@@ -120,9 +123,13 @@
* Merge kmer with previous neighbor in gene-code format.
* The k of new kmer will increase by 1
* e.g. AAGCT merge with A => AAAGCT
- * @param k :input k of kmer
- * @param kmer : input bytes of kmer
- * @param preCode: next neighbor in gene-code format
+ *
+ * @param k
+ * :input k of kmer
+ * @param kmer
+ * : input bytes of kmer
+ * @param preCode
+ * : next neighbor in gene-code format
* @return the merged Kmer,this K of this Kmer is k+1
*/
public static byte[] mergeKmerWithPreCode(int k, byte[] kmer, int offset, int length, byte preCode) {
@@ -147,10 +154,15 @@
/**
* Merge two kmer to one kmer
* e.g. ACTA + ACCGT => ACTAACCGT
- * @param preK : previous k of kmer
- * @param kmerPre : bytes array of previous kmer
- * @param nextK : next k of kmer
- * @param kmerNext : bytes array of next kmer
+ *
+ * @param preK
+ * : previous k of kmer
+ * @param kmerPre
+ * : bytes array of previous kmer
+ * @param nextK
+ * : next k of kmer
+ * @param kmerNext
+ * : bytes array of next kmer
* @return merged kmer, the new k is @preK + @nextK
*/
public static byte[] mergeTwoKmer(int preK, byte[] kmerPre, int offsetPre, int lengthPre, int nextK,
@@ -161,7 +173,7 @@
for (; i <= lengthPre; i++) {
mergedKmer[byteNum - i] = kmerPre[offsetPre + lengthPre - i];
}
- if ( i > 1){
+ if (i > 1) {
i--;
}
if (preK % 4 == 0) {
@@ -172,52 +184,58 @@
int posNeedToMove = ((preK % 4) << 1);
mergedKmer[byteNum - i] |= kmerNext[offsetNext + lengthNext - 1] << posNeedToMove;
for (int j = 1; j < lengthNext; j++) {
- mergedKmer[byteNum - i - j] = (byte) (((kmerNext[offsetNext + lengthNext
- - j] & 0xff) >> (8 - posNeedToMove)) | (kmerNext[offsetNext + lengthNext
- - j - 1] << posNeedToMove));
+ mergedKmer[byteNum - i - j] = (byte) (((kmerNext[offsetNext + lengthNext - j] & 0xff) >> (8 - posNeedToMove)) | (kmerNext[offsetNext
+ + lengthNext - j - 1] << posNeedToMove));
}
- if ( (nextK % 4) * 2 + posNeedToMove > 8) {
+ if ((nextK % 4) * 2 + posNeedToMove > 8) {
mergedKmer[0] = (byte) (kmerNext[offsetNext] >> (8 - posNeedToMove));
}
}
return mergedKmer;
}
-
+
/**
* Safely shifted the kmer forward without change the input kmer
* e.g. AGCGC shift with T => GCGCT
- * @param k: kmer length
- * @param kmer: input kmer
- * @param afterCode: input genecode
+ *
+ * @param k
+ * : kmer length
+ * @param kmer
+ * : input kmer
+ * @param afterCode
+ * : input genecode
* @return new created kmer that shifted by afterCode, the K will not change
*/
- public static byte[] shiftKmerWithNextCode(int k, final byte[] kmer, int offset, int length, byte afterCode){
- byte[] shifted = Arrays.copyOfRange(kmer, offset, offset+length);
+ public static byte[] shiftKmerWithNextCode(int k, final byte[] kmer, int offset, int length, byte afterCode) {
+ byte[] shifted = Arrays.copyOfRange(kmer, offset, offset + length);
Kmer.moveKmer(k, shifted, Kmer.GENE_CODE.getSymbolFromCode(afterCode));
return shifted;
}
-
+
/**
* Safely shifted the kmer backward without change the input kmer
* e.g. AGCGC shift with T => TAGCG
- * @param k: kmer length
- * @param kmer: input kmer
- * @param preCode: input genecode
+ *
+ * @param k
+ * : kmer length
+ * @param kmer
+ * : input kmer
+ * @param preCode
+ * : input genecode
* @return new created kmer that shifted by preCode, the K will not change
*/
- public static byte[] shiftKmerWithPreCode(int k, final byte[] kmer, int offset, int length, byte preCode){
- byte[] shifted = Arrays.copyOfRange(kmer, offset, offset+length);
+ public static byte[] shiftKmerWithPreCode(int k, final byte[] kmer, int offset, int length, byte preCode) {
+ byte[] shifted = Arrays.copyOfRange(kmer, offset, offset + length);
Kmer.moveKmerReverse(k, shifted, Kmer.GENE_CODE.getSymbolFromCode(preCode));
return shifted;
}
- public static byte getGeneCodeAtPosition(int pos, int k, final byte[] kmer,
- int offset, int length) {
+ public static byte getGeneCodeAtPosition(int pos, int k, final byte[] kmer, int offset, int length) {
if (pos >= k) {
return -1;
}
int posByte = pos / 4;
- int shift = (pos % 4) << 1;
+ int shift = (pos % 4) << 1;
return (byte) ((kmer[offset + length - 1 - posByte] >> shift) & 0x3);
}
}
diff --git a/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/example/kmer/KmerBytesWritableTest.java b/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/example/kmer/KmerBytesWritableTest.java
index f21da91..faee509 100644
--- a/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/example/kmer/KmerBytesWritableTest.java
+++ b/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/example/kmer/KmerBytesWritableTest.java
@@ -1,3 +1,18 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package edu.uci.ics.genomix.example.kmer;
import junit.framework.Assert;
@@ -33,7 +48,7 @@
}
byte out = kmer.shiftKmerWithNextChar(array[array.length - 1]);
- Assert.assertEquals(out, GeneCode.getAdjBit((byte) 'A'));
+ Assert.assertEquals(out, GeneCode.getCodeFromSymbol((byte) 'A'));
Assert.assertEquals(kmer.toString(), "ATAGAAG");
}
@@ -59,7 +74,7 @@
}
byte out = kmer.shiftKmerWithPreChar(array[array.length - 1]);
- Assert.assertEquals(out, GeneCode.getAdjBit((byte) 'A'));
+ Assert.assertEquals(out, GeneCode.getCodeFromSymbol((byte) 'A'));
Assert.assertEquals(kmer.toString(), "GAATAGA");
}
diff --git a/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/example/kmer/VKmerBytesWritableFactoryTest.java b/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/example/kmer/VKmerBytesWritableFactoryTest.java
index 6611752..7c4c675 100644
--- a/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/example/kmer/VKmerBytesWritableFactoryTest.java
+++ b/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/example/kmer/VKmerBytesWritableFactoryTest.java
@@ -1,3 +1,18 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package edu.uci.ics.genomix.example.kmer;
import org.junit.Assert;
@@ -31,49 +46,49 @@
for (int i = 8; i > 0; i--) {
lastKmer = kmerFactory.getLastKmerFromChain(i, kmer);
Assert.assertEquals("AGCTGACCG".substring(9 - i), lastKmer.toString());
- lastKmer = kmerFactory.getSubKmerFromChain(9-i, i, kmer);
+ lastKmer = kmerFactory.getSubKmerFromChain(9 - i, i, kmer);
Assert.assertEquals("AGCTGACCG".substring(9 - i), lastKmer.toString());
}
VKmerBytesWritable vlastKmer;
for (int i = 8; i > 0; i--) {
vlastKmer = kmerFactory.getLastKmerFromChain(i, kmer);
Assert.assertEquals("AGCTGACCG".substring(9 - i), vlastKmer.toString());
- vlastKmer = kmerFactory.getSubKmerFromChain(9-i, i, kmer);
+ vlastKmer = kmerFactory.getSubKmerFromChain(9 - i, i, kmer);
Assert.assertEquals("AGCTGACCG".substring(9 - i), vlastKmer.toString());
}
}
-
+
@Test
- public void TestGetFirstKmer(){
+ public void TestGetFirstKmer() {
KmerBytesWritable kmer = new KmerBytesWritable(9);
kmer.setByRead(array, 0);
Assert.assertEquals("AGCTGACCG", kmer.toString());
KmerBytesWritable firstKmer;
for (int i = 8; i > 0; i--) {
firstKmer = kmerFactory.getFirstKmerFromChain(i, kmer);
- Assert.assertEquals("AGCTGACCG".substring(0,i), firstKmer.toString());
- firstKmer = kmerFactory.getSubKmerFromChain(0,i,kmer);
- Assert.assertEquals("AGCTGACCG".substring(0,i), firstKmer.toString());
+ Assert.assertEquals("AGCTGACCG".substring(0, i), firstKmer.toString());
+ firstKmer = kmerFactory.getSubKmerFromChain(0, i, kmer);
+ Assert.assertEquals("AGCTGACCG".substring(0, i), firstKmer.toString());
}
VKmerBytesWritable vfirstKmer;
for (int i = 8; i > 0; i--) {
vfirstKmer = kmerFactory.getFirstKmerFromChain(i, kmer);
- Assert.assertEquals("AGCTGACCG".substring(0,i), vfirstKmer.toString());
+ Assert.assertEquals("AGCTGACCG".substring(0, i), vfirstKmer.toString());
vfirstKmer = kmerFactory.getSubKmerFromChain(0, i, kmer);
- Assert.assertEquals("AGCTGACCG".substring(0,i), vfirstKmer.toString());
+ Assert.assertEquals("AGCTGACCG".substring(0, i), vfirstKmer.toString());
}
}
-
- @Test
- public void TestGetSubKmer(){
+
+ @Test
+ public void TestGetSubKmer() {
KmerBytesWritable kmer = new KmerBytesWritable(9);
kmer.setByRead(array, 0);
Assert.assertEquals("AGCTGACCG", kmer.toString());
VKmerBytesWritable subKmer;
- for (int istart = 0; istart < kmer.getKmerLength()-1; istart++) {
- for(int isize = 1; isize + istart <= kmer.getKmerLength(); isize ++){
+ for (int istart = 0; istart < kmer.getKmerLength() - 1; istart++) {
+ for (int isize = 1; isize + istart <= kmer.getKmerLength(); isize++) {
subKmer = kmerFactory.getSubKmerFromChain(istart, isize, kmer);
- Assert.assertEquals("AGCTGACCG".substring(istart, istart+isize), subKmer.toString());
+ Assert.assertEquals("AGCTGACCG".substring(istart, istart + isize), subKmer.toString());
}
}
}
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/graphbuilding/GenomixMapper.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/graphbuilding/GenomixMapper.java
index d36be17..f3f4584 100755
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/graphbuilding/GenomixMapper.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/graphbuilding/GenomixMapper.java
@@ -87,14 +87,14 @@
output.collect(outputKmer, outputAdjList);
/** middle kmer */
for (int i = KMER_SIZE; i < array.length - 1; i++) {
- pre = outputKmer.shiftKmerWithNextChar(array[i]);
+ pre = GeneCode.getBitMapFromGeneCode(outputKmer.shiftKmerWithNextChar(array[i]));
next = GeneCode.getAdjBit(array[i + 1]);
adj = GeneCode.mergePreNextAdj(pre, next);
outputAdjList.set(adj, count);
output.collect(outputKmer, outputAdjList);
}
/** last kmer */
- pre = outputKmer.shiftKmerWithNextChar(array[array.length - 1]);
+ pre = GeneCode.getBitMapFromGeneCode(outputKmer.shiftKmerWithNextChar(array[array.length - 1]));
next = 0;
adj = GeneCode.mergePreNextAdj(pre, next);
outputAdjList.set(adj, count);
diff --git a/genomix/genomix-hadoop/src/test/java/edu/uci/ics/graphbuilding/GraphBuildingTest.java b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/graphbuilding/GraphBuildingTest.java
index 7b0005b..44af11b 100755
--- a/genomix/genomix-hadoop/src/test/java/edu/uci/ics/graphbuilding/GraphBuildingTest.java
+++ b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/graphbuilding/GraphBuildingTest.java
@@ -71,7 +71,8 @@
SequenceFile.Reader reader = null;
Path path = new Path(RESULT_PATH + "/part-00000");
reader = new SequenceFile.Reader(dfs, path, conf);
- KmerBytesWritable key = (KmerBytesWritable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
+// KmerBytesWritable key = (KmerBytesWritable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
+ KmerBytesWritable key = new KmerBytesWritable(SIZE_KMER);
KmerCountValue value = (KmerCountValue) ReflectionUtils.newInstance(reader.getValueClass(), conf);
File filePathTo = new File(TEST_SOURCE_DIR);
BufferedWriter bw = new BufferedWriter(new FileWriter(filePathTo));
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/ByteSerializerDeserializer.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/ByteSerializerDeserializer.java
index 41e3e4b..ebf1282 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/ByteSerializerDeserializer.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/ByteSerializerDeserializer.java
@@ -1,3 +1,18 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package edu.uci.ics.genomix.data.std.accessors;
import java.io.DataInput;
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/KmerBinaryHashFunctionFamily.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/KmerBinaryHashFunctionFamily.java
index c1e9804..34c29c7 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/KmerBinaryHashFunctionFamily.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/KmerBinaryHashFunctionFamily.java
@@ -1,3 +1,18 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package edu.uci.ics.genomix.data.std.accessors;
import edu.uci.ics.genomix.data.std.primitive.KmerPointable;
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/KmerHashPartitioncomputerFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/KmerHashPartitioncomputerFactory.java
index 86ca296..8aaf380 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/KmerHashPartitioncomputerFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/KmerHashPartitioncomputerFactory.java
@@ -1,3 +1,18 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package edu.uci.ics.genomix.data.std.accessors;
import java.nio.ByteBuffer;
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/KmerNormarlizedComputerFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/KmerNormarlizedComputerFactory.java
index d1d0054..83b9d12 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/KmerNormarlizedComputerFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/KmerNormarlizedComputerFactory.java
@@ -1,3 +1,18 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package edu.uci.ics.genomix.data.std.accessors;
import edu.uci.ics.genomix.data.std.primitive.KmerPointable;
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/primitive/KmerPointable.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/primitive/KmerPointable.java
index b4aa4c7..8febfa5 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/primitive/KmerPointable.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/primitive/KmerPointable.java
@@ -1,3 +1,18 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package edu.uci.ics.genomix.data.std.primitive;
import edu.uci.ics.genomix.data.std.accessors.KmerHashPartitioncomputerFactory;
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/ConnectorPolicyAssignmentPolicy.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/ConnectorPolicyAssignmentPolicy.java
index d3de2ba..ed1b926 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/ConnectorPolicyAssignmentPolicy.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/ConnectorPolicyAssignmentPolicy.java
@@ -1,3 +1,18 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package edu.uci.ics.genomix.dataflow;
import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
@@ -9,22 +24,19 @@
/**
* used by precluster groupby
- *
*/
-public class ConnectorPolicyAssignmentPolicy implements
- IConnectorPolicyAssignmentPolicy {
- private static final long serialVersionUID = 1L;
- private IConnectorPolicy senderSideMaterializePolicy = new SendSideMaterializedPipeliningConnectorPolicy();
- private IConnectorPolicy pipeliningPolicy = new PipeliningConnectorPolicy();
+public class ConnectorPolicyAssignmentPolicy implements IConnectorPolicyAssignmentPolicy {
+ private static final long serialVersionUID = 1L;
+ private IConnectorPolicy senderSideMaterializePolicy = new SendSideMaterializedPipeliningConnectorPolicy();
+ private IConnectorPolicy pipeliningPolicy = new PipeliningConnectorPolicy();
- @Override
- public IConnectorPolicy getConnectorPolicyAssignment(
- IConnectorDescriptor c, int nProducers, int nConsumers,
- int[] fanouts) {
- if (c instanceof MToNPartitioningMergingConnectorDescriptor) {
- return senderSideMaterializePolicy;
- } else {
- return pipeliningPolicy;
- }
- }
+ @Override
+ public IConnectorPolicy getConnectorPolicyAssignment(IConnectorDescriptor c, int nProducers, int nConsumers,
+ int[] fanouts) {
+ if (c instanceof MToNPartitioningMergingConnectorDescriptor) {
+ return senderSideMaterializePolicy;
+ } else {
+ return pipeliningPolicy;
+ }
+ }
}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/KMerSequenceWriterFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/KMerSequenceWriterFactory.java
index 413c73b..405e109 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/KMerSequenceWriterFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/KMerSequenceWriterFactory.java
@@ -1,3 +1,18 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package edu.uci.ics.genomix.dataflow;
import java.io.DataOutput;
@@ -22,68 +37,65 @@
@SuppressWarnings("deprecation")
public class KMerSequenceWriterFactory implements ITupleWriterFactory {
- private static final long serialVersionUID = 1L;
- private ConfFactory confFactory;
- private final int kmerlength;
+ private static final long serialVersionUID = 1L;
+ private ConfFactory confFactory;
+ private final int kmerlength;
- public KMerSequenceWriterFactory(JobConf conf) throws HyracksDataException {
- this.confFactory = new ConfFactory(conf);
- this.kmerlength = conf.getInt(GenomixJob.KMER_LENGTH, GenomixJob.DEFAULT_KMER);
- }
+ public KMerSequenceWriterFactory(JobConf conf) throws HyracksDataException {
+ this.confFactory = new ConfFactory(conf);
+ this.kmerlength = conf.getInt(GenomixJob.KMER_LENGTH, GenomixJob.DEFAULT_KMER);
+ }
- public class TupleWriter implements ITupleWriter {
- public TupleWriter(ConfFactory cf) {
- this.cf = cf;
- }
+ public class TupleWriter implements ITupleWriter {
+ public TupleWriter(ConfFactory cf) {
+ this.cf = cf;
+ }
- ConfFactory cf;
- Writer writer = null;
+ ConfFactory cf;
+ Writer writer = null;
- KmerCountValue reEnterCount = new KmerCountValue();
- KmerBytesWritable reEnterKey = new KmerBytesWritable(kmerlength);
+ KmerCountValue reEnterCount = new KmerCountValue();
+ KmerBytesWritable reEnterKey = new KmerBytesWritable(kmerlength);
- /**
- * assumption is that output never change source!
- */
- @Override
- public void write(DataOutput output, ITupleReference tuple)
- throws HyracksDataException {
- try {
- byte[] kmer = tuple.getFieldData(0);
- int keyStart = tuple.getFieldStart(0);
- int keyLength = tuple.getFieldLength(0);
+ /**
+ * assumption is that output never change source!
+ */
+ @Override
+ public void write(DataOutput output, ITupleReference tuple) throws HyracksDataException {
+ try {
+ byte[] kmer = tuple.getFieldData(0);
+ int keyStart = tuple.getFieldStart(0);
+ int keyLength = tuple.getFieldLength(0);
- byte bitmap = tuple.getFieldData(1)[tuple.getFieldStart(1)];
- byte count = tuple.getFieldData(2)[tuple.getFieldStart(2)];
- reEnterCount.set(bitmap, count);
- reEnterKey.set(kmer, keyStart, keyLength);
- writer.append(reEnterKey, reEnterCount);
- } catch (IOException e) {
- throw new HyracksDataException(e);
- }
- }
+ byte bitmap = tuple.getFieldData(1)[tuple.getFieldStart(1)];
+ byte count = tuple.getFieldData(2)[tuple.getFieldStart(2)];
+ reEnterCount.set(bitmap, count);
+ reEnterKey.set(kmer, keyStart, keyLength);
+ writer.append(reEnterKey, reEnterCount);
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
- @Override
- public void open(DataOutput output) throws HyracksDataException {
- try {
- writer = SequenceFile.createWriter(cf.getConf(),
- (FSDataOutputStream) output, KmerBytesWritable.class,
- KmerCountValue.class, CompressionType.NONE, null);
- } catch (IOException e) {
- throw new HyracksDataException(e);
- }
- }
+ @Override
+ public void open(DataOutput output) throws HyracksDataException {
+ try {
+ writer = SequenceFile.createWriter(cf.getConf(), (FSDataOutputStream) output, KmerBytesWritable.class,
+ KmerCountValue.class, CompressionType.NONE, null);
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
- @Override
- public void close(DataOutput output) throws HyracksDataException {
- // TODO Auto-generated method stub
- }
- }
+ @Override
+ public void close(DataOutput output) throws HyracksDataException {
+ // TODO Auto-generated method stub
+ }
+ }
- @Override
- public ITupleWriter getTupleWriter(IHyracksTaskContext ctx)
- throws HyracksDataException {
- return new TupleWriter(confFactory);
- }
+ @Override
+ public ITupleWriter getTupleWriter(IHyracksTaskContext ctx) throws HyracksDataException {
+ return new TupleWriter(confFactory);
+ }
}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/KMerTextWriterFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/KMerTextWriterFactory.java
index 4ba38d0..cfa7262 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/KMerTextWriterFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/KMerTextWriterFactory.java
@@ -1,3 +1,18 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package edu.uci.ics.genomix.dataflow;
import java.io.DataOutput;
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/ReadsKeyValueParserFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/ReadsKeyValueParserFactory.java
index 79ad195..4f5b4da 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/ReadsKeyValueParserFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/ReadsKeyValueParserFactory.java
@@ -1,3 +1,18 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package edu.uci.ics.genomix.dataflow;
import java.nio.ByteBuffer;
@@ -62,13 +77,13 @@
/** middle kmer */
for (int i = k; i < array.length - 1; i++) {
- pre = kmer.shiftKmerWithNextChar(array[i]);
+ pre = GeneCode.getBitMapFromGeneCode(kmer.shiftKmerWithNextChar(array[i]));
next = GeneCode.getAdjBit(array[i + 1]);
InsertToFrame(kmer, pre, next, writer);
}
/** last kmer */
- pre = kmer.shiftKmerWithNextChar(array[array.length - 1]);
+ pre = GeneCode.getBitMapFromGeneCode(kmer.shiftKmerWithNextChar(array[array.length - 1]));
next = 0;
InsertToFrame(kmer, pre, next, writer);
@@ -80,12 +95,12 @@
InsertToFrame(kmer, pre, next, writer);
/** middle kmer */
for (int i = k; i < array.length - 1; i++) {
- next = kmer.shiftKmerWithPreChar(array[i]);
+ next = GeneCode.getBitMapFromGeneCode(kmer.shiftKmerWithPreChar(array[i]));
pre = GeneCode.getAdjBit(array[i + 1]);
InsertToFrame(kmer, pre, next, writer);
}
/** last kmer */
- next = kmer.shiftKmerWithPreChar(array[array.length - 1]);
+ next = GeneCode.getBitMapFromGeneCode(kmer.shiftKmerWithPreChar(array[array.length - 1]));
pre = 0;
InsertToFrame(kmer, pre, next, writer);
}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/DistributedMergeLmerAggregateFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/DistributedMergeLmerAggregateFactory.java
index 62680ed..ea70fb0 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/DistributedMergeLmerAggregateFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/DistributedMergeLmerAggregateFactory.java
@@ -1,3 +1,18 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package edu.uci.ics.genomix.dataflow.aggregators;
import java.io.DataOutput;
@@ -15,126 +30,110 @@
/**
* sum
- *
*/
-public class DistributedMergeLmerAggregateFactory implements
- IAggregatorDescriptorFactory {
- private static final long serialVersionUID = 1L;
+public class DistributedMergeLmerAggregateFactory implements IAggregatorDescriptorFactory {
+ private static final long serialVersionUID = 1L;
- public DistributedMergeLmerAggregateFactory() {
- }
+ public DistributedMergeLmerAggregateFactory() {
+ }
- public class DistributeAggregatorDescriptor implements IAggregatorDescriptor {
- private static final int MAX = 127;
+ public class DistributeAggregatorDescriptor implements IAggregatorDescriptor {
+ private static final int MAX = 127;
- @Override
- public void reset() {
- }
+ @Override
+ public void reset() {
+ }
- @Override
- public void close() {
- // TODO Auto-generated method stub
+ @Override
+ public void close() {
+ // TODO Auto-generated method stub
- }
+ }
- @Override
- public AggregateState createAggregateStates() {
- return new AggregateState(new Object() {
- });
- }
+ @Override
+ public AggregateState createAggregateStates() {
+ return new AggregateState(new Object() {
+ });
+ }
- protected byte getField(IFrameTupleAccessor accessor, int tIndex,
- int fieldId) {
- int tupleOffset = accessor.getTupleStartOffset(tIndex);
- int fieldStart = accessor.getFieldStartOffset(tIndex, fieldId);
- int offset = tupleOffset + fieldStart + accessor.getFieldSlotsLength();
- byte data = ByteSerializerDeserializer.getByte(accessor.getBuffer()
- .array(), offset);
- return data;
- }
-
- @Override
- public void init(ArrayTupleBuilder tupleBuilder,
- IFrameTupleAccessor accessor, int tIndex, AggregateState state)
- throws HyracksDataException {
- byte bitmap = getField(accessor, tIndex, 1);
- byte count = getField(accessor, tIndex, 2);
+ protected byte getField(IFrameTupleAccessor accessor, int tIndex, int fieldId) {
+ int tupleOffset = accessor.getTupleStartOffset(tIndex);
+ int fieldStart = accessor.getFieldStartOffset(tIndex, fieldId);
+ int offset = tupleOffset + fieldStart + accessor.getFieldSlotsLength();
+ byte data = ByteSerializerDeserializer.getByte(accessor.getBuffer().array(), offset);
+ return data;
+ }
- DataOutput fieldOutput = tupleBuilder.getDataOutput();
- try {
- fieldOutput.writeByte(bitmap);
- tupleBuilder.addFieldEndOffset();
- fieldOutput.writeByte(count);
- tupleBuilder.addFieldEndOffset();
- } catch (IOException e) {
- throw new HyracksDataException(
- "I/O exception when initializing the aggregator.");
- }
- }
+ @Override
+ public void init(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex, AggregateState state)
+ throws HyracksDataException {
+ byte bitmap = getField(accessor, tIndex, 1);
+ byte count = getField(accessor, tIndex, 2);
- @Override
- public void aggregate(IFrameTupleAccessor accessor, int tIndex,
- IFrameTupleAccessor stateAccessor, int stateTupleIndex,
- AggregateState state) throws HyracksDataException {
- byte bitmap = getField(accessor, tIndex, 1);
- short count = getField(accessor, tIndex, 2);
+ DataOutput fieldOutput = tupleBuilder.getDataOutput();
+ try {
+ fieldOutput.writeByte(bitmap);
+ tupleBuilder.addFieldEndOffset();
+ fieldOutput.writeByte(count);
+ tupleBuilder.addFieldEndOffset();
+ } catch (IOException e) {
+ throw new HyracksDataException("I/O exception when initializing the aggregator.");
+ }
+ }
- int statetupleOffset = stateAccessor
- .getTupleStartOffset(stateTupleIndex);
- int bitfieldStart = stateAccessor.getFieldStartOffset(stateTupleIndex,
- 1);
- int countfieldStart = stateAccessor.getFieldStartOffset(
- stateTupleIndex, 2);
- int bitoffset = statetupleOffset + stateAccessor.getFieldSlotsLength()
- + bitfieldStart;
- int countoffset = statetupleOffset
- + stateAccessor.getFieldSlotsLength() + countfieldStart;
+ @Override
+ public void aggregate(IFrameTupleAccessor accessor, int tIndex, IFrameTupleAccessor stateAccessor,
+ int stateTupleIndex, AggregateState state) throws HyracksDataException {
+ byte bitmap = getField(accessor, tIndex, 1);
+ short count = getField(accessor, tIndex, 2);
- byte[] data = stateAccessor.getBuffer().array();
+ int statetupleOffset = stateAccessor.getTupleStartOffset(stateTupleIndex);
+ int bitfieldStart = stateAccessor.getFieldStartOffset(stateTupleIndex, 1);
+ int countfieldStart = stateAccessor.getFieldStartOffset(stateTupleIndex, 2);
+ int bitoffset = statetupleOffset + stateAccessor.getFieldSlotsLength() + bitfieldStart;
+ int countoffset = statetupleOffset + stateAccessor.getFieldSlotsLength() + countfieldStart;
- bitmap |= data[bitoffset];
- count += data[countoffset];
- if (count >= MAX) {
- count = (byte) MAX;
- }
- data[bitoffset] = bitmap;
- data[countoffset] = (byte) count;
- }
+ byte[] data = stateAccessor.getBuffer().array();
- @Override
- public void outputPartialResult(ArrayTupleBuilder tupleBuilder,
- IFrameTupleAccessor accessor, int tIndex, AggregateState state)
- throws HyracksDataException {
- byte bitmap = getField(accessor, tIndex, 1);
- byte count = getField(accessor, tIndex, 2);
- DataOutput fieldOutput = tupleBuilder.getDataOutput();
- try {
- fieldOutput.writeByte(bitmap);
- tupleBuilder.addFieldEndOffset();
- fieldOutput.writeByte(count);
- tupleBuilder.addFieldEndOffset();
- } catch (IOException e) {
- throw new HyracksDataException(
- "I/O exception when writing aggregation to the output buffer.");
- }
+ bitmap |= data[bitoffset];
+ count += data[countoffset];
+ if (count >= MAX) {
+ count = (byte) MAX;
+ }
+ data[bitoffset] = bitmap;
+ data[countoffset] = (byte) count;
+ }
- }
+ @Override
+ public void outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+ AggregateState state) throws HyracksDataException {
+ byte bitmap = getField(accessor, tIndex, 1);
+ byte count = getField(accessor, tIndex, 2);
+ DataOutput fieldOutput = tupleBuilder.getDataOutput();
+ try {
+ fieldOutput.writeByte(bitmap);
+ tupleBuilder.addFieldEndOffset();
+ fieldOutput.writeByte(count);
+ tupleBuilder.addFieldEndOffset();
+ } catch (IOException e) {
+ throw new HyracksDataException("I/O exception when writing aggregation to the output buffer.");
+ }
- @Override
- public void outputFinalResult(ArrayTupleBuilder tupleBuilder,
- IFrameTupleAccessor accessor, int tIndex, AggregateState state)
- throws HyracksDataException {
- outputPartialResult(tupleBuilder, accessor, tIndex, state);
- }
-
- }
+ }
- @Override
- public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx,
- RecordDescriptor inRecordDescriptor,
- RecordDescriptor outRecordDescriptor, int[] keyFields,
- int[] keyFieldsInPartialResults) throws HyracksDataException {
- return new DistributeAggregatorDescriptor();
- }
+ @Override
+ public void outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+ AggregateState state) throws HyracksDataException {
+ outputPartialResult(tupleBuilder, accessor, tIndex, state);
+ }
+
+ }
+
+ @Override
+ public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
+ RecordDescriptor outRecordDescriptor, int[] keyFields, int[] keyFieldsInPartialResults)
+ throws HyracksDataException {
+ return new DistributeAggregatorDescriptor();
+ }
}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/LocalAggregatorDescriptor.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/LocalAggregatorDescriptor.java
index 330f950..87c0207 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/LocalAggregatorDescriptor.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/LocalAggregatorDescriptor.java
@@ -1,3 +1,18 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package edu.uci.ics.genomix.dataflow.aggregators;
import java.io.DataOutput;
@@ -11,106 +26,93 @@
import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
public class LocalAggregatorDescriptor implements IAggregatorDescriptor {
- private static final int MAX = 127;
+ private static final int MAX = 127;
- @Override
- public void reset() {
- }
+ @Override
+ public void reset() {
+ }
- @Override
- public void close() {
- // TODO Auto-generated method stub
+ @Override
+ public void close() {
+ // TODO Auto-generated method stub
- }
+ }
- @Override
- public AggregateState createAggregateStates() {
- return new AggregateState(new Object() {
- });
- }
+ @Override
+ public AggregateState createAggregateStates() {
+ return new AggregateState(new Object() {
+ });
+ }
- protected byte getField(IFrameTupleAccessor accessor, int tIndex,
- int fieldId) {
- int tupleOffset = accessor.getTupleStartOffset(tIndex);
- int fieldStart = accessor.getFieldStartOffset(tIndex, fieldId);
- int offset = tupleOffset + fieldStart + accessor.getFieldSlotsLength();
- byte data = ByteSerializerDeserializer.getByte(accessor.getBuffer()
- .array(), offset);
- return data;
- }
+ protected byte getField(IFrameTupleAccessor accessor, int tIndex, int fieldId) {
+ int tupleOffset = accessor.getTupleStartOffset(tIndex);
+ int fieldStart = accessor.getFieldStartOffset(tIndex, fieldId);
+ int offset = tupleOffset + fieldStart + accessor.getFieldSlotsLength();
+ byte data = ByteSerializerDeserializer.getByte(accessor.getBuffer().array(), offset);
+ return data;
+ }
- @Override
- public void init(ArrayTupleBuilder tupleBuilder,
- IFrameTupleAccessor accessor, int tIndex, AggregateState state)
- throws HyracksDataException {
- byte bitmap = getField(accessor, tIndex, 1);
- byte count = 1;
+ @Override
+ public void init(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex, AggregateState state)
+ throws HyracksDataException {
+ byte bitmap = getField(accessor, tIndex, 1);
+ byte count = 1;
- DataOutput fieldOutput = tupleBuilder.getDataOutput();
- try {
- fieldOutput.writeByte(bitmap);
- tupleBuilder.addFieldEndOffset();
- fieldOutput.writeByte(count);
- tupleBuilder.addFieldEndOffset();
- } catch (IOException e) {
- throw new HyracksDataException(
- "I/O exception when initializing the aggregator.");
- }
- }
+ DataOutput fieldOutput = tupleBuilder.getDataOutput();
+ try {
+ fieldOutput.writeByte(bitmap);
+ tupleBuilder.addFieldEndOffset();
+ fieldOutput.writeByte(count);
+ tupleBuilder.addFieldEndOffset();
+ } catch (IOException e) {
+ throw new HyracksDataException("I/O exception when initializing the aggregator.");
+ }
+ }
- @Override
- public void aggregate(IFrameTupleAccessor accessor, int tIndex,
- IFrameTupleAccessor stateAccessor, int stateTupleIndex,
- AggregateState state) throws HyracksDataException {
- byte bitmap = getField(accessor, tIndex, 1);
- short count = 1;
+ @Override
+ public void aggregate(IFrameTupleAccessor accessor, int tIndex, IFrameTupleAccessor stateAccessor,
+ int stateTupleIndex, AggregateState state) throws HyracksDataException {
+ byte bitmap = getField(accessor, tIndex, 1);
+ short count = 1;
- int statetupleOffset = stateAccessor
- .getTupleStartOffset(stateTupleIndex);
- int bitfieldStart = stateAccessor.getFieldStartOffset(stateTupleIndex,
- 1);
- int countfieldStart = stateAccessor.getFieldStartOffset(
- stateTupleIndex, 2);
- int bitoffset = statetupleOffset + stateAccessor.getFieldSlotsLength()
- + bitfieldStart;
- int countoffset = statetupleOffset
- + stateAccessor.getFieldSlotsLength() + countfieldStart;
+ int statetupleOffset = stateAccessor.getTupleStartOffset(stateTupleIndex);
+ int bitfieldStart = stateAccessor.getFieldStartOffset(stateTupleIndex, 1);
+ int countfieldStart = stateAccessor.getFieldStartOffset(stateTupleIndex, 2);
+ int bitoffset = statetupleOffset + stateAccessor.getFieldSlotsLength() + bitfieldStart;
+ int countoffset = statetupleOffset + stateAccessor.getFieldSlotsLength() + countfieldStart;
- byte[] data = stateAccessor.getBuffer().array();
+ byte[] data = stateAccessor.getBuffer().array();
- bitmap |= data[bitoffset];
- count += data[countoffset];
- if (count >= MAX) {
- count = (byte) MAX;
- }
- data[bitoffset] = bitmap;
- data[countoffset] = (byte) count;
- }
+ bitmap |= data[bitoffset];
+ count += data[countoffset];
+ if (count >= MAX) {
+ count = (byte) MAX;
+ }
+ data[bitoffset] = bitmap;
+ data[countoffset] = (byte) count;
+ }
- @Override
- public void outputPartialResult(ArrayTupleBuilder tupleBuilder,
- IFrameTupleAccessor accessor, int tIndex, AggregateState state)
- throws HyracksDataException {
- byte bitmap = getField(accessor, tIndex, 1);
- byte count = getField(accessor, tIndex, 2);
- DataOutput fieldOutput = tupleBuilder.getDataOutput();
- try {
- fieldOutput.writeByte(bitmap);
- tupleBuilder.addFieldEndOffset();
- fieldOutput.writeByte(count);
- tupleBuilder.addFieldEndOffset();
- } catch (IOException e) {
- throw new HyracksDataException(
- "I/O exception when writing aggregation to the output buffer.");
- }
+ @Override
+ public void outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+ AggregateState state) throws HyracksDataException {
+ byte bitmap = getField(accessor, tIndex, 1);
+ byte count = getField(accessor, tIndex, 2);
+ DataOutput fieldOutput = tupleBuilder.getDataOutput();
+ try {
+ fieldOutput.writeByte(bitmap);
+ tupleBuilder.addFieldEndOffset();
+ fieldOutput.writeByte(count);
+ tupleBuilder.addFieldEndOffset();
+ } catch (IOException e) {
+ throw new HyracksDataException("I/O exception when writing aggregation to the output buffer.");
+ }
- }
+ }
- @Override
- public void outputFinalResult(ArrayTupleBuilder tupleBuilder,
- IFrameTupleAccessor accessor, int tIndex, AggregateState state)
- throws HyracksDataException {
- outputPartialResult(tupleBuilder, accessor, tIndex, state);
- }
+ @Override
+ public void outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+ AggregateState state) throws HyracksDataException {
+ outputPartialResult(tupleBuilder, accessor, tIndex, state);
+ }
};
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/MergeKmerAggregateFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/MergeKmerAggregateFactory.java
index 58ff8a2..b5eb70f 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/MergeKmerAggregateFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/MergeKmerAggregateFactory.java
@@ -1,3 +1,18 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package edu.uci.ics.genomix.dataflow.aggregators;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
@@ -8,20 +23,18 @@
/**
* count
- *
*/
public class MergeKmerAggregateFactory implements IAggregatorDescriptorFactory {
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 1L;
- public MergeKmerAggregateFactory() {
- }
+ public MergeKmerAggregateFactory() {
+ }
- @Override
- public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx,
- RecordDescriptor inRecordDescriptor,
- RecordDescriptor outRecordDescriptor, int[] keyFields,
- int[] keyFieldsInPartialResults) throws HyracksDataException {
- return new LocalAggregatorDescriptor();
- }
+ @Override
+ public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
+ RecordDescriptor outRecordDescriptor, int[] keyFields, int[] keyFieldsInPartialResults)
+ throws HyracksDataException {
+ return new LocalAggregatorDescriptor();
+ }
}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/driver/Driver.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/driver/Driver.java
index 99d6c89..27d5e15 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/driver/Driver.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/driver/Driver.java
@@ -1,3 +1,18 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package edu.uci.ics.genomix.driver;
import java.net.URL;
@@ -22,122 +37,112 @@
import edu.uci.ics.hyracks.hdfs.scheduler.Scheduler;
public class Driver {
- public static enum Plan {
- BUILD_DEBRUJIN_GRAPH, GRAPH_CLEANNING, CONTIGS_GENERATION,
- }
+ public static enum Plan {
+ BUILD_DEBRUJIN_GRAPH,
+ GRAPH_CLEANNING,
+ CONTIGS_GENERATION,
+ }
- private static final String IS_PROFILING = "genomix.driver.profiling";
- private static final String CPARTITION_PER_MACHINE = "genomix.driver.duplicate.num";
- private static final String applicationName = GenomixJob.JOB_NAME;
- private static final Log LOG = LogFactory.getLog(Driver.class);
- private JobGen jobGen;
- private boolean profiling;
+ private static final String IS_PROFILING = "genomix.driver.profiling";
+ private static final String CPARTITION_PER_MACHINE = "genomix.driver.duplicate.num";
+ private static final Log LOG = LogFactory.getLog(Driver.class);
+ private JobGen jobGen;
+ private boolean profiling;
- private int numPartitionPerMachine;
+ private int numPartitionPerMachine;
- private IHyracksClientConnection hcc;
- private Scheduler scheduler;
+ private IHyracksClientConnection hcc;
+ private Scheduler scheduler;
- public Driver(String ipAddress, int port, int numPartitionPerMachine)
- throws HyracksException {
- try {
- hcc = new HyracksConnection(ipAddress, port);
- scheduler = new Scheduler(hcc.getNodeControllerInfos());
- } catch (Exception e) {
- throw new HyracksException(e);
- }
- this.numPartitionPerMachine = numPartitionPerMachine;
- }
+ public Driver(String ipAddress, int port, int numPartitionPerMachine) throws HyracksException {
+ try {
+ hcc = new HyracksConnection(ipAddress, port);
+ scheduler = new Scheduler(hcc.getNodeControllerInfos());
+ } catch (Exception e) {
+ throw new HyracksException(e);
+ }
+ this.numPartitionPerMachine = numPartitionPerMachine;
+ }
- public void runJob(GenomixJob job) throws HyracksException {
- runJob(job, Plan.BUILD_DEBRUJIN_GRAPH, false);
- }
+ public void runJob(GenomixJob job) throws HyracksException {
+ runJob(job, Plan.BUILD_DEBRUJIN_GRAPH, false);
+ }
- public void runJob(GenomixJob job, Plan planChoice, boolean profiling)
- throws HyracksException {
- /** add hadoop configurations */
- URL hadoopCore = job.getClass().getClassLoader()
- .getResource("core-site.xml");
- job.addResource(hadoopCore);
- URL hadoopMapRed = job.getClass().getClassLoader()
- .getResource("mapred-site.xml");
- job.addResource(hadoopMapRed);
- URL hadoopHdfs = job.getClass().getClassLoader()
- .getResource("hdfs-site.xml");
- job.addResource(hadoopHdfs);
+ public void runJob(GenomixJob job, Plan planChoice, boolean profiling) throws HyracksException {
+ /** add hadoop configurations */
+ URL hadoopCore = job.getClass().getClassLoader().getResource("core-site.xml");
+ job.addResource(hadoopCore);
+ URL hadoopMapRed = job.getClass().getClassLoader().getResource("mapred-site.xml");
+ job.addResource(hadoopMapRed);
+ URL hadoopHdfs = job.getClass().getClassLoader().getResource("hdfs-site.xml");
+ job.addResource(hadoopHdfs);
- LOG.info("job started");
- long start = System.currentTimeMillis();
- long end = start;
- long time = 0;
+ LOG.info("job started");
+ long start = System.currentTimeMillis();
+ long end = start;
+ long time = 0;
- this.profiling = profiling;
- try {
- Map<String, NodeControllerInfo> ncMap = hcc
- .getNodeControllerInfos();
- LOG.info("ncmap:" + ncMap.size() + " " + ncMap.keySet().toString());
- switch (planChoice) {
- case BUILD_DEBRUJIN_GRAPH:
- default:
- jobGen = new JobGenBrujinGraph(job, scheduler, ncMap,
- numPartitionPerMachine);
- break;
- }
+ this.profiling = profiling;
+ try {
+ Map<String, NodeControllerInfo> ncMap = hcc.getNodeControllerInfos();
+ LOG.info("ncmap:" + ncMap.size() + " " + ncMap.keySet().toString());
+ switch (planChoice) {
+ case BUILD_DEBRUJIN_GRAPH:
+ default:
+ jobGen = new JobGenBrujinGraph(job, scheduler, ncMap, numPartitionPerMachine);
+ break;
+ }
- start = System.currentTimeMillis();
- runCreate(jobGen);
- end = System.currentTimeMillis();
- time = end - start;
- LOG.info("result writing finished " + time + "ms");
- LOG.info("job finished");
- } catch (Exception e) {
- throw new HyracksException(e);
- }
- }
+ start = System.currentTimeMillis();
+ runCreate(jobGen);
+ end = System.currentTimeMillis();
+ time = end - start;
+ LOG.info("result writing finished " + time + "ms");
+ LOG.info("job finished");
+ } catch (Exception e) {
+ throw new HyracksException(e);
+ }
+ }
- private void runCreate(JobGen jobGen) throws Exception {
- try {
- JobSpecification createJob = jobGen.generateJob();
- execute(createJob);
- } catch (Exception e) {
- e.printStackTrace();
- throw e;
- }
- }
+ private void runCreate(JobGen jobGen) throws Exception {
+ try {
+ JobSpecification createJob = jobGen.generateJob();
+ execute(createJob);
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw e;
+ }
+ }
- private void execute(JobSpecification job) throws Exception {
- job.setUseConnectorPolicyForScheduling(false);
- JobId jobId = hcc.startJob(
- job,
- profiling ? EnumSet.of(JobFlag.PROFILE_RUNTIME) : EnumSet
- .noneOf(JobFlag.class));
- hcc.waitForCompletion(jobId);
- }
+ private void execute(JobSpecification job) throws Exception {
+ job.setUseConnectorPolicyForScheduling(false);
+ JobId jobId = hcc
+ .startJob(job, profiling ? EnumSet.of(JobFlag.PROFILE_RUNTIME) : EnumSet.noneOf(JobFlag.class));
+ hcc.waitForCompletion(jobId);
+ }
- public static void main(String[] args) throws Exception {
- GenomixJob jobConf = new GenomixJob();
- String[] otherArgs = new GenericOptionsParser(jobConf, args)
- .getRemainingArgs();
- if (otherArgs.length < 4) {
- System.err.println("Need <serverIP> <port> <input> <output>");
- System.exit(-1);
- }
- String ipAddress = otherArgs[0];
- int port = Integer.parseInt(otherArgs[1]);
- int numOfDuplicate = jobConf.getInt(CPARTITION_PER_MACHINE, 2);
- boolean bProfiling = jobConf.getBoolean(IS_PROFILING, true);
- // FileInputFormat.setInputPaths(job, otherArgs[2]);
- {
- Path path = new Path(jobConf.getWorkingDirectory(), otherArgs[2]);
- jobConf.set("mapred.input.dir", path.toString());
+ public static void main(String[] args) throws Exception {
+ GenomixJob jobConf = new GenomixJob();
+ String[] otherArgs = new GenericOptionsParser(jobConf, args).getRemainingArgs();
+ if (otherArgs.length < 4) {
+ System.err.println("Need <serverIP> <port> <input> <output>");
+ System.exit(-1);
+ }
+ String ipAddress = otherArgs[0];
+ int port = Integer.parseInt(otherArgs[1]);
+ int numOfDuplicate = jobConf.getInt(CPARTITION_PER_MACHINE, 2);
+ boolean bProfiling = jobConf.getBoolean(IS_PROFILING, true);
+ // FileInputFormat.setInputPaths(job, otherArgs[2]);
+ {
+ Path path = new Path(jobConf.getWorkingDirectory(), otherArgs[2]);
+ jobConf.set("mapred.input.dir", path.toString());
- Path outputDir = new Path(jobConf.getWorkingDirectory(),
- otherArgs[3]);
- jobConf.set("mapred.output.dir", outputDir.toString());
- }
- // FileInputFormat.addInputPath(jobConf, new Path(otherArgs[2]));
- // FileOutputFormat.setOutputPath(job, new Path(otherArgs[3]));
- Driver driver = new Driver(ipAddress, port, numOfDuplicate);
- driver.runJob(jobConf, Plan.BUILD_DEBRUJIN_GRAPH, bProfiling);
- }
+ Path outputDir = new Path(jobConf.getWorkingDirectory(), otherArgs[3]);
+ jobConf.set("mapred.output.dir", outputDir.toString());
+ }
+ // FileInputFormat.addInputPath(jobConf, new Path(otherArgs[2]));
+ // FileOutputFormat.setOutputPath(job, new Path(otherArgs[3]));
+ Driver driver = new Driver(ipAddress, port, numOfDuplicate);
+ driver.runJob(jobConf, Plan.BUILD_DEBRUJIN_GRAPH, bProfiling);
+ }
}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/GenomixJob.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/GenomixJob.java
index 11786f3..be82477 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/GenomixJob.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/GenomixJob.java
@@ -1,3 +1,18 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package edu.uci.ics.genomix.job;
import java.io.IOException;
@@ -7,73 +22,73 @@
public class GenomixJob extends JobConf {
- public static final String JOB_NAME = "genomix";
+ public static final String JOB_NAME = "genomix";
- /** Kmers length */
- public static final String KMER_LENGTH = "genomix.kmer";
- /** Frame Size */
- public static final String FRAME_SIZE = "genomix.framesize";
- /** Frame Limit, hyracks need */
- public static final String FRAME_LIMIT = "genomix.framelimit";
- /** Table Size, hyracks need */
- public static final String TABLE_SIZE = "genomix.tablesize";
- /** Groupby types */
- public static final String GROUPBY_TYPE = "genomix.graph.groupby.type";
- /** Graph outputformat */
- public static final String OUTPUT_FORMAT = "genomix.graph.output";
- /** Get reversed Kmer Sequence */
- public static final String REVERSED_KMER = "genomix.kmer.reversed";
+ /** Kmers length */
+ public static final String KMER_LENGTH = "genomix.kmer";
+ /** Frame Size */
+ public static final String FRAME_SIZE = "genomix.framesize";
+ /** Frame Limit, hyracks need */
+ public static final String FRAME_LIMIT = "genomix.framelimit";
+ /** Table Size, hyracks need */
+ public static final String TABLE_SIZE = "genomix.tablesize";
+ /** Groupby types */
+ public static final String GROUPBY_TYPE = "genomix.graph.groupby.type";
+ /** Graph outputformat */
+ public static final String OUTPUT_FORMAT = "genomix.graph.output";
+ /** Get reversed Kmer Sequence */
+ public static final String REVERSED_KMER = "genomix.kmer.reversed";
- /** Configurations used by hybrid groupby function in graph build phrase */
- public static final String GROUPBY_HYBRID_INPUTSIZE = "genomix.graph.groupby.hybrid.inputsize";
- public static final String GROUPBY_HYBRID_INPUTKEYS = "genomix.graph.groupby.hybrid.inputkeys";
- public static final String GROUPBY_HYBRID_RECORDSIZE_SINGLE = "genomix.graph.groupby.hybrid.recordsize.single";
- public static final String GROUPBY_HYBRID_RECORDSIZE_CROSS = "genomix.graph.groupby.hybrid.recordsize.cross";
- public static final String GROUPBY_HYBRID_HASHLEVEL = "genomix.graph.groupby.hybrid.hashlevel";
+ /** Configurations used by hybrid groupby function in graph build phrase */
+ public static final String GROUPBY_HYBRID_INPUTSIZE = "genomix.graph.groupby.hybrid.inputsize";
+ public static final String GROUPBY_HYBRID_INPUTKEYS = "genomix.graph.groupby.hybrid.inputkeys";
+ public static final String GROUPBY_HYBRID_RECORDSIZE_SINGLE = "genomix.graph.groupby.hybrid.recordsize.single";
+ public static final String GROUPBY_HYBRID_RECORDSIZE_CROSS = "genomix.graph.groupby.hybrid.recordsize.cross";
+ public static final String GROUPBY_HYBRID_HASHLEVEL = "genomix.graph.groupby.hybrid.hashlevel";
- public static final int DEFAULT_KMER = 21;
- public static final int DEFAULT_FRAME_SIZE = 32768;
- public static final int DEFAULT_FRAME_LIMIT = 4096;
- public static final int DEFAULT_TABLE_SIZE = 10485767;
- public static final long DEFAULT_GROUPBY_HYBRID_INPUTSIZE = 154000000L;
- public static final long DEFAULT_GROUPBY_HYBRID_INPUTKEYS = 38500000L;
- public static final int DEFAULT_GROUPBY_HYBRID_RECORDSIZE_SINGLE = 9;
- public static final int DEFAULT_GROUPBY_HYBRID_HASHLEVEL = 1;
- public static final int DEFAULT_GROUPBY_HYBRID_RECORDSIZE_CROSS = 13;
+ public static final int DEFAULT_KMER = 21;
+ public static final int DEFAULT_FRAME_SIZE = 32768;
+ public static final int DEFAULT_FRAME_LIMIT = 4096;
+ public static final int DEFAULT_TABLE_SIZE = 10485767;
+ public static final long DEFAULT_GROUPBY_HYBRID_INPUTSIZE = 154000000L;
+ public static final long DEFAULT_GROUPBY_HYBRID_INPUTKEYS = 38500000L;
+ public static final int DEFAULT_GROUPBY_HYBRID_RECORDSIZE_SINGLE = 9;
+ public static final int DEFAULT_GROUPBY_HYBRID_HASHLEVEL = 1;
+ public static final int DEFAULT_GROUPBY_HYBRID_RECORDSIZE_CROSS = 13;
- public static final boolean DEFAULT_REVERSED = false;
+ public static final boolean DEFAULT_REVERSED = false;
- public static final String DEFAULT_GROUPBY_TYPE = "hybrid";
- public static final String DEFAULT_OUTPUT_FORMAT = "binary";
+ public static final String DEFAULT_GROUPBY_TYPE = "hybrid";
+ public static final String DEFAULT_OUTPUT_FORMAT = "binary";
- public GenomixJob() throws IOException {
- super(new Configuration());
- }
+ public GenomixJob() throws IOException {
+ super(new Configuration());
+ }
- public GenomixJob(Configuration conf) throws IOException {
- super(conf);
- }
+ public GenomixJob(Configuration conf) throws IOException {
+ super(conf);
+ }
- /**
- * Set the kmer length
- *
- * @param the
- * desired frame size
- */
- final public void setKmerLength(int kmerlength) {
- setInt(KMER_LENGTH, kmerlength);
- }
+ /**
+ * Set the kmer length
+ *
+ * @param the
+ * desired frame size
+ */
+ final public void setKmerLength(int kmerlength) {
+ setInt(KMER_LENGTH, kmerlength);
+ }
- final public void setFrameSize(int frameSize) {
- setInt(FRAME_SIZE, frameSize);
- }
+ final public void setFrameSize(int frameSize) {
+ setInt(FRAME_SIZE, frameSize);
+ }
- final public void setFrameLimit(int frameLimit) {
- setInt(FRAME_LIMIT, frameLimit);
- }
+ final public void setFrameLimit(int frameLimit) {
+ setInt(FRAME_LIMIT, frameLimit);
+ }
- final public void setTableSize(int tableSize) {
- setInt(TABLE_SIZE, tableSize);
- }
+ final public void setTableSize(int tableSize) {
+ setInt(TABLE_SIZE, tableSize);
+ }
}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/JobGen.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/JobGen.java
index 557da6b..b1f9a29 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/JobGen.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/JobGen.java
@@ -1,3 +1,18 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package edu.uci.ics.genomix.job;
import java.util.UUID;
@@ -9,19 +24,18 @@
public abstract class JobGen {
- protected final Configuration conf;
- protected final GenomixJob genomixJob;
- protected String jobId = new UUID(System.currentTimeMillis(),
- System.nanoTime()).toString();
+ protected final Configuration conf;
+ protected final GenomixJob genomixJob;
+ protected String jobId = new UUID(System.currentTimeMillis(), System.nanoTime()).toString();
- public JobGen(GenomixJob job) {
- this.conf = job;
- this.genomixJob = job;
- this.initJobConfiguration();
- }
+ public JobGen(GenomixJob job) {
+ this.conf = job;
+ this.genomixJob = job;
+ this.initJobConfiguration();
+ }
- protected abstract void initJobConfiguration();
+ protected abstract void initJobConfiguration();
- public abstract JobSpecification generateJob() throws HyracksException;
+ public abstract JobSpecification generateJob() throws HyracksException;
}
\ No newline at end of file
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/JobGenBrujinGraph.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/JobGenBrujinGraph.java
index 9462fb0..79b38e8 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/JobGenBrujinGraph.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/JobGenBrujinGraph.java
@@ -1,3 +1,18 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package edu.uci.ics.genomix.job;
import java.util.Map;
@@ -47,292 +62,234 @@
import edu.uci.ics.hyracks.hdfs.scheduler.Scheduler;
public class JobGenBrujinGraph extends JobGen {
- public enum GroupbyType {
- EXTERNAL, PRECLUSTER, HYBRIDHASH,
- }
+ public enum GroupbyType {
+ EXTERNAL,
+ PRECLUSTER,
+ HYBRIDHASH,
+ }
- public enum OutputFormat {
- TEXT, BINARY,
- }
+ public enum OutputFormat {
+ TEXT,
+ BINARY,
+ }
- JobConf job;
- private static final Log LOG = LogFactory.getLog(JobGenBrujinGraph.class);
- private Scheduler scheduler;
- private String[] ncNodeNames;
+ JobConf job;
+ private static final Log LOG = LogFactory.getLog(JobGenBrujinGraph.class);
+ private Scheduler scheduler;
+ private String[] ncNodeNames;
- private int kmers;
- private int frameLimits;
- private int frameSize;
- private int tableSize;
- private GroupbyType groupbyType;
- private OutputFormat outputFormat;
- private boolean bGenerateReversedKmer;
+ private int kmers;
+ private int frameLimits;
+ private int frameSize;
+ private int tableSize;
+ private GroupbyType groupbyType;
+ private OutputFormat outputFormat;
+ private boolean bGenerateReversedKmer;
- private AbstractOperatorDescriptor singleGrouper;
- private IConnectorDescriptor connPartition;
- private AbstractOperatorDescriptor crossGrouper;
- private RecordDescriptor readOutputRec;
- private RecordDescriptor combineOutputRec;
+ private AbstractOperatorDescriptor singleGrouper;
+ private IConnectorDescriptor connPartition;
+ private AbstractOperatorDescriptor crossGrouper;
+ private RecordDescriptor readOutputRec;
+ private RecordDescriptor combineOutputRec;
- /** works for hybrid hashing */
- private long inputSizeInRawRecords;
- private long inputSizeInUniqueKeys;
- private int recordSizeInBytes;
- private int hashfuncStartLevel;
+ /** works for hybrid hashing */
+ private long inputSizeInRawRecords;
+ private long inputSizeInUniqueKeys;
+ private int recordSizeInBytes;
+ private int hashfuncStartLevel;
- private void logDebug(String status) {
- String names = "";
- for (String str : ncNodeNames) {
- names += str + " ";
- }
- LOG.info(status + " nc nodes:" + ncNodeNames.length + " " + names);
- }
+ private void logDebug(String status) {
+ String names = "";
+ for (String str : ncNodeNames) {
+ names += str + " ";
+ }
+ LOG.info(status + " nc nodes:" + ncNodeNames.length + " " + names);
+ }
- public JobGenBrujinGraph(GenomixJob job, Scheduler scheduler,
- final Map<String, NodeControllerInfo> ncMap,
- int numPartitionPerMachine) {
- super(job);
- this.scheduler = scheduler;
- String[] nodes = new String[ncMap.size()];
- ncMap.keySet().toArray(nodes);
- ncNodeNames = new String[nodes.length * numPartitionPerMachine];
- for (int i = 0; i < numPartitionPerMachine; i++) {
- System.arraycopy(nodes, 0, ncNodeNames, i * nodes.length,
- nodes.length);
- }
- logDebug("initialize");
- }
+ public JobGenBrujinGraph(GenomixJob job, Scheduler scheduler, final Map<String, NodeControllerInfo> ncMap,
+ int numPartitionPerMachine) {
+ super(job);
+ this.scheduler = scheduler;
+ String[] nodes = new String[ncMap.size()];
+ ncMap.keySet().toArray(nodes);
+ ncNodeNames = new String[nodes.length * numPartitionPerMachine];
+ for (int i = 0; i < numPartitionPerMachine; i++) {
+ System.arraycopy(nodes, 0, ncNodeNames, i * nodes.length, nodes.length);
+ }
+ logDebug("initialize");
+ }
- private ExternalGroupOperatorDescriptor newExternalGroupby(
- JobSpecification jobSpec, int[] keyFields,
- IAggregatorDescriptorFactory aggeragater) {
- return new ExternalGroupOperatorDescriptor(
- jobSpec,
- keyFields,
- frameLimits,
- new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory
- .of(KmerPointable.FACTORY) },
- new KmerNormarlizedComputerFactory(),
- aggeragater,
- new DistributedMergeLmerAggregateFactory(),
- combineOutputRec,
- new HashSpillableTableFactory(
- new FieldHashPartitionComputerFactory(
- keyFields,
- new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
- .of(KmerPointable.FACTORY) }),
- tableSize), true);
- }
+ private ExternalGroupOperatorDescriptor newExternalGroupby(JobSpecification jobSpec, int[] keyFields,
+ IAggregatorDescriptorFactory aggeragater) {
+ return new ExternalGroupOperatorDescriptor(jobSpec, keyFields, frameLimits,
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(KmerPointable.FACTORY) },
+ new KmerNormarlizedComputerFactory(), aggeragater, new DistributedMergeLmerAggregateFactory(),
+ combineOutputRec, new HashSpillableTableFactory(
+ new FieldHashPartitionComputerFactory(keyFields,
+ new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+ .of(KmerPointable.FACTORY) }), tableSize), true);
+ }
- private HybridHashGroupOperatorDescriptor newHybridGroupby(
- JobSpecification jobSpec, int[] keyFields,
- long inputSizeInRawRecords, long inputSizeInUniqueKeys,
- int recordSizeInBytes, int hashfuncStartLevel,
- IAggregatorDescriptorFactory aggeragater)
- throws HyracksDataException {
- return new HybridHashGroupOperatorDescriptor(
- jobSpec,
- keyFields,
- frameLimits,
- inputSizeInRawRecords,
- inputSizeInUniqueKeys,
- recordSizeInBytes,
- tableSize,
- new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory
- .of(KmerPointable.FACTORY) },
- new IBinaryHashFunctionFamily[] { new KmerBinaryHashFunctionFamily() },
- hashfuncStartLevel, new KmerNormarlizedComputerFactory(),
- aggeragater, new DistributedMergeLmerAggregateFactory(),
- combineOutputRec, true);
- }
+ private HybridHashGroupOperatorDescriptor newHybridGroupby(JobSpecification jobSpec, int[] keyFields,
+ long inputSizeInRawRecords, long inputSizeInUniqueKeys, int recordSizeInBytes, int hashfuncStartLevel,
+ IAggregatorDescriptorFactory aggeragater) throws HyracksDataException {
+ return new HybridHashGroupOperatorDescriptor(jobSpec, keyFields, frameLimits, inputSizeInRawRecords,
+ inputSizeInUniqueKeys, recordSizeInBytes, tableSize,
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(KmerPointable.FACTORY) },
+ new IBinaryHashFunctionFamily[] { new KmerBinaryHashFunctionFamily() }, hashfuncStartLevel,
+ new KmerNormarlizedComputerFactory(), aggeragater, new DistributedMergeLmerAggregateFactory(),
+ combineOutputRec, true);
+ }
- private void generateDescriptorbyType(JobSpecification jobSpec)
- throws HyracksDataException {
- int[] keyFields = new int[] { 0 }; // the id of grouped key
+ private void generateDescriptorbyType(JobSpecification jobSpec) throws HyracksDataException {
+ int[] keyFields = new int[] { 0 }; // the id of grouped key
- switch (groupbyType) {
- case EXTERNAL:
- singleGrouper = newExternalGroupby(jobSpec, keyFields,
- new MergeKmerAggregateFactory());
- connPartition = new MToNPartitioningConnectorDescriptor(jobSpec,
- new KmerHashPartitioncomputerFactory());
- crossGrouper = newExternalGroupby(jobSpec, keyFields,
- new DistributedMergeLmerAggregateFactory());
- break;
- case PRECLUSTER:
- singleGrouper = newExternalGroupby(jobSpec, keyFields,
- new MergeKmerAggregateFactory());
- connPartition = new MToNPartitioningMergingConnectorDescriptor(
- jobSpec,
- new KmerHashPartitioncomputerFactory(),
- keyFields,
- new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory
- .of(KmerPointable.FACTORY) });
- crossGrouper = new PreclusteredGroupOperatorDescriptor(
- jobSpec,
- keyFields,
- new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory
- .of(KmerPointable.FACTORY) },
- new DistributedMergeLmerAggregateFactory(),
- combineOutputRec);
- break;
- case HYBRIDHASH:
- default:
- singleGrouper = newHybridGroupby(jobSpec, keyFields,
- inputSizeInRawRecords, inputSizeInUniqueKeys,
- recordSizeInBytes, hashfuncStartLevel,
- new MergeKmerAggregateFactory());
- connPartition = new MToNPartitioningConnectorDescriptor(jobSpec,
- new KmerHashPartitioncomputerFactory());
+ switch (groupbyType) {
+ case EXTERNAL:
+ singleGrouper = newExternalGroupby(jobSpec, keyFields, new MergeKmerAggregateFactory());
+ connPartition = new MToNPartitioningConnectorDescriptor(jobSpec, new KmerHashPartitioncomputerFactory());
+ crossGrouper = newExternalGroupby(jobSpec, keyFields, new DistributedMergeLmerAggregateFactory());
+ break;
+ case PRECLUSTER:
+ singleGrouper = newExternalGroupby(jobSpec, keyFields, new MergeKmerAggregateFactory());
+ connPartition = new MToNPartitioningMergingConnectorDescriptor(jobSpec,
+ new KmerHashPartitioncomputerFactory(), keyFields,
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(KmerPointable.FACTORY) });
+ crossGrouper = new PreclusteredGroupOperatorDescriptor(jobSpec, keyFields,
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(KmerPointable.FACTORY) },
+ new DistributedMergeLmerAggregateFactory(), combineOutputRec);
+ break;
+ case HYBRIDHASH:
+ default:
+ singleGrouper = newHybridGroupby(jobSpec, keyFields, inputSizeInRawRecords, inputSizeInUniqueKeys,
+ recordSizeInBytes, hashfuncStartLevel, new MergeKmerAggregateFactory());
+ connPartition = new MToNPartitioningConnectorDescriptor(jobSpec, new KmerHashPartitioncomputerFactory());
- crossGrouper = newHybridGroupby(jobSpec, keyFields,
- inputSizeInRawRecords, inputSizeInUniqueKeys,
- recordSizeInBytes, hashfuncStartLevel,
- new DistributedMergeLmerAggregateFactory());
- break;
- }
- }
+ crossGrouper = newHybridGroupby(jobSpec, keyFields, inputSizeInRawRecords, inputSizeInUniqueKeys,
+ recordSizeInBytes, hashfuncStartLevel, new DistributedMergeLmerAggregateFactory());
+ break;
+ }
+ }
- public HDFSReadOperatorDescriptor createHDFSReader(JobSpecification jobSpec)
- throws HyracksDataException {
- try {
+ public HDFSReadOperatorDescriptor createHDFSReader(JobSpecification jobSpec) throws HyracksDataException {
+ try {
- InputSplit[] splits = job.getInputFormat().getSplits(job,
- ncNodeNames.length);
+ InputSplit[] splits = job.getInputFormat().getSplits(job, ncNodeNames.length);
- LOG.info("HDFS read into " + splits.length + " splits");
- String[] readSchedule = scheduler.getLocationConstraints(splits);
- String log = "";
- for (String schedule : readSchedule) {
- log += schedule + " ";
- }
- LOG.info("HDFS read schedule " + log);
- return new HDFSReadOperatorDescriptor(jobSpec, readOutputRec, job,
- splits, readSchedule, new ReadsKeyValueParserFactory(kmers,
- bGenerateReversedKmer));
- } catch (Exception e) {
- throw new HyracksDataException(e);
- }
- }
+ LOG.info("HDFS read into " + splits.length + " splits");
+ String[] readSchedule = scheduler.getLocationConstraints(splits);
+ String log = "";
+ for (String schedule : readSchedule) {
+ log += schedule + " ";
+ }
+ LOG.info("HDFS read schedule " + log);
+ return new HDFSReadOperatorDescriptor(jobSpec, readOutputRec, job, splits, readSchedule,
+ new ReadsKeyValueParserFactory(kmers, bGenerateReversedKmer));
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ }
- @Override
- public JobSpecification generateJob() throws HyracksException {
+ @Override
+ public JobSpecification generateJob() throws HyracksException {
- JobSpecification jobSpec = new JobSpecification();
- readOutputRec = new RecordDescriptor(new ISerializerDeserializer[] {
- null, ByteSerializerDeserializer.INSTANCE });
- combineOutputRec = new RecordDescriptor(new ISerializerDeserializer[] {
- null, ByteSerializerDeserializer.INSTANCE,
- ByteSerializerDeserializer.INSTANCE });
- jobSpec.setFrameSize(frameSize);
+ JobSpecification jobSpec = new JobSpecification();
+ readOutputRec = new RecordDescriptor(
+ new ISerializerDeserializer[] { null, ByteSerializerDeserializer.INSTANCE });
+ combineOutputRec = new RecordDescriptor(new ISerializerDeserializer[] { null,
+ ByteSerializerDeserializer.INSTANCE, ByteSerializerDeserializer.INSTANCE });
+ jobSpec.setFrameSize(frameSize);
- // File input
- HDFSReadOperatorDescriptor readOperator = createHDFSReader(jobSpec);
+ // File input
+ HDFSReadOperatorDescriptor readOperator = createHDFSReader(jobSpec);
- logDebug("Read Operator");
- PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec,
- readOperator, ncNodeNames);
+ logDebug("Read Operator");
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, readOperator, ncNodeNames);
- generateDescriptorbyType(jobSpec);
- logDebug("SingleGroupby Operator");
- PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec,
- singleGrouper, ncNodeNames);
+ generateDescriptorbyType(jobSpec);
+ logDebug("SingleGroupby Operator");
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, singleGrouper, ncNodeNames);
- IConnectorDescriptor readfileConn = new OneToOneConnectorDescriptor(
- jobSpec);
- jobSpec.connect(readfileConn, readOperator, 0, singleGrouper, 0);
+ IConnectorDescriptor readfileConn = new OneToOneConnectorDescriptor(jobSpec);
+ jobSpec.connect(readfileConn, readOperator, 0, singleGrouper, 0);
- logDebug("CrossGrouper Operator");
- PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec,
- crossGrouper, ncNodeNames);
- jobSpec.connect(connPartition, singleGrouper, 0, crossGrouper, 0);
+ logDebug("CrossGrouper Operator");
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, crossGrouper, ncNodeNames);
+ jobSpec.connect(connPartition, singleGrouper, 0, crossGrouper, 0);
- // Output
- ITupleWriterFactory writer = null;
- switch (outputFormat) {
- case TEXT:
- writer = new KMerTextWriterFactory(kmers);
- break;
- case BINARY:
- default:
- writer = new KMerSequenceWriterFactory(job);
- break;
- }
- HDFSWriteOperatorDescriptor writeOperator = new HDFSWriteOperatorDescriptor(
- jobSpec, job, writer);
+ // Output
+ ITupleWriterFactory writer = null;
+ switch (outputFormat) {
+ case TEXT:
+ writer = new KMerTextWriterFactory(kmers);
+ break;
+ case BINARY:
+ default:
+ writer = new KMerSequenceWriterFactory(job);
+ break;
+ }
+ HDFSWriteOperatorDescriptor writeOperator = new HDFSWriteOperatorDescriptor(jobSpec, job, writer);
- logDebug("WriteOperator");
- PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec,
- writeOperator, ncNodeNames);
+ logDebug("WriteOperator");
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, writeOperator, ncNodeNames);
- IConnectorDescriptor printConn = new OneToOneConnectorDescriptor(
- jobSpec);
- jobSpec.connect(printConn, crossGrouper, 0, writeOperator, 0);
- jobSpec.addRoot(writeOperator);
+ IConnectorDescriptor printConn = new OneToOneConnectorDescriptor(jobSpec);
+ jobSpec.connect(printConn, crossGrouper, 0, writeOperator, 0);
+ jobSpec.addRoot(writeOperator);
- if (groupbyType == GroupbyType.PRECLUSTER) {
- jobSpec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
- }
- return jobSpec;
- }
+ if (groupbyType == GroupbyType.PRECLUSTER) {
+ jobSpec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
+ }
+ return jobSpec;
+ }
- @Override
- protected void initJobConfiguration() {
+ @Override
+ protected void initJobConfiguration() {
- kmers = conf.getInt(GenomixJob.KMER_LENGTH, GenomixJob.DEFAULT_KMER);
- if (kmers % 2 == 0){
- kmers--;
- conf.setInt(GenomixJob.KMER_LENGTH, kmers);
- }
- frameLimits = conf.getInt(GenomixJob.FRAME_LIMIT,
- GenomixJob.DEFAULT_FRAME_LIMIT);
- tableSize = conf.getInt(GenomixJob.TABLE_SIZE,
- GenomixJob.DEFAULT_TABLE_SIZE);
- frameSize = conf.getInt(GenomixJob.FRAME_SIZE,
- GenomixJob.DEFAULT_FRAME_SIZE);
- inputSizeInRawRecords = conf.getLong(
- GenomixJob.GROUPBY_HYBRID_INPUTSIZE,
- GenomixJob.DEFAULT_GROUPBY_HYBRID_INPUTSIZE);
- inputSizeInUniqueKeys = conf.getLong(
- GenomixJob.GROUPBY_HYBRID_INPUTKEYS,
- GenomixJob.DEFAULT_GROUPBY_HYBRID_INPUTKEYS);
- recordSizeInBytes = conf.getInt(
- GenomixJob.GROUPBY_HYBRID_RECORDSIZE_SINGLE,
- GenomixJob.DEFAULT_GROUPBY_HYBRID_RECORDSIZE_SINGLE);
- hashfuncStartLevel = conf.getInt(GenomixJob.GROUPBY_HYBRID_HASHLEVEL,
- GenomixJob.DEFAULT_GROUPBY_HYBRID_HASHLEVEL);
- /** here read the different recordSize why ? */
- recordSizeInBytes = conf.getInt(
- GenomixJob.GROUPBY_HYBRID_RECORDSIZE_CROSS,
- GenomixJob.DEFAULT_GROUPBY_HYBRID_RECORDSIZE_CROSS);
+ kmers = conf.getInt(GenomixJob.KMER_LENGTH, GenomixJob.DEFAULT_KMER);
+ if (kmers % 2 == 0) {
+ kmers--;
+ conf.setInt(GenomixJob.KMER_LENGTH, kmers);
+ }
+ frameLimits = conf.getInt(GenomixJob.FRAME_LIMIT, GenomixJob.DEFAULT_FRAME_LIMIT);
+ tableSize = conf.getInt(GenomixJob.TABLE_SIZE, GenomixJob.DEFAULT_TABLE_SIZE);
+ frameSize = conf.getInt(GenomixJob.FRAME_SIZE, GenomixJob.DEFAULT_FRAME_SIZE);
+ inputSizeInRawRecords = conf.getLong(GenomixJob.GROUPBY_HYBRID_INPUTSIZE,
+ GenomixJob.DEFAULT_GROUPBY_HYBRID_INPUTSIZE);
+ inputSizeInUniqueKeys = conf.getLong(GenomixJob.GROUPBY_HYBRID_INPUTKEYS,
+ GenomixJob.DEFAULT_GROUPBY_HYBRID_INPUTKEYS);
+ recordSizeInBytes = conf.getInt(GenomixJob.GROUPBY_HYBRID_RECORDSIZE_SINGLE,
+ GenomixJob.DEFAULT_GROUPBY_HYBRID_RECORDSIZE_SINGLE);
+ hashfuncStartLevel = conf.getInt(GenomixJob.GROUPBY_HYBRID_HASHLEVEL,
+ GenomixJob.DEFAULT_GROUPBY_HYBRID_HASHLEVEL);
+ /** here read the different recordSize why ? */
+ recordSizeInBytes = conf.getInt(GenomixJob.GROUPBY_HYBRID_RECORDSIZE_CROSS,
+ GenomixJob.DEFAULT_GROUPBY_HYBRID_RECORDSIZE_CROSS);
- bGenerateReversedKmer = conf.getBoolean(GenomixJob.REVERSED_KMER,
- GenomixJob.DEFAULT_REVERSED);
+ bGenerateReversedKmer = conf.getBoolean(GenomixJob.REVERSED_KMER, GenomixJob.DEFAULT_REVERSED);
- String type = conf.get(GenomixJob.GROUPBY_TYPE,
- GenomixJob.DEFAULT_GROUPBY_TYPE);
- if (type.equalsIgnoreCase("external")) {
- groupbyType = GroupbyType.EXTERNAL;
- } else if (type.equalsIgnoreCase("precluster")) {
- groupbyType = GroupbyType.PRECLUSTER;
- } else {
- groupbyType = GroupbyType.HYBRIDHASH;
- }
+ String type = conf.get(GenomixJob.GROUPBY_TYPE, GenomixJob.DEFAULT_GROUPBY_TYPE);
+ if (type.equalsIgnoreCase("external")) {
+ groupbyType = GroupbyType.EXTERNAL;
+ } else if (type.equalsIgnoreCase("precluster")) {
+ groupbyType = GroupbyType.PRECLUSTER;
+ } else {
+ groupbyType = GroupbyType.HYBRIDHASH;
+ }
- String output = conf.get(GenomixJob.OUTPUT_FORMAT,
- GenomixJob.DEFAULT_OUTPUT_FORMAT);
- if (output.equalsIgnoreCase("text")) {
- outputFormat = OutputFormat.TEXT;
- } else {
- outputFormat = OutputFormat.BINARY;
- }
- job = new JobConf(conf);
- LOG.info("Genomix Graph Build Configuration");
- LOG.info("Kmer:" + kmers);
- LOG.info("Groupby type:" + type);
- LOG.info("Output format:" + output);
- LOG.info("Frame limit" + frameLimits);
- LOG.info("Frame size" + frameSize);
- }
+ String output = conf.get(GenomixJob.OUTPUT_FORMAT, GenomixJob.DEFAULT_OUTPUT_FORMAT);
+ if (output.equalsIgnoreCase("text")) {
+ outputFormat = OutputFormat.TEXT;
+ } else {
+ outputFormat = OutputFormat.BINARY;
+ }
+ job = new JobConf(conf);
+ LOG.info("Genomix Graph Build Configuration");
+ LOG.info("Kmer:" + kmers);
+ LOG.info("Groupby type:" + type);
+ LOG.info("Output format:" + output);
+ LOG.info("Frame limit" + frameLimits);
+ LOG.info("Frame size" + frameSize);
+ }
}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/JobGenStatistic.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/JobGenStatistic.java
index 39cc6fc..a88fa79 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/JobGenStatistic.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/JobGenStatistic.java
@@ -1,3 +1,18 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package edu.uci.ics.genomix.job;
import org.apache.hadoop.mapred.InputSplit;
@@ -21,6 +36,7 @@
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningMergingConnectorDescriptor;
@@ -34,146 +50,122 @@
import edu.uci.ics.hyracks.hdfs.scheduler.Scheduler;
public class JobGenStatistic extends JobGen {
- private int kmers;
- private JobConf hadoopjob;
- private RecordDescriptor readOutputRec;
- private String[] ncNodeNames;
- private Scheduler scheduler;
- private RecordDescriptor combineOutputRec;
-
+ private int kmers;
+ private JobConf hadoopjob;
+ private RecordDescriptor readOutputRec;
+ private String[] ncNodeNames;
+ private Scheduler scheduler;
+ private RecordDescriptor combineOutputRec;
- public JobGenStatistic(GenomixJob job) {
- super(job);
- // TODO Auto-generated constructor stub
- }
+ public JobGenStatistic(GenomixJob job) {
+ super(job);
+ // TODO Auto-generated constructor stub
+ }
- @Override
- protected void initJobConfiguration() {
- // TODO Auto-generated method stub
- kmers = conf.getInt(GenomixJob.KMER_LENGTH, GenomixJob.DEFAULT_KMER);
- hadoopjob = new JobConf(conf);
- hadoopjob.setInputFormat(SequenceFileInputFormat.class);
- }
+ @Override
+ protected void initJobConfiguration() {
+ // TODO Auto-generated method stub
+ kmers = conf.getInt(GenomixJob.KMER_LENGTH, GenomixJob.DEFAULT_KMER);
+ hadoopjob = new JobConf(conf);
+ hadoopjob.setInputFormat(SequenceFileInputFormat.class);
+ }
- @Override
- public JobSpecification generateJob() throws HyracksException {
- // TODO Auto-generated method stub
- int[] degreeFields = { 0 };
- int[] countFields = { 1 };
- JobSpecification jobSpec = new JobSpecification();
- /** specify the record fields after read */
- readOutputRec = new RecordDescriptor(new ISerializerDeserializer[] {
- null, ByteSerializerDeserializer.INSTANCE,ByteSerializerDeserializer.INSTANCE });
- combineOutputRec = new RecordDescriptor(new ISerializerDeserializer[] {
- null, ByteSerializerDeserializer.INSTANCE });
- /** the reader */
- HDFSReadOperatorDescriptor readOperator = createHDFSReader(jobSpec);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec,
- readOperator, ncNodeNames);
+ @Override
+ public JobSpecification generateJob() throws HyracksException {
+ int[] degreeFields = { 0, 1 }; // indegree, outdegree
+ int[] countFields = { 2 };
+ JobSpecification jobSpec = new JobSpecification();
+ /** specify the record fields after read */
+ readOutputRec = new RecordDescriptor(new ISerializerDeserializer[] { ByteSerializerDeserializer.INSTANCE,
+ ByteSerializerDeserializer.INSTANCE, ByteSerializerDeserializer.INSTANCE });
+ combineOutputRec = new RecordDescriptor(new ISerializerDeserializer[] { ByteSerializerDeserializer.INSTANCE,
+ ByteSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
+ /** the reader */
+ HDFSReadOperatorDescriptor readOperator = createHDFSReader(jobSpec);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, readOperator, ncNodeNames);
- /** the combiner aggregator */
- AbstractOperatorDescriptor degreeLocal = connectLocalAggregateByField(
- jobSpec, degreeFields, readOperator);
- AbstractOperatorDescriptor countLocal = connectLocalAggregateByField(
- jobSpec, countFields, readOperator);
+ /** the combiner aggregator */
+ AbstractOperatorDescriptor degreeLocal = connectLocalAggregateByField(jobSpec, degreeFields, readOperator);
+ AbstractOperatorDescriptor countLocal = connectLocalAggregateByField(jobSpec, countFields, readOperator);
- /** the final aggregator */
- AbstractOperatorDescriptor degreeMerger = connectFinalAggregateByField(
- jobSpec, degreeFields, degreeLocal);
- AbstractOperatorDescriptor countMerger = connectFinalAggregateByField(
- jobSpec, countFields, countLocal);
-
- /** writer */
- AbstractFileWriteOperatorDescriptor writeDegree = connectWriter(
- jobSpec, degreeFields, degreeMerger);
- AbstractFileWriteOperatorDescriptor writeCount = connectWriter(
- jobSpec, countFields, countMerger);
- jobSpec.addRoot(writeDegree);
- jobSpec.addRoot(writeCount);
- return null;
- }
+ /** the final aggregator */
+ AbstractOperatorDescriptor degreeMerger = connectFinalAggregateByField(jobSpec, degreeFields, degreeLocal);
+ AbstractOperatorDescriptor countMerger = connectFinalAggregateByField(jobSpec, countFields, countLocal);
- private HDFSReadOperatorDescriptor createHDFSReader(JobSpecification jobSpec)
- throws HyracksDataException {
- try {
+ /** writer */
+ AbstractFileWriteOperatorDescriptor writeDegree = connectWriter(jobSpec, degreeFields, degreeMerger);
+ AbstractFileWriteOperatorDescriptor writeCount = connectWriter(jobSpec, countFields, countMerger);
+ jobSpec.addRoot(writeDegree);
+ jobSpec.addRoot(writeCount);
+ return null;
+ }
- InputSplit[] splits = hadoopjob.getInputFormat().getSplits(
- hadoopjob, ncNodeNames.length);
+ private HDFSReadOperatorDescriptor createHDFSReader(JobSpecification jobSpec) throws HyracksDataException {
+ try {
- String[] readSchedule = scheduler.getLocationConstraints(splits);
- return new HDFSReadOperatorDescriptor(jobSpec, readOutputRec,
- hadoopjob, splits, readSchedule,
- new StatReadsKeyValueParserFactory());
- } catch (Exception e) {
- throw new HyracksDataException(e);
- }
- }
+ InputSplit[] splits = hadoopjob.getInputFormat().getSplits(hadoopjob, ncNodeNames.length);
- private ExternalGroupOperatorDescriptor newExternalGroupby(
- JobSpecification jobSpec, int[] keyFields,
- IAggregatorDescriptorFactory aggeragater) {
- return new ExternalGroupOperatorDescriptor(jobSpec, keyFields,
- GenomixJob.DEFAULT_FRAME_LIMIT, new IBinaryComparatorFactory[] {
- new ByteComparatorFactory() }, null, aggeragater,
- new StatSumAggregateFactory(),
- combineOutputRec, new HashSpillableTableFactory(
- new FieldHashPartitionComputerFactory(keyFields,
- new IBinaryHashFunctionFactory[] {
- new ByteComparatorFactory() }),
- GenomixJob.DEFAULT_TABLE_SIZE), true);
- }
+ String[] readSchedule = scheduler.getLocationConstraints(splits);
+ return new HDFSReadOperatorDescriptor(jobSpec, readOutputRec, hadoopjob, splits, readSchedule,
+ new StatReadsKeyValueParserFactory());
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ }
- private AbstractOperatorDescriptor connectLocalAggregateByField(
- JobSpecification jobSpec, int[] fields,
- HDFSReadOperatorDescriptor readOperator) {
- AbstractOperatorDescriptor localAggregator = newExternalGroupby(
- jobSpec, fields, new StatCountAggregateFactory());
- PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec,
- localAggregator, ncNodeNames);
- IConnectorDescriptor readfileConn = new OneToOneConnectorDescriptor(
- jobSpec);
- jobSpec.connect(readfileConn, readOperator, 0, localAggregator, 0);
- return localAggregator;
- }
+ private ExternalGroupOperatorDescriptor newExternalGroupby(JobSpecification jobSpec, int[] keyFields,
+ IAggregatorDescriptorFactory aggeragater) {
+ return new ExternalGroupOperatorDescriptor(jobSpec, keyFields, GenomixJob.DEFAULT_FRAME_LIMIT,
+ new IBinaryComparatorFactory[] { new ByteComparatorFactory(), new ByteComparatorFactory() }, null,
+ aggeragater, new StatSumAggregateFactory(), combineOutputRec, new HashSpillableTableFactory(
+ new FieldHashPartitionComputerFactory(keyFields, new IBinaryHashFunctionFactory[] {
+ new ByteComparatorFactory(), new ByteComparatorFactory() }),
+ GenomixJob.DEFAULT_TABLE_SIZE), true);
+ }
- private AbstractOperatorDescriptor connectFinalAggregateByField(JobSpecification jobSpec,
- int[] fields, AbstractOperatorDescriptor localAggregator) {
- AbstractOperatorDescriptor finalAggregator = newExternalGroupby(
- jobSpec, fields, new StatSumAggregateFactory());
- // only need one reducer
- PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec,
- finalAggregator, ncNodeNames[fields[0] % ncNodeNames.length]);
- IConnectorDescriptor mergeConn = new MToNPartitioningMergingConnectorDescriptor(
- jobSpec,
- new ITuplePartitionComputerFactory(){
- private static final long serialVersionUID = 1L;
- @Override
- public ITuplePartitionComputer createPartitioner() {
- return new ITuplePartitionComputer(){
- @Override
- public int partition(IFrameTupleAccessor accessor,
- int tIndex, int nParts)
- throws HyracksDataException {
- return 0;
- }
- };
- }
- },
- fields,
- new IBinaryComparatorFactory[]{new ByteComparatorFactory()});
- jobSpec.connect(mergeConn, localAggregator, 0, finalAggregator, 0);
- return finalAggregator;
- }
-
- private AbstractFileWriteOperatorDescriptor connectWriter(JobSpecification jobSpec, int [] fields, AbstractOperatorDescriptor finalAggregator){
- LineFileWriteOperatorDescriptor writeOperator = new LineFileWriteOperatorDescriptor(
- jobSpec, null);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec,
- writeOperator, ncNodeNames[fields[0] % ncNodeNames.length]);
+ private AbstractOperatorDescriptor connectLocalAggregateByField(JobSpecification jobSpec, int[] fields,
+ HDFSReadOperatorDescriptor readOperator) {
+ AbstractOperatorDescriptor localAggregator = newExternalGroupby(jobSpec, fields,
+ new StatCountAggregateFactory());
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, localAggregator, ncNodeNames);
+ IConnectorDescriptor readfileConn = new OneToOneConnectorDescriptor(jobSpec);
+ jobSpec.connect(readfileConn, readOperator, 0, localAggregator, 0);
+ return localAggregator;
+ }
- IConnectorDescriptor printConn = new OneToOneConnectorDescriptor(
- jobSpec);
- jobSpec.connect(printConn, finalAggregator, 0, writeOperator, 0);
- return writeOperator;
- }
+ private AbstractOperatorDescriptor connectFinalAggregateByField(JobSpecification jobSpec, int[] fields,
+ AbstractOperatorDescriptor localAggregator) {
+ AbstractOperatorDescriptor finalAggregator = newExternalGroupby(jobSpec, fields, new StatSumAggregateFactory());
+ // only need one reducer
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, finalAggregator, ncNodeNames[fields[0]
+ % ncNodeNames.length]);
+ IConnectorDescriptor mergeConn = new MToNPartitioningMergingConnectorDescriptor(jobSpec,
+ new ITuplePartitionComputerFactory() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public ITuplePartitionComputer createPartitioner() {
+ return new ITuplePartitionComputer() {
+ @Override
+ public int partition(IFrameTupleAccessor accessor, int tIndex, int nParts)
+ throws HyracksDataException {
+ return 0;
+ }
+ };
+ }
+ }, fields, new IBinaryComparatorFactory[] { new ByteComparatorFactory() });
+ jobSpec.connect(mergeConn, localAggregator, 0, finalAggregator, 0);
+ return finalAggregator;
+ }
+
+ private AbstractFileWriteOperatorDescriptor connectWriter(JobSpecification jobSpec, int[] fields,
+ AbstractOperatorDescriptor finalAggregator) {
+ LineFileWriteOperatorDescriptor writeOperator = new LineFileWriteOperatorDescriptor(jobSpec, null);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, writeOperator, ncNodeNames[fields[0]
+ % ncNodeNames.length]);
+
+ IConnectorDescriptor printConn = new OneToOneConnectorDescriptor(jobSpec);
+ jobSpec.connect(printConn, finalAggregator, 0, writeOperator, 0);
+ return writeOperator;
+ }
}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/util/ByteComparatorFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/util/ByteComparatorFactory.java
index b469be0..832966b 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/util/ByteComparatorFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/util/ByteComparatorFactory.java
@@ -1,3 +1,18 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package edu.uci.ics.genomix.util;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
@@ -7,34 +22,33 @@
public class ByteComparatorFactory implements IBinaryComparatorFactory, IBinaryHashFunctionFactory {
- /**
+ /**
*
*/
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 1L;
- @Override
- public IBinaryComparator createBinaryComparator() {
- return new IBinaryComparator(){
+ @Override
+ public IBinaryComparator createBinaryComparator() {
+ return new IBinaryComparator() {
- @Override
- public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2,
- int l2) {
- return b1[s1]-b2[s2];
- }
-
- };
- }
+ @Override
+ public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+ return b1[s1] - b2[s2];
+ }
- @Override
- public IBinaryHashFunction createBinaryHashFunction() {
- return new IBinaryHashFunction(){
+ };
+ }
- @Override
- public int hash(byte[] bytes, int offset, int length) {
- return bytes[offset];
- }
-
- };
- }
+ @Override
+ public IBinaryHashFunction createBinaryHashFunction() {
+ return new IBinaryHashFunction() {
+
+ @Override
+ public int hash(byte[] bytes, int offset, int length) {
+ return bytes[offset];
+ }
+
+ };
+ }
}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/util/StatCountAggregateFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/util/StatCountAggregateFactory.java
index 49a7453..65303a8 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/util/StatCountAggregateFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/util/StatCountAggregateFactory.java
@@ -1,3 +1,18 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package edu.uci.ics.genomix.util;
import java.io.DataOutput;
@@ -13,109 +28,102 @@
import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
-public class StatCountAggregateFactory implements
- IAggregatorDescriptorFactory {
+public class StatCountAggregateFactory implements IAggregatorDescriptorFactory {
- /**
+ /**
*
*/
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 1L;
- public class CountAggregator implements IAggregatorDescriptor {
+ public class CountAggregator implements IAggregatorDescriptor {
+ private final int[] keyFields;
- @Override
- public AggregateState createAggregateStates() {
- // TODO Auto-generated method stub
- return null;
- }
+ public CountAggregator(int[] keyFields) {
+ this.keyFields = keyFields;
+ }
- @Override
- public void init(ArrayTupleBuilder tupleBuilder,
- IFrameTupleAccessor accessor, int tIndex, AggregateState state)
- throws HyracksDataException {
- int count = 1;
- DataOutput fieldOutput = tupleBuilder.getDataOutput();
- try {
- fieldOutput.writeInt(count);
- tupleBuilder.addFieldEndOffset();
- } catch (IOException e) {
- throw new HyracksDataException(
- "I/O exception when initializing the aggregator.");
- }
- }
+ @Override
+ public AggregateState createAggregateStates() {
+ // TODO Auto-generated method stub
+ return null;
+ }
- @Override
- public void reset() {
- // TODO Auto-generated method stub
+ @Override
+ public void init(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex, AggregateState state)
+ throws HyracksDataException {
+ int count = 1;
+ DataOutput fieldOutput = tupleBuilder.getDataOutput();
+ try {
+ fieldOutput.writeInt(count);
+ tupleBuilder.addFieldEndOffset();
+ } catch (IOException e) {
+ throw new HyracksDataException("I/O exception when initializing the aggregator.");
+ }
+ }
- }
+ @Override
+ public void reset() {
+ // TODO Auto-generated method stub
- @Override
- public void aggregate(IFrameTupleAccessor accessor, int tIndex,
- IFrameTupleAccessor stateAccessor, int stateTupleIndex,
- AggregateState state) throws HyracksDataException {
- int count = 1;
+ }
- int statetupleOffset = stateAccessor
- .getTupleStartOffset(stateTupleIndex);
- int countfieldStart = stateAccessor.getFieldStartOffset(
- stateTupleIndex, 1);
- int countoffset = statetupleOffset
- + stateAccessor.getFieldSlotsLength() + countfieldStart;
+ @Override
+ public void aggregate(IFrameTupleAccessor accessor, int tIndex, IFrameTupleAccessor stateAccessor,
+ int stateTupleIndex, AggregateState state) throws HyracksDataException {
+ int count = 1;
- byte[] data = stateAccessor.getBuffer().array();
- count += IntegerSerializerDeserializer.getInt(data, countoffset);
- IntegerSerializerDeserializer.putInt(count, data, countoffset);
- }
+ int statetupleOffset = stateAccessor.getTupleStartOffset(stateTupleIndex);
+ int countfieldStart = stateAccessor.getFieldStartOffset(stateTupleIndex, keyFields.length);
+ int countoffset = statetupleOffset + stateAccessor.getFieldSlotsLength() + countfieldStart;
- @Override
- public void outputPartialResult(ArrayTupleBuilder tupleBuilder,
- IFrameTupleAccessor accessor, int tIndex, AggregateState state)
- throws HyracksDataException {
- int count = getCount(accessor, tIndex);
- DataOutput fieldOutput = tupleBuilder.getDataOutput();
- try {
- fieldOutput.writeInt(count);
- tupleBuilder.addFieldEndOffset();
- } catch (IOException e) {
- throw new HyracksDataException(
- "I/O exception when writing aggregation to the output buffer.");
- }
+ byte[] data = stateAccessor.getBuffer().array();
+ count += IntegerSerializerDeserializer.getInt(data, countoffset);
+ IntegerSerializerDeserializer.putInt(count, data, countoffset);
+ }
- }
+ @Override
+ public void outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+ AggregateState state) throws HyracksDataException {
+ int count = getCount(accessor, tIndex);
+ DataOutput fieldOutput = tupleBuilder.getDataOutput();
+ try {
+ fieldOutput.writeInt(count);
+ tupleBuilder.addFieldEndOffset();
+ } catch (IOException e) {
+ throw new HyracksDataException("I/O exception when writing aggregation to the output buffer.");
+ }
- protected int getCount(IFrameTupleAccessor accessor, int tIndex) {
- int tupleOffset = accessor.getTupleStartOffset(tIndex);
- int fieldStart = accessor.getFieldStartOffset(tIndex, 1);
- int countoffset = tupleOffset + accessor.getFieldSlotsLength()
- + fieldStart;
- byte[] data = accessor.getBuffer().array();
+ }
- return IntegerSerializerDeserializer.getInt(data, countoffset);
- }
+ protected int getCount(IFrameTupleAccessor accessor, int tIndex) {
+ int tupleOffset = accessor.getTupleStartOffset(tIndex);
+ int fieldStart = accessor.getFieldStartOffset(tIndex, keyFields.length);
+ int countoffset = tupleOffset + accessor.getFieldSlotsLength() + fieldStart;
+ byte[] data = accessor.getBuffer().array();
- @Override
- public void outputFinalResult(ArrayTupleBuilder tupleBuilder,
- IFrameTupleAccessor accessor, int tIndex, AggregateState state)
- throws HyracksDataException {
- outputPartialResult(tupleBuilder, accessor, tIndex, state);
- }
+ return IntegerSerializerDeserializer.getInt(data, countoffset);
+ }
- @Override
- public void close() {
- // TODO Auto-generated method stub
+ @Override
+ public void outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+ AggregateState state) throws HyracksDataException {
+ outputPartialResult(tupleBuilder, accessor, tIndex, state);
+ }
- }
+ @Override
+ public void close() {
+ // TODO Auto-generated method stub
- }
+ }
- @Override
- public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx,
- RecordDescriptor inRecordDescriptor,
- RecordDescriptor outRecordDescriptor, int[] keyFields,
- int[] keyFieldsInPartialResults) throws HyracksDataException {
- // TODO Auto-generated method stub
- return new CountAggregator();
- }
+ }
+
+ @Override
+ public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
+ RecordDescriptor outRecordDescriptor, int[] keyFields, int[] keyFieldsInPartialResults)
+ throws HyracksDataException {
+ // TODO Auto-generated method stub
+ return new CountAggregator(keyFields);
+ }
}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/util/StatReadsKeyValueParserFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/util/StatReadsKeyValueParserFactory.java
index 561d64a..2fcca67 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/util/StatReadsKeyValueParserFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/util/StatReadsKeyValueParserFactory.java
@@ -1,13 +1,26 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package edu.uci.ics.genomix.util;
import java.nio.ByteBuffer;
-import org.apache.hadoop.io.BytesWritable;
-
import edu.uci.ics.genomix.data.std.accessors.ByteSerializerDeserializer;
import edu.uci.ics.genomix.type.GeneCode;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
import edu.uci.ics.genomix.type.KmerCountValue;
-import edu.uci.ics.genomix.type.KmerUtil;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
@@ -17,70 +30,65 @@
import edu.uci.ics.hyracks.hdfs.api.IKeyValueParser;
import edu.uci.ics.hyracks.hdfs.api.IKeyValueParserFactory;
-public class StatReadsKeyValueParserFactory implements IKeyValueParserFactory<BytesWritable,KmerCountValue> {
+public class StatReadsKeyValueParserFactory implements IKeyValueParserFactory<KmerBytesWritable, KmerCountValue> {
- /**
+ /**
*
*/
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 1L;
- @Override
- public IKeyValueParser<BytesWritable,KmerCountValue> createKeyValueParser(IHyracksTaskContext ctx)
- throws HyracksDataException {
-
- final ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(2);
- final ByteBuffer outputBuffer = ctx.allocateFrame();
- final FrameTupleAppender outputAppender = new FrameTupleAppender(
- ctx.getFrameSize());
- outputAppender.reset(outputBuffer, true);
-
- return new IKeyValueParser<BytesWritable,KmerCountValue>(){
+ @Override
+ public IKeyValueParser<KmerBytesWritable, KmerCountValue> createKeyValueParser(IHyracksTaskContext ctx)
+ throws HyracksDataException {
- @Override
- public void open(IFrameWriter writer) throws HyracksDataException {
- // TODO Auto-generated method stub
-
- }
+ final ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(2);
+ final ByteBuffer outputBuffer = ctx.allocateFrame();
+ final FrameTupleAppender outputAppender = new FrameTupleAppender(ctx.getFrameSize());
+ outputAppender.reset(outputBuffer, true);
- @Override
- public void parse(BytesWritable key, KmerCountValue value,
- IFrameWriter writer) throws HyracksDataException {
- byte adjMap = value.getAdjBitMap();
- byte count = value.getCount();
- InsertToFrame((byte) (GeneCode.inDegree(adjMap)*10+GeneCode.outDegree(adjMap)),count,writer);
- }
+ return new IKeyValueParser<KmerBytesWritable, KmerCountValue>() {
- @Override
- public void close(IFrameWriter writer) throws HyracksDataException {
- FrameUtils.flushFrame(outputBuffer, writer);
- }
-
- private void InsertToFrame(byte degree, byte count,
- IFrameWriter writer) {
- try {
- tupleBuilder.reset();
- tupleBuilder.addField(ByteSerializerDeserializer.INSTANCE,degree);
- tupleBuilder.addField(ByteSerializerDeserializer.INSTANCE,count);
+ @Override
+ public void open(IFrameWriter writer) throws HyracksDataException {
+ // TODO Auto-generated method stub
- if (!outputAppender.append(
- tupleBuilder.getFieldEndOffsets(),
- tupleBuilder.getByteArray(), 0,
- tupleBuilder.getSize())) {
- FrameUtils.flushFrame(outputBuffer, writer);
- outputAppender.reset(outputBuffer, true);
- if (!outputAppender.append(
- tupleBuilder.getFieldEndOffsets(),
- tupleBuilder.getByteArray(), 0,
- tupleBuilder.getSize())) {
- throw new IllegalStateException(
- "Failed to copy an record into a frame: the record size is too large.");
- }
- }
- } catch (Exception e) {
- throw new IllegalStateException(e);
- }
- }
- };
- }
+ }
+
+ @Override
+ public void parse(KmerBytesWritable key, KmerCountValue value, IFrameWriter writer)
+ throws HyracksDataException {
+ byte adjMap = value.getAdjBitMap();
+ byte count = value.getCount();
+ InsertToFrame((byte) (GeneCode.inDegree(adjMap)), (byte) (GeneCode.outDegree(adjMap)), count, writer);
+ }
+
+ @Override
+ public void close(IFrameWriter writer) throws HyracksDataException {
+ FrameUtils.flushFrame(outputBuffer, writer);
+ }
+
+ private void InsertToFrame(byte indegree, byte outdegree, byte count, IFrameWriter writer) {
+ try {
+ tupleBuilder.reset();
+ tupleBuilder.addField(ByteSerializerDeserializer.INSTANCE, indegree);
+ tupleBuilder.addField(ByteSerializerDeserializer.INSTANCE, outdegree);
+ tupleBuilder.addField(ByteSerializerDeserializer.INSTANCE, count);
+
+ if (!outputAppender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
+ tupleBuilder.getSize())) {
+ FrameUtils.flushFrame(outputBuffer, writer);
+ outputAppender.reset(outputBuffer, true);
+ if (!outputAppender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
+ tupleBuilder.getSize())) {
+ throw new IllegalStateException(
+ "Failed to copy an record into a frame: the record size is too large.");
+ }
+ }
+ } catch (Exception e) {
+ throw new IllegalStateException(e);
+ }
+ }
+ };
+ }
}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/util/StatSumAggregateFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/util/StatSumAggregateFactory.java
index ce00667..39ac60a 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/util/StatSumAggregateFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/util/StatSumAggregateFactory.java
@@ -1,3 +1,18 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package edu.uci.ics.genomix.util;
import java.io.DataOutput;
@@ -15,108 +30,102 @@
public class StatSumAggregateFactory implements IAggregatorDescriptorFactory {
- /**
+ /**
*
*/
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 1L;
- public class DistributeAggregatorDescriptor implements
- IAggregatorDescriptor {
+ public class DistributeAggregatorDescriptor implements IAggregatorDescriptor {
- @Override
- public AggregateState createAggregateStates() {
- // TODO Auto-generated method stub
- return null;
- }
+ private final int[] keyFields;
- protected int getCount(IFrameTupleAccessor accessor, int tIndex) {
- int tupleOffset = accessor.getTupleStartOffset(tIndex);
- int fieldStart = accessor.getFieldStartOffset(tIndex, 1);
- int countoffset = tupleOffset + accessor.getFieldSlotsLength()
- + fieldStart;
- byte[] data = accessor.getBuffer().array();
- return IntegerSerializerDeserializer.getInt(data, countoffset);
- }
+ public DistributeAggregatorDescriptor(int[] keyFields) {
+ this.keyFields = keyFields;
+ }
- @Override
- public void init(ArrayTupleBuilder tupleBuilder,
- IFrameTupleAccessor accessor, int tIndex, AggregateState state)
- throws HyracksDataException {
- int count = getCount(accessor, tIndex);
+ @Override
+ public AggregateState createAggregateStates() {
+ // TODO Auto-generated method stub
+ return null;
+ }
- DataOutput fieldOutput = tupleBuilder.getDataOutput();
- try {
- fieldOutput.writeInt(count);
- tupleBuilder.addFieldEndOffset();
- } catch (IOException e) {
- throw new HyracksDataException(
- "I/O exception when initializing the aggregator.");
- }
- }
+ protected int getCount(IFrameTupleAccessor accessor, int tIndex) {
+ int tupleOffset = accessor.getTupleStartOffset(tIndex);
+ int fieldStart = accessor.getFieldStartOffset(tIndex, 1);
+ int countoffset = tupleOffset + accessor.getFieldSlotsLength() + fieldStart;
+ byte[] data = accessor.getBuffer().array();
+ return IntegerSerializerDeserializer.getInt(data, countoffset);
+ }
- @Override
- public void reset() {
- // TODO Auto-generated method stub
+ @Override
+ public void init(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex, AggregateState state)
+ throws HyracksDataException {
+ int count = getCount(accessor, tIndex);
- }
+ DataOutput fieldOutput = tupleBuilder.getDataOutput();
+ try {
+ fieldOutput.writeInt(count);
+ tupleBuilder.addFieldEndOffset();
+ } catch (IOException e) {
+ throw new HyracksDataException("I/O exception when initializing the aggregator.");
+ }
+ }
- @Override
- public void aggregate(IFrameTupleAccessor accessor, int tIndex,
- IFrameTupleAccessor stateAccessor, int stateTupleIndex,
- AggregateState state) throws HyracksDataException {
- int count = getCount(accessor, tIndex);
+ @Override
+ public void reset() {
+ // TODO Auto-generated method stub
- int statetupleOffset = stateAccessor
- .getTupleStartOffset(stateTupleIndex);
- int countfieldStart = stateAccessor.getFieldStartOffset(
- stateTupleIndex, 1);
- int countoffset = statetupleOffset
- + stateAccessor.getFieldSlotsLength() + countfieldStart;
+ }
- byte[] data = stateAccessor.getBuffer().array();
- count += IntegerSerializerDeserializer.getInt(data, countoffset);
- IntegerSerializerDeserializer.putInt(count, data, countoffset);
- }
+ @Override
+ public void aggregate(IFrameTupleAccessor accessor, int tIndex, IFrameTupleAccessor stateAccessor,
+ int stateTupleIndex, AggregateState state) throws HyracksDataException {
+ int count = getCount(accessor, tIndex);
- @Override
- public void outputPartialResult(ArrayTupleBuilder tupleBuilder,
- IFrameTupleAccessor accessor, int tIndex, AggregateState state)
- throws HyracksDataException {
- int count = getCount(accessor, tIndex);
- DataOutput fieldOutput = tupleBuilder.getDataOutput();
- try {
- fieldOutput.writeInt(count);
- tupleBuilder.addFieldEndOffset();
- } catch (IOException e) {
- throw new HyracksDataException(
- "I/O exception when writing aggregation to the output buffer.");
- }
+ int statetupleOffset = stateAccessor.getTupleStartOffset(stateTupleIndex);
+ int countfieldStart = stateAccessor.getFieldStartOffset(stateTupleIndex, 1);
+ int countoffset = statetupleOffset + stateAccessor.getFieldSlotsLength() + countfieldStart;
- }
+ byte[] data = stateAccessor.getBuffer().array();
+ count += IntegerSerializerDeserializer.getInt(data, countoffset);
+ IntegerSerializerDeserializer.putInt(count, data, countoffset);
+ }
- @Override
- public void outputFinalResult(ArrayTupleBuilder tupleBuilder,
- IFrameTupleAccessor accessor, int tIndex, AggregateState state)
- throws HyracksDataException {
- outputPartialResult(tupleBuilder, accessor, tIndex, state);
+ @Override
+ public void outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+ AggregateState state) throws HyracksDataException {
+ int count = getCount(accessor, tIndex);
+ DataOutput fieldOutput = tupleBuilder.getDataOutput();
+ try {
+ fieldOutput.writeInt(count);
+ tupleBuilder.addFieldEndOffset();
+ } catch (IOException e) {
+ throw new HyracksDataException("I/O exception when writing aggregation to the output buffer.");
+ }
- }
+ }
- @Override
- public void close() {
- // TODO Auto-generated method stub
+ @Override
+ public void outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+ AggregateState state) throws HyracksDataException {
+ outputPartialResult(tupleBuilder, accessor, tIndex, state);
- }
+ }
- }
+ @Override
+ public void close() {
+ // TODO Auto-generated method stub
- @Override
- public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx,
- RecordDescriptor inRecordDescriptor,
- RecordDescriptor outRecordDescriptor, int[] keyFields,
- int[] keyFieldsInPartialResults) throws HyracksDataException {
- // TODO Auto-generated method stub
- return new DistributeAggregatorDescriptor();
- }
+ }
+
+ }
+
+ @Override
+ public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
+ RecordDescriptor outRecordDescriptor, int[] keyFields, int[] keyFieldsInPartialResults)
+ throws HyracksDataException {
+ // TODO Auto-generated method stub
+ return new DistributeAggregatorDescriptor(keyFields);
+ }
}
diff --git a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/example/jobrun/JobRunTest.java b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/example/jobrun/JobRunTest.java
index bef13b5..847272a 100644
--- a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/example/jobrun/JobRunTest.java
+++ b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/example/jobrun/JobRunTest.java
@@ -1,3 +1,18 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package edu.uci.ics.genomix.example.jobrun;
import java.io.BufferedWriter;
@@ -194,9 +209,10 @@
continue;
}
SequenceFile.Reader reader = new SequenceFile.Reader(dfs, path, conf);
-
-// KmerBytesWritable key = (KmerBytesWritable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
- KmerBytesWritable key = new KmerBytesWritable(conf.getInt(GenomixJob.KMER_LENGTH, GenomixJob.DEFAULT_KMER));
+
+ // KmerBytesWritable key = (KmerBytesWritable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
+ KmerBytesWritable key = new KmerBytesWritable(conf.getInt(GenomixJob.KMER_LENGTH,
+ GenomixJob.DEFAULT_KMER));
KmerCountValue value = (KmerCountValue) ReflectionUtils.newInstance(reader.getValueClass(), conf);
while (reader.next(key, value)) {
diff --git a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/example/jobrun/TestUtils.java b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/example/jobrun/TestUtils.java
index 22688e0..aa1f791 100644
--- a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/example/jobrun/TestUtils.java
+++ b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/example/jobrun/TestUtils.java
@@ -24,39 +24,40 @@
public class TestUtils {
/**
* Compare with the sorted expected file.
- * The actual file may not be sorted;
+ * The actual file may not be sorted;
+ *
* @param expectedFile
* @param actualFile
*/
- public static void compareWithSortedResult(File expectedFile, File actualFile) throws Exception{
+ public static void compareWithSortedResult(File expectedFile, File actualFile) throws Exception {
BufferedReader readerActual = new BufferedReader(new FileReader(actualFile));
BufferedReader readerExpected = new BufferedReader(new FileReader(expectedFile));
ArrayList<String> actualLines = new ArrayList<String>();
String lineExpected, lineActual;
- try{
- while ( (lineActual = readerActual.readLine())!=null){
- actualLines.add(lineActual);
- }
- Collections.sort(actualLines);
- int num = 1;
- for(String actualLine : actualLines){
- lineExpected = readerExpected.readLine();
- if (lineExpected == null){
- throw new Exception("Actual result changed at line " + num + ":\n< " + actualLine + "\n> ");
- }
- if ( !equalStrings(lineExpected, actualLine)){
- throw new Exception("Result for changed at line " + num + ":\n< " + lineExpected + "\n> "
- + actualLine);
- }
- ++num;
- }
+ try {
+ while ((lineActual = readerActual.readLine()) != null) {
+ actualLines.add(lineActual);
+ }
+ Collections.sort(actualLines);
+ int num = 1;
+ for (String actualLine : actualLines) {
lineExpected = readerExpected.readLine();
- if (lineExpected != null) {
- throw new Exception("Actual result changed at line " + num + ":\n< \n> " + lineExpected);
+ if (lineExpected == null) {
+ throw new Exception("Actual result changed at line " + num + ":\n< " + actualLine + "\n> ");
}
- } finally{
- readerActual.close();
- readerExpected.close();
+ if (!equalStrings(lineExpected, actualLine)) {
+                        throw new Exception("Result changed at line " + num + ":\n< " + lineExpected + "\n> "
+ + actualLine);
+ }
+ ++num;
+ }
+ lineExpected = readerExpected.readLine();
+ if (lineExpected != null) {
+ throw new Exception("Actual result changed at line " + num + ":\n< \n> " + lineExpected);
+ }
+ } finally {
+ readerActual.close();
+ readerExpected.close();
}
}